Module connpy.grpc_layer.stubs

Functions

def handle_errors(func)
Expand source code
def handle_errors(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except grpc.RpcError as e:
            # Re-raise gRPC errors as native ConnpyError to keep CLI handlers agnostic
            details = e.details()

            # Identify the host if available on the instance
            instance = args[0] if args else None
            host = getattr(instance, "remote_host", "remote host")

            # Make common gRPC errors more readable
            if "failed to connect to all addresses" in details:
                simplified = f"Failed to connect to remote host at {host} (Connection refused)"
            elif "Method not found" in details:
                simplified = f"Remote server at {host} is using an incompatible version"
            elif "Deadline Exceeded" in details:
                simplified = f"Request to {host} timed out"
            else:
                simplified = details

            raise ConnpyError(simplified)
    return wrapper

Classes

class AIStub (channel, remote_host)
Expand source code
class AIStub:
    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.AIServiceStub(channel)
        self.remote_host = remote_host

    def _ai_chat_stream(self, stub_method, input_text, dryrun=False, chat_history=None, session_id=None, debug=False, status=None, chunk_callback=None, **overrides):
        import queue
        from rich.prompt import Prompt
        from rich.text import Text
        from rich.panel import Panel
        from rich.markdown import Markdown
        
        req_queue = queue.Queue()
        
        initial_req = connpy_pb2.AskRequest(
            input_text=input_text,
            dryrun=dryrun,
            session_id=session_id or "",
            debug=debug,
            engineer_model=overrides.get("engineer_model", ""),
            engineer_api_key=overrides.get("engineer_api_key", ""),
            architect_model=overrides.get("architect_model", ""),
            architect_api_key=overrides.get("architect_api_key", ""),
            trust=overrides.get("trust", False)
        )
        if chat_history is not None:
            initial_req.chat_history.CopyFrom(to_value(chat_history))
        if "engineer_auth" in overrides and overrides["engineer_auth"]:
            initial_req.engineer_auth.CopyFrom(to_struct(overrides["engineer_auth"]))
        if "architect_auth" in overrides and overrides["architect_auth"]:
            initial_req.architect_auth.CopyFrom(to_struct(overrides["architect_auth"]))
            
        req_queue.put(initial_req)

        def request_generator():
            while True:
                req = req_queue.get()
                if req is None: break
                yield req

        responses = stub_method(request_generator())
        
        full_content = ""
        header_printed = False
        current_responder = "engineer"
        final_result = {"response": "", "chat_history": []}

        # Background thread to pull responses from gRPC into a local queue
        # This prevents KeyboardInterrupt from corrupting the gRPC iterator state
        response_queue = queue.Queue()
        
        def pull_responses():
            try:
                for response in responses:
                    response_queue.put(("data", response))
            except Exception as e:
                response_queue.put(("error", e))
            finally:
                response_queue.put((None, None))

        threading.Thread(target=pull_responses, daemon=True).start()

        try:
            while True:
                try:
                    # BLOCKING GET from local queue (interruptible by signal)
                    msg_type, response = response_queue.get()
                except KeyboardInterrupt:
                    # Signal interruption to the server
                    if status:
                        status.update("[error]Interrupted! Closing pending tasks...")
                    
                    # Send the interrupt signal to the server
                    req_queue.put(connpy_pb2.AskRequest(interrupt=True))
                    
                    # CONTINUE the loop to receive remaining data and summary from the queue
                    continue
                
                if msg_type is None: # Sentinel
                    break
                
                if msg_type == "error":
                    # Re-raise or handle gRPC error from background thread
                    if isinstance(response, grpc.RpcError):
                        raise response
                    printer.warning(f"Stream interrupted: {response}")
                    break

                if response.status_update:
                    if response.status_update.startswith("__RESPONDER__:"):
                        current_responder = response.status_update.split(":")[1].lower()
                        continue
                        
                    if response.requires_confirmation:
                        if status: status.stop()
                        
                        # Show prompt and wait for answer
                        prompt_text = Text.from_ansi(response.status_update)
                        ans = Prompt.ask(prompt_text)
                        
                        if status: 
                            status.update("[ai_status]Agent: Resuming...")
                            status.start()
                        
                        req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans))
                        continue
                        
                    if status:
                        status.update(response.status_update)
                    continue
                
                if response.debug_message:
                    if debug:
                        if status:
                            try: status.stop()
                            except: pass
                        printer.console.print(Text.from_ansi(response.debug_message))
                        if status:
                            try: status.start()
                            except: pass
                    continue
                
                if response.important_message:
                    if status:
                        try: status.stop()
                        except: pass
                    printer.console.print(Text.from_ansi(response.important_message))
                    if status:
                        try: status.start()
                        except: pass
                    continue

                if not response.is_final:
                    if response.text_chunk:
                        if not header_printed:
                            if status:
                                try: status.stop()
                                except: pass
                            
                            if chunk_callback:
                                header_printed = True
                            else:
                                from rich.console import Console as RichConsole
                                from rich.rule import Rule
                                from ..printer import connpy_theme, get_original_stdout, IncrementalMarkdownParser
                                stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout())
                                
                                # Print header on first chunk
                                alias = "architect" if current_responder == "architect" else "engineer"
                                role_label = "Network Architect" if current_responder == "architect" else "Network Engineer"
                                stable_console.print(Rule(f"[bold {alias}]{role_label}[/bold {alias}]", style=alias))
                                header_printed = True
                                
                                # Initialize parser
                                md_parser = IncrementalMarkdownParser(console=stable_console)
                        
                        full_content += response.text_chunk
                        if chunk_callback:
                            chunk_callback(response.text_chunk)
                        elif md_parser:
                            md_parser.feed(response.text_chunk)
                    continue
                
                if response.is_final:
                    if not chunk_callback and header_printed:
                        from rich.rule import Rule
                        md_parser.flush()
                        
                    if status:
                        try: status.stop()
                        except: pass

                    final_result = from_struct(response.full_result)
                    
                    if not chunk_callback and header_printed:
                        from rich.console import Console as RichConsole
                        from ..printer import connpy_theme, get_original_stdout
                        stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout())
                        stable_console.print(Rule(style=alias))
                    break
        except Exception as e:
            # Check if it was a gRPC error that we should let handle_errors catch
            if isinstance(e, grpc.RpcError):
                raise
            printer.warning(f"Stream interrupted: {e}")
        finally:
            req_queue.put(None)
        
        if full_content:
            final_result["streamed"] = True
            
        return final_result

    @handle_errors
    def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debug=False, status=None, **overrides):
        return self._ai_chat_stream(self.stub.ask, input_text, dryrun=dryrun, chat_history=chat_history, session_id=session_id, debug=debug, status=status, **overrides)

    @handle_errors
    def build_playbook_chat(self, user_input, chat_history=None, status=None, chunk_callback=None):
        return self._ai_chat_stream(self.stub.build_playbook_chat, user_input, chat_history=chat_history, status=status, chunk_callback=chunk_callback)

    def _process_unary_stream(self, responses, status=None, chunk_callback=None):
        full_content = ""
        header_printed = False
        final_result = {"response": "", "chat_history": []}
        md_parser = None

        try:
            for response in responses:
                if response.status_update:
                    if status:
                        status.update(response.status_update)
                    continue
                
                if response.important_message:
                    if status:
                        try: status.stop()
                        except: pass
                    printer.console.print(Text.from_ansi(response.important_message))
                    if status:
                        try: status.start()
                        except: pass
                    continue

                if not response.is_final:
                    if response.text_chunk:
                        if not header_printed:
                            if status:
                                try: status.stop()
                                except: pass
                            
                            if chunk_callback:
                                header_printed = True
                            else:
                                from rich.console import Console as RichConsole
                                from rich.rule import Rule
                                from ..printer import connpy_theme, get_original_stdout, IncrementalMarkdownParser
                                stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout())
                                
                                # Print default header
                                stable_console.print(Rule("[bold engineer]AI Analysis[/bold engineer]", style="engineer"))
                                header_printed = True
                                md_parser = IncrementalMarkdownParser(console=stable_console)
                        
                        full_content += response.text_chunk
                        if chunk_callback:
                            chunk_callback(response.text_chunk)
                        elif md_parser:
                            md_parser.feed(response.text_chunk)
                    continue
                
                if response.is_final:
                    if md_parser:
                        md_parser.flush()
                        
                    if status:
                        try: status.stop()
                        except: pass

                    final_result = from_struct(response.full_result)
                    
                    if md_parser:
                        from rich.console import Console as RichConsole
                        from rich.rule import Rule
                        from ..printer import connpy_theme, get_original_stdout
                        stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout())
                        stable_console.print(Rule(style="engineer"))
                    break
        except Exception as e:
            if isinstance(e, grpc.RpcError):
                raise
            printer.warning(f"Stream interrupted: {e}")
            
        if full_content:
            final_result["streamed"] = True
            
        return final_result

    @handle_errors
    def analyze_execution_results(self, results, query=None, status=None, chunk_callback=None):
        req = connpy_pb2.AnalyzeRequest(query=query or "")
        req.results.CopyFrom(to_struct(results))
        responses = self.stub.analyze_execution_results(req)
        return self._process_unary_stream(responses, status, chunk_callback)

    @handle_errors
    def predict_execution_results(self, target_nodes, commands, status=None, chunk_callback=None):
        req = connpy_pb2.PreflightRequest(target_nodes=target_nodes, commands=commands)
        responses = self.stub.predict_execution_results(req)
        return self._process_unary_stream(responses, status, chunk_callback)

    @handle_errors
    def confirm(self, input_text, console=None):
        return self.stub.confirm(connpy_pb2.StringRequest(value=input_text)).value

    @handle_errors
    def list_sessions(self, limit=None):
        from .utils import from_value
        res = self.stub.list_sessions(Empty())
        sessions = from_value(res.data) or []
        if limit and len(sessions) > limit:
            return sessions[:limit], len(sessions)
        return sessions, len(sessions)

    @handle_errors
    def delete_session(self, session_id):
        self.stub.delete_session(connpy_pb2.StringRequest(value=session_id))

    @handle_errors
    def configure_provider(self, provider, model=None, api_key=None, auth=None):
        req = connpy_pb2.ProviderRequest(provider=provider, model=model or "", api_key=api_key or "")
        if auth:
            req.auth.CopyFrom(to_struct(auth))
        self.stub.configure_provider(req)

    @handle_errors
    def configure_mcp(self, name, url=None, enabled=True, auto_load_on_os=None, remove=False):
        req = connpy_pb2.MCPRequest(
            name=name,
            url=url or "",
            enabled=enabled,
            auto_load_on_os=auto_load_on_os or "",
            remove=remove
        )
        self.stub.configure_mcp(req)

    @handle_errors
    def list_mcp_servers(self):
        res = self.stub.list_mcp_servers(Empty())
        return from_value(res.data) or {}

    @handle_errors
    def load_session_data(self, session_id):
        return from_struct(self.stub.load_session_data(connpy_pb2.StringRequest(value=session_id)).data)

Methods

def analyze_execution_results(self, results, query=None, status=None, chunk_callback=None)
Expand source code
@handle_errors
def analyze_execution_results(self, results, query=None, status=None, chunk_callback=None):
    req = connpy_pb2.AnalyzeRequest(query=query or "")
    req.results.CopyFrom(to_struct(results))
    responses = self.stub.analyze_execution_results(req)
    return self._process_unary_stream(responses, status, chunk_callback)
def ask(self,
input_text,
dryrun=False,
chat_history=None,
session_id=None,
debug=False,
status=None,
**overrides)
Expand source code
@handle_errors
def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debug=False, status=None, **overrides):
    return self._ai_chat_stream(self.stub.ask, input_text, dryrun=dryrun, chat_history=chat_history, session_id=session_id, debug=debug, status=status, **overrides)
def build_playbook_chat(self, user_input, chat_history=None, status=None, chunk_callback=None)
Expand source code
@handle_errors
def build_playbook_chat(self, user_input, chat_history=None, status=None, chunk_callback=None):
    return self._ai_chat_stream(self.stub.build_playbook_chat, user_input, chat_history=chat_history, status=status, chunk_callback=chunk_callback)
def configure_mcp(self, name, url=None, enabled=True, auto_load_on_os=None, remove=False)
Expand source code
@handle_errors
def configure_mcp(self, name, url=None, enabled=True, auto_load_on_os=None, remove=False):
    req = connpy_pb2.MCPRequest(
        name=name,
        url=url or "",
        enabled=enabled,
        auto_load_on_os=auto_load_on_os or "",
        remove=remove
    )
    self.stub.configure_mcp(req)
def configure_provider(self, provider, model=None, api_key=None, auth=None)
Expand source code
@handle_errors
def configure_provider(self, provider, model=None, api_key=None, auth=None):
    req = connpy_pb2.ProviderRequest(provider=provider, model=model or "", api_key=api_key or "")
    if auth:
        req.auth.CopyFrom(to_struct(auth))
    self.stub.configure_provider(req)
def confirm(self, input_text, console=None)
Expand source code
@handle_errors
def confirm(self, input_text, console=None):
    return self.stub.confirm(connpy_pb2.StringRequest(value=input_text)).value
def delete_session(self, session_id)
Expand source code
@handle_errors
def delete_session(self, session_id):
    self.stub.delete_session(connpy_pb2.StringRequest(value=session_id))
def list_mcp_servers(self)
Expand source code
@handle_errors
def list_mcp_servers(self):
    res = self.stub.list_mcp_servers(Empty())
    return from_value(res.data) or {}
def list_sessions(self, limit=None)
Expand source code
@handle_errors
def list_sessions(self, limit=None):
    from .utils import from_value
    res = self.stub.list_sessions(Empty())
    sessions = from_value(res.data) or []
    if limit and len(sessions) > limit:
        return sessions[:limit], len(sessions)
    return sessions, len(sessions)
def load_session_data(self, session_id)
Expand source code
@handle_errors
def load_session_data(self, session_id):
    return from_struct(self.stub.load_session_data(connpy_pb2.StringRequest(value=session_id)).data)
def predict_execution_results(self, target_nodes, commands, status=None, chunk_callback=None)
Expand source code
@handle_errors
def predict_execution_results(self, target_nodes, commands, status=None, chunk_callback=None):
    req = connpy_pb2.PreflightRequest(target_nodes=target_nodes, commands=commands)
    responses = self.stub.predict_execution_results(req)
    return self._process_unary_stream(responses, status, chunk_callback)
class AuthClientInterceptor (token_provider)
Expand source code
class AuthClientInterceptor(grpc.UnaryUnaryClientInterceptor,
                            grpc.UnaryStreamClientInterceptor,
                            grpc.StreamUnaryClientInterceptor,
                            grpc.StreamStreamClientInterceptor):
    def __init__(self, token_provider):
        self.token_provider = token_provider

    def _add_metadata(self, client_call_details):
        token = self.token_provider()
        if not token:
            return client_call_details
        
        metadata = []
        if client_call_details.metadata:
            metadata = list(client_call_details.metadata)
            
        # Check if already present to avoid duplicates
        if not any(k.lower() == "authorization" for k, v in metadata):
            metadata.append(("authorization", f"Bearer {token}"))
            
        return _ClientCallDetails(
            method=client_call_details.method,
            timeout=client_call_details.timeout,
            metadata=metadata,
            credentials=client_call_details.credentials,
            wait_for_ready=client_call_details.wait_for_ready,
            compression=client_call_details.compression,
        )

    def intercept_unary_unary(self, continuation, client_call_details, request):
        new_details = self._add_metadata(client_call_details)
        return continuation(new_details, request)

    def intercept_unary_stream(self, continuation, client_call_details, request):
        new_details = self._add_metadata(client_call_details)
        return continuation(new_details, request)

    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        new_details = self._add_metadata(client_call_details)
        return continuation(new_details, request_iterator)

    def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        new_details = self._add_metadata(client_call_details)
        return continuation(new_details, request_iterator)

Affords intercepting unary-unary invocations.

Ancestors

  • grpc.UnaryUnaryClientInterceptor
  • grpc.UnaryStreamClientInterceptor
  • grpc.StreamUnaryClientInterceptor
  • grpc.StreamStreamClientInterceptor
  • abc.ABC

Methods

def intercept_stream_stream(self, continuation, client_call_details, request_iterator)
Expand source code
def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
    new_details = self._add_metadata(client_call_details)
    return continuation(new_details, request_iterator)

Intercepts a stream-stream invocation.

Args

continuation
A function that proceeds with the invocation by executing the next interceptor in chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use response_iterator = continuation(client_call_details, request_iterator) to continue with the RPC. continuation returns an object that is both a Call for the RPC and an iterator for response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status.
client_call_details
A ClientCallDetails object describing the outgoing RPC.
request_iterator
An iterator that yields request values for the RPC.

Returns

An object that is both a Call for the RPC and an iterator of response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. This object should also fulfill the Future interface, though it may not.

def intercept_stream_unary(self, continuation, client_call_details, request_iterator)
Expand source code
def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
    new_details = self._add_metadata(client_call_details)
    return continuation(new_details, request_iterator)

Intercepts a stream-unary invocation asynchronously.

Args

continuation
A function that proceeds with the invocation by executing the next interceptor in chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use response_future = continuation(client_call_details, request_iterator) to continue with the RPC. continuation returns an object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.
client_call_details
A ClientCallDetails object describing the outgoing RPC.
request_iterator
An iterator that yields request values for the RPC.

Returns

An object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.

def intercept_unary_stream(self, continuation, client_call_details, request)
Expand source code
def intercept_unary_stream(self, continuation, client_call_details, request):
    new_details = self._add_metadata(client_call_details)
    return continuation(new_details, request)

Intercepts a unary-stream invocation.

Args

continuation
A function that proceeds with the invocation by executing the next interceptor in chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use response_iterator = continuation(client_call_details, request) to continue with the RPC. continuation returns an object that is both a Call for the RPC and an iterator for response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status.
client_call_details
A ClientCallDetails object describing the outgoing RPC.
request
The request value for the RPC.

Returns

An object that is both a Call for the RPC and an iterator of response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. This object should also fulfill the Future interface, though it may not.

def intercept_unary_unary(self, continuation, client_call_details, request)
Expand source code
def intercept_unary_unary(self, continuation, client_call_details, request):
    new_details = self._add_metadata(client_call_details)
    return continuation(new_details, request)

Intercepts a unary-unary invocation asynchronously.

Args

continuation
A function that proceeds with the invocation by executing the next interceptor in chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use response_future = continuation(client_call_details, request) to continue with the RPC. continuation returns an object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.
client_call_details
A ClientCallDetails object describing the outgoing RPC.
request
The request value for the RPC.

Returns

An object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.

class AuthStub (channel, remote_host)
Expand source code
class AuthStub:
    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.AuthServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def login(self, username, password):
        req = connpy_pb2.LoginRequest(username=username, password=password)
        resp = self.stub.login(req)
        return {
            "token": resp.token,
            "username": resp.username,
            "expires_at": resp.expires_at
        }

    @handle_errors
    def change_password(self, old_password, new_password):
        req = connpy_pb2.ChangePasswordRequest(old_password=old_password, new_password=new_password)
        self.stub.change_password(req)

Methods

def change_password(self, old_password, new_password)
Expand source code
@handle_errors
def change_password(self, old_password, new_password):
    req = connpy_pb2.ChangePasswordRequest(old_password=old_password, new_password=new_password)
    self.stub.change_password(req)
def login(self, username, password)
Expand source code
@handle_errors
def login(self, username, password):
    req = connpy_pb2.LoginRequest(username=username, password=password)
    resp = self.stub.login(req)
    return {
        "token": resp.token,
        "username": resp.username,
        "expires_at": resp.expires_at
    }
class ConfigStub (channel, remote_host)
Expand source code
class ConfigStub:
    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.ConfigServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def get_settings(self):
        return from_struct(self.stub.get_settings(Empty()).data)

    @handle_errors
    def update_setting(self, key, value):
        self.stub.update_setting(connpy_pb2.UpdateRequest(key=key, value=to_value(value)))

    @handle_errors
    def get_default_dir(self):
        return self.stub.get_default_dir(Empty()).value

    @handle_errors
    def set_config_folder(self, folder):
        self.stub.set_config_folder(connpy_pb2.StringRequest(value=folder))

    @handle_errors
    def encrypt_password(self, password):
        return self.stub.encrypt_password(connpy_pb2.StringRequest(value=password)).value

Methods

def encrypt_password(self, password)
Expand source code
@handle_errors
def encrypt_password(self, password):
    return self.stub.encrypt_password(connpy_pb2.StringRequest(value=password)).value
def get_default_dir(self)
Expand source code
@handle_errors
def get_default_dir(self):
    return self.stub.get_default_dir(Empty()).value
def get_settings(self)
Expand source code
@handle_errors
def get_settings(self):
    return from_struct(self.stub.get_settings(Empty()).data)
def set_config_folder(self, folder)
Expand source code
@handle_errors
def set_config_folder(self, folder):
    self.stub.set_config_folder(connpy_pb2.StringRequest(value=folder))
def update_setting(self, key, value)
Expand source code
@handle_errors
def update_setting(self, key, value):
    self.stub.update_setting(connpy_pb2.UpdateRequest(key=key, value=to_value(value)))
class ExecutionStub (channel, remote_host)
Expand source code
class ExecutionStub:
    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.ExecutionServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def run_commands(self, nodes_filter, commands, variables=None, parallel=10, timeout=10, folder=None, prompt=None, **kwargs):
        nodes_list = [nodes_filter] if isinstance(nodes_filter, str) else list(nodes_filter)
        req = connpy_pb2.RunRequest(
            nodes=nodes_list,
            commands=commands,
            folder=folder or "",
            prompt=prompt or "",
            parallel=parallel,
            timeout=timeout,
            name=kwargs.get("name", "")
        )
        if variables is not None:
            req.vars.CopyFrom(to_struct(variables))
            
        final_results = {}
        on_complete = kwargs.get("on_node_complete")
        
        for response in self.stub.run_commands(req):
            if on_complete:
                on_complete(response.unique_id, response.output, response.status)
            final_results[response.unique_id] = {
                "output": response.output,
                "status": response.status
            }
                
        return final_results

    @handle_errors
    def test_commands(self, nodes_filter, commands, expected, variables=None, parallel=10, timeout=10, prompt=None, **kwargs):
        nodes_list = [nodes_filter] if isinstance(nodes_filter, str) else list(nodes_filter)
        req = connpy_pb2.TestRequest(
            nodes=nodes_list,
            commands=commands,
            expected=expected if isinstance(expected, list) else [expected],
            folder=kwargs.get("folder", ""),
            prompt=prompt or "",
            parallel=parallel,
            timeout=timeout,
            name=kwargs.get("name", "")
        )
        if variables is not None:
            req.vars.CopyFrom(to_struct(variables))
            
        final_results = {}
        on_complete = kwargs.get("on_node_complete")
        
        for response in self.stub.test_commands(req):
            result_dict = from_struct(response.test_result) if response.HasField("test_result") else {}
            if on_complete:
                on_complete(response.unique_id, response.output, response.status, result_dict)
            final_results[response.unique_id] = result_dict
                
        return final_results

    @handle_errors
    def run_cli_script(self, nodes_filter, script_path, parallel=10):
        req = connpy_pb2.ScriptRequest(param1=nodes_filter, param2=script_path, parallel=parallel)
        return from_struct(self.stub.run_cli_script(req).data)

Methods

def run_cli_script(self, nodes_filter, script_path, parallel=10)
Expand source code
@handle_errors
def run_cli_script(self, nodes_filter, script_path, parallel=10):
    req = connpy_pb2.ScriptRequest(param1=nodes_filter, param2=script_path, parallel=parallel)
    return from_struct(self.stub.run_cli_script(req).data)
def run_commands(self,
nodes_filter,
commands,
variables=None,
parallel=10,
timeout=10,
folder=None,
prompt=None,
**kwargs)
Expand source code
@handle_errors
def run_commands(self, nodes_filter, commands, variables=None, parallel=10, timeout=10, folder=None, prompt=None, **kwargs):
    nodes_list = [nodes_filter] if isinstance(nodes_filter, str) else list(nodes_filter)
    req = connpy_pb2.RunRequest(
        nodes=nodes_list,
        commands=commands,
        folder=folder or "",
        prompt=prompt or "",
        parallel=parallel,
        timeout=timeout,
        name=kwargs.get("name", "")
    )
    if variables is not None:
        req.vars.CopyFrom(to_struct(variables))
        
    final_results = {}
    on_complete = kwargs.get("on_node_complete")
    
    for response in self.stub.run_commands(req):
        if on_complete:
            on_complete(response.unique_id, response.output, response.status)
        final_results[response.unique_id] = {
            "output": response.output,
            "status": response.status
        }
            
    return final_results
def test_commands(self,
nodes_filter,
commands,
expected,
variables=None,
parallel=10,
timeout=10,
prompt=None,
**kwargs)
Expand source code
@handle_errors
def test_commands(self, nodes_filter, commands, expected, variables=None, parallel=10, timeout=10, prompt=None, **kwargs):
    nodes_list = [nodes_filter] if isinstance(nodes_filter, str) else list(nodes_filter)
    req = connpy_pb2.TestRequest(
        nodes=nodes_list,
        commands=commands,
        expected=expected if isinstance(expected, list) else [expected],
        folder=kwargs.get("folder", ""),
        prompt=prompt or "",
        parallel=parallel,
        timeout=timeout,
        name=kwargs.get("name", "")
    )
    if variables is not None:
        req.vars.CopyFrom(to_struct(variables))
        
    final_results = {}
    on_complete = kwargs.get("on_node_complete")
    
    for response in self.stub.test_commands(req):
        result_dict = from_struct(response.test_result) if response.HasField("test_result") else {}
        if on_complete:
            on_complete(response.unique_id, response.output, response.status, result_dict)
        final_results[response.unique_id] = result_dict
            
    return final_results
class ImportExportStub (channel, remote_host)
Expand source code
class ImportExportStub:
    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.ImportExportServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def export_to_file(self, file_path, folders=None):
        req = connpy_pb2.ExportRequest(file_path=file_path, folders=folders or [])
        self.stub.export_to_file(req)

    @handle_errors
    def import_from_file(self, file_path):
        with open(file_path, "r") as f:
            content = f.read()
        # Marker to tell the server this is content, not a path
        marker_content = f"---YAML---\n{content}"
        self.stub.import_from_file(connpy_pb2.StringRequest(value=marker_content))

    @handle_errors
    def set_reserved_names(self, names):
        self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names))

Methods

def export_to_file(self, file_path, folders=None)
Expand source code
@handle_errors
def export_to_file(self, file_path, folders=None):
    req = connpy_pb2.ExportRequest(file_path=file_path, folders=folders or [])
    self.stub.export_to_file(req)
def import_from_file(self, file_path)
Expand source code
@handle_errors
def import_from_file(self, file_path):
    with open(file_path, "r") as f:
        content = f.read()
    # Marker to tell the server this is content, not a path
    marker_content = f"---YAML---\n{content}"
    self.stub.import_from_file(connpy_pb2.StringRequest(value=marker_content))
def set_reserved_names(self, names)
Expand source code
@handle_errors
def set_reserved_names(self, names):
    self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names))
class NodeStub (channel, remote_host, config=None)
Expand source code
class NodeStub:
    def __init__(self, channel, remote_host, config=None):
        self.stub = connpy_pb2_grpc.NodeServiceStub(channel)
        self.remote_host = remote_host
        self.config = config

    def _handle_remote_copilot(self, res, request_queue, response_queue, client_buffer_bytes, pause_generator, resume_generator, old_tty):
        import json, asyncio, termios, sys, tty, queue
        from ..core import copilot_terminal_mode
        from . import connpy_pb2

        pause_generator()
        
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
        
        node_info = json.loads(res.copilot_node_info_json) if res.copilot_node_info_json else {}
        blocks = node_info.get("context_blocks", [])

        interface = CopilotInterface(
            self.config, 
            history=getattr(self, 'copilot_history', None),
            session_state=getattr(self, 'copilot_state', None)
        )
        self.copilot_history = interface.history
        self.copilot_state = interface.session_state

        async def on_ai_call_remote(active_buffer, question, chunk_callback, merged_node_info):
            # Send request to server
            request_queue.put(connpy_pb2.InteractRequest(
                copilot_question=question, 
                copilot_context_buffer=active_buffer,
                copilot_node_info_json=json.dumps(merged_node_info)
            ))
            # Wait for chunks from server
            while True:
                try:
                    chunk_res = response_queue.get(timeout=0.1)
                    if chunk_res is None: return {"error": "Server disconnected"}
                    if chunk_res.copilot_stream_chunk:
                        chunk_callback(chunk_res.copilot_stream_chunk)
                    elif chunk_res.copilot_response_json:
                        return json.loads(chunk_res.copilot_response_json)
                except queue.Empty:
                    await asyncio.sleep(0.05)

        # Wrap in async loop
        async def run_remote_copilot():
            while True:
                action, commands, custom_cmd = await interface.run_session(
                    raw_bytes=bytes(client_buffer_bytes),
                    node_info=node_info,
                    on_ai_call=on_ai_call_remote,
                    blocks=blocks
                )
                
                if action == "continue":
                    # Send continue signal to server to loop back for another question
                    request_queue.put(connpy_pb2.InteractRequest(copilot_action="continue"))
                    continue
                
                return action, commands, custom_cmd
        
        with copilot_terminal_mode():
            action, commands, custom_cmd = asyncio.run(run_remote_copilot())
        
        print("\033[2m Returning to session...\033[0m", flush=True)
        # Prepare final action for server
        action_sent = "cancel"
        if action == "send_all" and commands:
            # In remote mode, send the selected commands as a custom block
            # so the server executes exactly what the user picked (e.g., selection '1')
            action_sent = f"custom:{chr(10).join(commands)}"
        elif action == "custom" and custom_cmd:
            action_sent = f"custom:{chr(10).join(custom_cmd)}"
        request_queue.put(connpy_pb2.InteractRequest(copilot_action=action_sent))
        resume_generator()
        tty.setraw(sys.stdin.fileno())

    @handle_errors
    def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
        import sys
        import select
        import tty
        import termios
        import queue
        import os
        import threading
        
        request_queue = queue.Queue()
        client_buffer_bytes = bytearray()
        pause_stdin = [False]
        wake_r, wake_w = os.pipe()

        def pause_generator():
            pause_stdin[0] = True
            os.write(wake_w, b'\x00')

        def resume_generator():
            pause_stdin[0] = False

        def request_generator():
            cols, rows = 80, 24
            try:
                size = os.get_terminal_size()
                cols, rows = size.columns, size.lines
            except OSError:
                pass
                
            yield connpy_pb2.InteractRequest(
                id=unique_id, sftp=sftp, debug=debug, cols=cols, rows=rows
            )
            
            while True:
                try:
                    while True:
                        req = request_queue.get_nowait()
                        if req is None:
                            return
                        yield req
                except queue.Empty:
                    pass

                if pause_stdin[0]:
                    import time
                    time.sleep(0.05)
                    continue

                r, _, _ = select.select([sys.stdin.fileno(), wake_r], [], [], 0.05)
                if wake_r in r:
                    os.read(wake_r, 1)
                    continue
                if sys.stdin.fileno() in r and not pause_stdin[0]:
                    try:
                        data = os.read(sys.stdin.fileno(), 1024)
                        if not data:
                            break
                        yield connpy_pb2.InteractRequest(stdin_data=data)
                    except OSError:
                        break

        # Fetch node details for the connection message
        try:
            node_details = self.get_node_details(unique_id)
            host = node_details.get("host", "unknown")
            port = str(node_details.get("port", ""))
            protocol = "sftp" if sftp else node_details.get("protocol", "ssh")
            port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else ""
            conn_msg = f"Connected to {unique_id} at {host}{port_str} via: {protocol}"
        except Exception:
            conn_msg = f"Connected to {unique_id}"

        old_tty = termios.tcgetattr(sys.stdin)
        try:
            import time
            tty.setraw(sys.stdin.fileno())
            response_iterator = self.stub.interact_node(request_generator())
            
            import queue
            response_queue = queue.Queue()
            
            def response_consumer():
                try:
                    for r in response_iterator:
                        response_queue.put(r)
                except Exception:
                    pass
                response_queue.put(None)
                
            t_consumer = threading.Thread(target=response_consumer, daemon=True)
            t_consumer.start()
            
            # First phase: Wait for connection status, print early data
            try:
                while True:
                    res = response_queue.get()
                    if res is None:
                        return
                    if res.stdout_data:
                        data = res.stdout_data
                        if debug:
                            data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'')
                        os.write(sys.stdout.fileno(), data)
                    
                    if res.success:
                        # Connection established on server, show success message
                        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                        printer.success(conn_msg)
                        pause_stdin[0] = False
                        tty.setraw(sys.stdin.fileno())
                        break
                    
                    if res.error_message:
                        # Connection failed on server
                        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                        printer.error(f"Connection failed: {res.error_message}")
                        return
            except queue.Empty:
                return
            
            # Second phase: Stream active session
            # Clear screen filter is only applied before success (Phase 1).
            # Once the user has a prompt, Ctrl+L must work normally.
            while True:
                res = response_queue.get()
                if res is None:
                    break
                if res.copilot_prompt:
                    self._handle_remote_copilot(
                        res, request_queue, response_queue, 
                        client_buffer_bytes, 
                        pause_generator, resume_generator, old_tty
                    )
                    continue

                if res.stdout_data:
                    os.write(sys.stdout.fileno(), res.stdout_data)
                    client_buffer_bytes.extend(res.stdout_data)
        finally:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
            os.close(wake_r)
            os.close(wake_w)

    @handle_errors
    def connect_dynamic(self, connection_params, debug=False):
        import sys
        import select
        import tty
        import termios
        import queue
        import os
        import json
        
        params_json = json.dumps(connection_params)
        request_queue = queue.Queue()
        client_buffer_bytes = bytearray()
        pause_stdin = [False]
        wake_r, wake_w = os.pipe()

        def pause_generator():
            pause_stdin[0] = True
            os.write(wake_w, b'\x00')

        def resume_generator():
            pause_stdin[0] = False
        
        def request_generator():
            cols, rows = 80, 24
            try:
                size = os.get_terminal_size()
                cols, rows = size.columns, size.lines
            except OSError:
                pass
                
            yield connpy_pb2.InteractRequest(
                id="dynamic", debug=debug, cols=cols, rows=rows,
                connection_params_json=params_json
            )
            
            while True:
                try:
                    while True:
                        req = request_queue.get_nowait()
                        if req is None:
                            return
                        yield req
                except queue.Empty:
                    pass

                if pause_stdin[0]:
                    import time
                    time.sleep(0.05)
                    continue

                r, _, _ = select.select([sys.stdin.fileno(), wake_r], [], [], 0.05)
                if wake_r in r:
                    os.read(wake_r, 1)
                    continue
                if sys.stdin.fileno() in r and not pause_stdin[0]:
                    try:
                        data = os.read(sys.stdin.fileno(), 1024)
                        if not data:
                            break
                        yield connpy_pb2.InteractRequest(stdin_data=data)
                    except OSError:
                        break

        # Prepare connection message
        try:
            node_name = connection_params.get("name", "dynamic@remote")
            host = connection_params.get("host", "dynamic")
            port = str(connection_params.get("port", ""))
            protocol = connection_params.get("protocol", "ssh")
            port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else ""
            conn_msg = f"Connected to {node_name} at {host}{port_str} via: {protocol}"
        except Exception:
            node_name = connection_params.get("name", "dynamic@remote") if isinstance(connection_params, dict) else "dynamic@remote"
            conn_msg = f"Connected to {node_name}"

        old_tty = termios.tcgetattr(sys.stdin)
        try:
            import time
            tty.setraw(sys.stdin.fileno())
            response_iterator = self.stub.interact_node(request_generator())
            
            import queue
            response_queue = queue.Queue()
            
            def response_consumer():
                try:
                    for r in response_iterator:
                        response_queue.put(r)
                except Exception:
                    pass
                response_queue.put(None)
                
            t_consumer = threading.Thread(target=response_consumer, daemon=True)
            t_consumer.start()
            
            # First phase: Wait for connection status, print early data
            try:
                while True:
                    res = response_queue.get()
                    if res is None:
                        return
                    if res.stdout_data:
                        data = res.stdout_data
                        if debug:
                            data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'')
                        os.write(sys.stdout.fileno(), data)
                    
                    if res.success:
                        # Connection established on server, show success message
                        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                        printer.success(conn_msg)
                        pause_stdin[0] = False
                        tty.setraw(sys.stdin.fileno())
                        break
                    
                    if res.error_message:
                        # Connection failed on server
                        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                        printer.error(f"Connection failed: {res.error_message}")
                        return
            except queue.Empty:
                return
                
            # Second phase: Stream active session
            while True:
                res = response_queue.get()
                if res is None:
                    break
                if res.copilot_prompt:
                    self._handle_remote_copilot(
                        res, request_queue, response_queue, 
                        client_buffer_bytes, 
                        pause_generator, resume_generator, old_tty
                    )
                    continue

                if res.stdout_data:
                    os.write(sys.stdout.fileno(), res.stdout_data)
                    client_buffer_bytes.extend(res.stdout_data)
        finally:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
            os.close(wake_r)
            os.close(wake_w)

    @MethodHook
    @handle_errors
    def list_nodes(self, filter_str=None, format_str=None):
        req = connpy_pb2.FilterRequest(filter_str=filter_str or "", format_str=format_str or "")
        return from_value(self.stub.list_nodes(req).data) or []

    @MethodHook
    @handle_errors
    def list_folders(self, filter_str=None):
        req = connpy_pb2.FilterRequest(filter_str=filter_str or "")
        return from_value(self.stub.list_folders(req).data) or []

    @handle_errors
    def get_node_details(self, unique_id):
        return from_struct(self.stub.get_node_details(connpy_pb2.IdRequest(id=unique_id)).data)

    @handle_errors
    def explode_unique(self, unique_id):
        return from_value(self.stub.explode_unique(connpy_pb2.IdRequest(id=unique_id)).data)

    @handle_errors
    def validate_parent_folder(self, unique_id):
        self.stub.validate_parent_folder(connpy_pb2.IdRequest(id=unique_id))

    @handle_errors
    def generate_cache(self, nodes=None, folders=None, profiles=None):
        # 1. Update remote cache on server
        self.stub.generate_cache(Empty())
        
        # 2. Update local fzf/text cache files
        # If no data provided, we fetch it all from remote to sync local files
        if nodes is None and folders is None and profiles is None:
            nodes = self.list_nodes()
            folders = self.list_folders()
            # We don't have direct access to ProfileStub here, but usually 
            # node cache is what matters for fzf. We'll fetch profiles if we can.
            # For now, let's sync what we have.
            
        if nodes is not None or folders is not None or profiles is not None:
            self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)

    def _trigger_local_cache_sync(self):
        """Helper to fetch remote data and update local fzf cache files after a change."""
        try:
            nodes = self.list_nodes()
            folders = self.list_folders()
            self.generate_cache(nodes=nodes, folders=folders)
        except Exception:
            # Failure to sync cache shouldn't break the main operation's success feedback
            pass

    @handle_errors
    def add_node(self, unique_id, data, is_folder=False):
        req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=is_folder)
        self.stub.add_node(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def update_node(self, unique_id, data, save=True):
        req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=False)
        self.stub.update_node(req)
        if save:
            self._trigger_local_cache_sync()

    @handle_errors
    def delete_node(self, unique_id, is_folder=False, save=True):
        req = connpy_pb2.DeleteRequest(id=unique_id, is_folder=is_folder)
        self.stub.delete_node(req)
        if save:
            self._trigger_local_cache_sync()

    @handle_errors
    def move_node(self, src_id, dst_id, copy=False):
        req = connpy_pb2.MoveRequest(src_id=src_id, dst_id=dst_id, copy=copy)
        self.stub.move_node(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def bulk_add(self, ids, hosts, common_data):
        req = connpy_pb2.BulkRequest(ids=ids, hosts=hosts, common_data=to_struct(common_data))
        self.stub.bulk_add(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def set_reserved_names(self, names):
        self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names))
        self._trigger_local_cache_sync()

    @handle_errors
    def full_replace(self, connections, profiles):
        req = connpy_pb2.FullReplaceRequest(
            connections=to_struct(connections),
            profiles=to_struct(profiles)
        )
        self.stub.full_replace(req)
        self._trigger_local_cache_sync()

    @handle_errors
    def get_inventory(self):
        resp = self.stub.get_inventory(Empty())
        return {
            "connections": from_struct(resp.connections),
            "profiles": from_struct(resp.profiles)
        }

Methods

def add_node(self, unique_id, data, is_folder=False)
Expand source code
@handle_errors
def add_node(self, unique_id, data, is_folder=False):
    req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=is_folder)
    self.stub.add_node(req)
    self._trigger_local_cache_sync()
def bulk_add(self, ids, hosts, common_data)
Expand source code
@handle_errors
def bulk_add(self, ids, hosts, common_data):
    req = connpy_pb2.BulkRequest(ids=ids, hosts=hosts, common_data=to_struct(common_data))
    self.stub.bulk_add(req)
    self._trigger_local_cache_sync()
def connect_dynamic(self, connection_params, debug=False)
Expand source code
@handle_errors
def connect_dynamic(self, connection_params, debug=False):
    import sys
    import select
    import tty
    import termios
    import queue
    import os
    import json
    
    params_json = json.dumps(connection_params)
    request_queue = queue.Queue()
    client_buffer_bytes = bytearray()
    pause_stdin = [False]
    wake_r, wake_w = os.pipe()

    def pause_generator():
        pause_stdin[0] = True
        os.write(wake_w, b'\x00')

    def resume_generator():
        pause_stdin[0] = False
    
    def request_generator():
        cols, rows = 80, 24
        try:
            size = os.get_terminal_size()
            cols, rows = size.columns, size.lines
        except OSError:
            pass
            
        yield connpy_pb2.InteractRequest(
            id="dynamic", debug=debug, cols=cols, rows=rows,
            connection_params_json=params_json
        )
        
        while True:
            try:
                while True:
                    req = request_queue.get_nowait()
                    if req is None:
                        return
                    yield req
            except queue.Empty:
                pass

            if pause_stdin[0]:
                import time
                time.sleep(0.05)
                continue

            r, _, _ = select.select([sys.stdin.fileno(), wake_r], [], [], 0.05)
            if wake_r in r:
                os.read(wake_r, 1)
                continue
            if sys.stdin.fileno() in r and not pause_stdin[0]:
                try:
                    data = os.read(sys.stdin.fileno(), 1024)
                    if not data:
                        break
                    yield connpy_pb2.InteractRequest(stdin_data=data)
                except OSError:
                    break

    # Prepare connection message
    try:
        node_name = connection_params.get("name", "dynamic@remote")
        host = connection_params.get("host", "dynamic")
        port = str(connection_params.get("port", ""))
        protocol = connection_params.get("protocol", "ssh")
        port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else ""
        conn_msg = f"Connected to {node_name} at {host}{port_str} via: {protocol}"
    except Exception:
        node_name = connection_params.get("name", "dynamic@remote") if isinstance(connection_params, dict) else "dynamic@remote"
        conn_msg = f"Connected to {node_name}"

    old_tty = termios.tcgetattr(sys.stdin)
    try:
        import time
        tty.setraw(sys.stdin.fileno())
        response_iterator = self.stub.interact_node(request_generator())
        
        import queue
        response_queue = queue.Queue()
        
        def response_consumer():
            try:
                for r in response_iterator:
                    response_queue.put(r)
            except Exception:
                pass
            response_queue.put(None)
            
        t_consumer = threading.Thread(target=response_consumer, daemon=True)
        t_consumer.start()
        
        # First phase: Wait for connection status, print early data
        try:
            while True:
                res = response_queue.get()
                if res is None:
                    return
                if res.stdout_data:
                    data = res.stdout_data
                    if debug:
                        data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'')
                    os.write(sys.stdout.fileno(), data)
                
                if res.success:
                    # Connection established on server, show success message
                    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                    printer.success(conn_msg)
                    pause_stdin[0] = False
                    tty.setraw(sys.stdin.fileno())
                    break
                
                if res.error_message:
                    # Connection failed on server
                    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                    printer.error(f"Connection failed: {res.error_message}")
                    return
        except queue.Empty:
            return
            
        # Second phase: Stream active session
        while True:
            res = response_queue.get()
            if res is None:
                break
            if res.copilot_prompt:
                self._handle_remote_copilot(
                    res, request_queue, response_queue, 
                    client_buffer_bytes, 
                    pause_generator, resume_generator, old_tty
                )
                continue

            if res.stdout_data:
                os.write(sys.stdout.fileno(), res.stdout_data)
                client_buffer_bytes.extend(res.stdout_data)
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
        os.close(wake_r)
        os.close(wake_w)
def connect_node(self, unique_id, sftp=False, debug=False, logger=None)
Expand source code
@handle_errors
def connect_node(self, unique_id, sftp=False, debug=False, logger=None):
    import sys
    import select
    import tty
    import termios
    import queue
    import os
    import threading
    
    request_queue = queue.Queue()
    client_buffer_bytes = bytearray()
    pause_stdin = [False]
    wake_r, wake_w = os.pipe()

    def pause_generator():
        pause_stdin[0] = True
        os.write(wake_w, b'\x00')

    def resume_generator():
        pause_stdin[0] = False

    def request_generator():
        cols, rows = 80, 24
        try:
            size = os.get_terminal_size()
            cols, rows = size.columns, size.lines
        except OSError:
            pass
            
        yield connpy_pb2.InteractRequest(
            id=unique_id, sftp=sftp, debug=debug, cols=cols, rows=rows
        )
        
        while True:
            try:
                while True:
                    req = request_queue.get_nowait()
                    if req is None:
                        return
                    yield req
            except queue.Empty:
                pass

            if pause_stdin[0]:
                import time
                time.sleep(0.05)
                continue

            r, _, _ = select.select([sys.stdin.fileno(), wake_r], [], [], 0.05)
            if wake_r in r:
                os.read(wake_r, 1)
                continue
            if sys.stdin.fileno() in r and not pause_stdin[0]:
                try:
                    data = os.read(sys.stdin.fileno(), 1024)
                    if not data:
                        break
                    yield connpy_pb2.InteractRequest(stdin_data=data)
                except OSError:
                    break

    # Fetch node details for the connection message
    try:
        node_details = self.get_node_details(unique_id)
        host = node_details.get("host", "unknown")
        port = str(node_details.get("port", ""))
        protocol = "sftp" if sftp else node_details.get("protocol", "ssh")
        port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else ""
        conn_msg = f"Connected to {unique_id} at {host}{port_str} via: {protocol}"
    except Exception:
        conn_msg = f"Connected to {unique_id}"

    old_tty = termios.tcgetattr(sys.stdin)
    try:
        import time
        tty.setraw(sys.stdin.fileno())
        response_iterator = self.stub.interact_node(request_generator())
        
        import queue
        response_queue = queue.Queue()
        
        def response_consumer():
            try:
                for r in response_iterator:
                    response_queue.put(r)
            except Exception:
                pass
            response_queue.put(None)
            
        t_consumer = threading.Thread(target=response_consumer, daemon=True)
        t_consumer.start()
        
        # First phase: Wait for connection status, print early data
        try:
            while True:
                res = response_queue.get()
                if res is None:
                    return
                if res.stdout_data:
                    data = res.stdout_data
                    if debug:
                        data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'')
                    os.write(sys.stdout.fileno(), data)
                
                if res.success:
                    # Connection established on server, show success message
                    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                    printer.success(conn_msg)
                    pause_stdin[0] = False
                    tty.setraw(sys.stdin.fileno())
                    break
                
                if res.error_message:
                    # Connection failed on server
                    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
                    printer.error(f"Connection failed: {res.error_message}")
                    return
        except queue.Empty:
            return
        
        # Second phase: Stream active session
        # Clear screen filter is only applied before success (Phase 1).
        # Once the user has a prompt, Ctrl+L must work normally.
        while True:
            res = response_queue.get()
            if res is None:
                break
            if res.copilot_prompt:
                self._handle_remote_copilot(
                    res, request_queue, response_queue, 
                    client_buffer_bytes, 
                    pause_generator, resume_generator, old_tty
                )
                continue

            if res.stdout_data:
                os.write(sys.stdout.fileno(), res.stdout_data)
                client_buffer_bytes.extend(res.stdout_data)
    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
        os.close(wake_r)
        os.close(wake_w)
def delete_node(self, unique_id, is_folder=False, save=True)
Expand source code
@handle_errors
def delete_node(self, unique_id, is_folder=False, save=True):
    req = connpy_pb2.DeleteRequest(id=unique_id, is_folder=is_folder)
    self.stub.delete_node(req)
    if save:
        self._trigger_local_cache_sync()
def explode_unique(self, unique_id)
Expand source code
@handle_errors
def explode_unique(self, unique_id):
    return from_value(self.stub.explode_unique(connpy_pb2.IdRequest(id=unique_id)).data)
def full_replace(self, connections, profiles)
Expand source code
@handle_errors
def full_replace(self, connections, profiles):
    req = connpy_pb2.FullReplaceRequest(
        connections=to_struct(connections),
        profiles=to_struct(profiles)
    )
    self.stub.full_replace(req)
    self._trigger_local_cache_sync()
def generate_cache(self, nodes=None, folders=None, profiles=None)
Expand source code
@handle_errors
def generate_cache(self, nodes=None, folders=None, profiles=None):
    # 1. Update remote cache on server
    self.stub.generate_cache(Empty())
    
    # 2. Update local fzf/text cache files
    # If no data provided, we fetch it all from remote to sync local files
    if nodes is None and folders is None and profiles is None:
        nodes = self.list_nodes()
        folders = self.list_folders()
        # We don't have direct access to ProfileStub here, but usually 
        # node cache is what matters for fzf. We'll fetch profiles if we can.
        # For now, let's sync what we have.
        
    if nodes is not None or folders is not None or profiles is not None:
        self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)
def get_inventory(self)
Expand source code
@handle_errors
def get_inventory(self):
    resp = self.stub.get_inventory(Empty())
    return {
        "connections": from_struct(resp.connections),
        "profiles": from_struct(resp.profiles)
    }
def get_node_details(self, unique_id)
Expand source code
@handle_errors
def get_node_details(self, unique_id):
    return from_struct(self.stub.get_node_details(connpy_pb2.IdRequest(id=unique_id)).data)
def list_folders(self, filter_str=None)
Expand source code
@MethodHook
@handle_errors
def list_folders(self, filter_str=None):
    req = connpy_pb2.FilterRequest(filter_str=filter_str or "")
    return from_value(self.stub.list_folders(req).data) or []
def list_nodes(self, filter_str=None, format_str=None)
Expand source code
@MethodHook
@handle_errors
def list_nodes(self, filter_str=None, format_str=None):
    req = connpy_pb2.FilterRequest(filter_str=filter_str or "", format_str=format_str or "")
    return from_value(self.stub.list_nodes(req).data) or []
def move_node(self, src_id, dst_id, copy=False)
Expand source code
@handle_errors
def move_node(self, src_id, dst_id, copy=False):
    req = connpy_pb2.MoveRequest(src_id=src_id, dst_id=dst_id, copy=copy)
    self.stub.move_node(req)
    self._trigger_local_cache_sync()
def set_reserved_names(self, names)
Expand source code
@handle_errors
def set_reserved_names(self, names):
    self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names))
    self._trigger_local_cache_sync()
def update_node(self, unique_id, data, save=True)
Expand source code
@handle_errors
def update_node(self, unique_id, data, save=True):
    req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=False)
    self.stub.update_node(req)
    if save:
        self._trigger_local_cache_sync()
def validate_parent_folder(self, unique_id)
Expand source code
@handle_errors
def validate_parent_folder(self, unique_id):
    self.stub.validate_parent_folder(connpy_pb2.IdRequest(id=unique_id))
class PluginStub (channel, remote_host)
Expand source code
class PluginStub:
    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.PluginServiceStub(channel)
        self.remote_stub = remote_plugin_pb2_grpc.RemotePluginServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def list_plugins(self):
        return from_value(self.stub.list_plugins(Empty()).data)

    @handle_errors
    def add_plugin(self, name, source_file, update=False):
        # Read the local file content to send it to the server
        with open(source_file, "r") as f:
            content = f.read()
        
        # Use source_file as a marker for "content-inside"
        marker_content = f"---CONTENT---\n{content}"
        req = connpy_pb2.PluginRequest(name=name, source_file=marker_content, update=update)
        self.stub.add_plugin(req)

    @handle_errors
    def delete_plugin(self, name):
        self.stub.delete_plugin(connpy_pb2.IdRequest(id=name))

    @handle_errors
    def enable_plugin(self, name):
        self.stub.enable_plugin(connpy_pb2.IdRequest(id=name))

    @handle_errors
    def disable_plugin(self, name):
        self.stub.disable_plugin(connpy_pb2.IdRequest(id=name))

    @handle_errors
    def get_plugin_source(self, name):
        resp = self.remote_stub.get_plugin_source(remote_plugin_pb2.IdRequest(id=name))
        return resp.value

    @handle_errors
    def invoke_plugin(self, name, args_namespace):
        import json
        args_dict = {k: v for k, v in vars(args_namespace).items()
                     if isinstance(v, (str, int, float, bool, list, type(None)))}
        if hasattr(args_namespace, "func") and hasattr(args_namespace.func, "__name__"):
            args_dict["__func_name__"] = args_namespace.func.__name__
            
        req = remote_plugin_pb2.PluginInvokeRequest(name=name, args_json=json.dumps(args_dict))
        for chunk in self.remote_stub.invoke_plugin(req):
            yield chunk.text

Methods

def add_plugin(self, name, source_file, update=False)
Expand source code
@handle_errors
def add_plugin(self, name, source_file, update=False):
    # Read the local file content to send it to the server
    with open(source_file, "r") as f:
        content = f.read()
    
    # Use source_file as a marker for "content-inside"
    marker_content = f"---CONTENT---\n{content}"
    req = connpy_pb2.PluginRequest(name=name, source_file=marker_content, update=update)
    self.stub.add_plugin(req)
def delete_plugin(self, name)
Expand source code
@handle_errors
def delete_plugin(self, name):
    self.stub.delete_plugin(connpy_pb2.IdRequest(id=name))
def disable_plugin(self, name)
Expand source code
@handle_errors
def disable_plugin(self, name):
    self.stub.disable_plugin(connpy_pb2.IdRequest(id=name))
def enable_plugin(self, name)
Expand source code
@handle_errors
def enable_plugin(self, name):
    self.stub.enable_plugin(connpy_pb2.IdRequest(id=name))
def get_plugin_source(self, name)
Expand source code
@handle_errors
def get_plugin_source(self, name):
    resp = self.remote_stub.get_plugin_source(remote_plugin_pb2.IdRequest(id=name))
    return resp.value
def invoke_plugin(self, name, args_namespace)
Expand source code
@handle_errors
def invoke_plugin(self, name, args_namespace):
    import json
    args_dict = {k: v for k, v in vars(args_namespace).items()
                 if isinstance(v, (str, int, float, bool, list, type(None)))}
    if hasattr(args_namespace, "func") and hasattr(args_namespace.func, "__name__"):
        args_dict["__func_name__"] = args_namespace.func.__name__
        
    req = remote_plugin_pb2.PluginInvokeRequest(name=name, args_json=json.dumps(args_dict))
    for chunk in self.remote_stub.invoke_plugin(req):
        yield chunk.text
def list_plugins(self)
Expand source code
@handle_errors
def list_plugins(self):
    return from_value(self.stub.list_plugins(Empty()).data)
class ProfileStub (channel, remote_host, node_stub=None)
Expand source code
class ProfileStub:
    def __init__(self, channel, remote_host, node_stub=None):
        self.stub = connpy_pb2_grpc.ProfileServiceStub(channel)
        self.remote_host = remote_host
        self.node_stub = node_stub

    @handle_errors
    def list_profiles(self, filter_str=None):
        req = connpy_pb2.FilterRequest(filter_str=filter_str or "")
        return from_value(self.stub.list_profiles(req).data) or []

    @handle_errors
    def get_profile(self, name, resolve=True):
        req = connpy_pb2.ProfileRequest(name=name, resolve=resolve)
        return from_struct(self.stub.get_profile(req).data)

    @handle_errors
    def add_profile(self, name, data):
        req = connpy_pb2.NodeRequest(id=name, data=to_struct(data))
        self.stub.add_profile(req)
        if self.node_stub:
            self.node_stub._trigger_local_cache_sync()

    @handle_errors
    def resolve_node_data(self, node_data):
        req = connpy_pb2.StructRequest(data=to_struct(node_data))
        return from_struct(self.stub.resolve_node_data(req).data)

    @handle_errors
    def delete_profile(self, name):
        req = connpy_pb2.IdRequest(id=name)
        self.stub.delete_profile(req)
        if self.node_stub:
            self.node_stub._trigger_local_cache_sync()

    @handle_errors
    def update_profile(self, name, data):
        req = connpy_pb2.NodeRequest(id=name, data=to_struct(data))
        self.stub.update_profile(req)
        if self.node_stub:
            self.node_stub._trigger_local_cache_sync()

Methods

def add_profile(self, name, data)
Expand source code
@handle_errors
def add_profile(self, name, data):
    req = connpy_pb2.NodeRequest(id=name, data=to_struct(data))
    self.stub.add_profile(req)
    if self.node_stub:
        self.node_stub._trigger_local_cache_sync()
def delete_profile(self, name)
Expand source code
@handle_errors
def delete_profile(self, name):
    req = connpy_pb2.IdRequest(id=name)
    self.stub.delete_profile(req)
    if self.node_stub:
        self.node_stub._trigger_local_cache_sync()
def get_profile(self, name, resolve=True)
Expand source code
@handle_errors
def get_profile(self, name, resolve=True):
    req = connpy_pb2.ProfileRequest(name=name, resolve=resolve)
    return from_struct(self.stub.get_profile(req).data)
def list_profiles(self, filter_str=None)
Expand source code
@handle_errors
def list_profiles(self, filter_str=None):
    req = connpy_pb2.FilterRequest(filter_str=filter_str or "")
    return from_value(self.stub.list_profiles(req).data) or []
def resolve_node_data(self, node_data)
Expand source code
@handle_errors
def resolve_node_data(self, node_data):
    req = connpy_pb2.StructRequest(data=to_struct(node_data))
    return from_struct(self.stub.resolve_node_data(req).data)
def update_profile(self, name, data)
Expand source code
@handle_errors
def update_profile(self, name, data):
    req = connpy_pb2.NodeRequest(id=name, data=to_struct(data))
    self.stub.update_profile(req)
    if self.node_stub:
        self.node_stub._trigger_local_cache_sync()
class SystemStub (channel, remote_host)
Expand source code
class SystemStub:
    def __init__(self, channel, remote_host):
        self.stub = connpy_pb2_grpc.SystemServiceStub(channel)
        self.remote_host = remote_host

    @handle_errors
    def start_api(self, port=None):
        self.stub.start_api(connpy_pb2.IntRequest(value=port or 8048))

    @handle_errors
    def debug_api(self, port=None):
        self.stub.debug_api(connpy_pb2.IntRequest(value=port or 8048))

    @handle_errors
    def stop_api(self):
        self.stub.stop_api(Empty())

    @handle_errors
    def restart_api(self, port=None):
        self.stub.restart_api(connpy_pb2.IntRequest(value=port or 8048))

    @handle_errors
    def get_api_status(self):
        return self.stub.get_api_status(Empty()).value

Methods

def debug_api(self, port=None)
Expand source code
@handle_errors
def debug_api(self, port=None):
    self.stub.debug_api(connpy_pb2.IntRequest(value=port or 8048))
def get_api_status(self)
Expand source code
@handle_errors
def get_api_status(self):
    return self.stub.get_api_status(Empty()).value
def restart_api(self, port=None)
Expand source code
@handle_errors
def restart_api(self, port=None):
    self.stub.restart_api(connpy_pb2.IntRequest(value=port or 8048))
def start_api(self, port=None)
Expand source code
@handle_errors
def start_api(self, port=None):
    self.stub.start_api(connpy_pb2.IntRequest(value=port or 8048))
def stop_api(self)
Expand source code
@handle_errors
def stop_api(self):
    self.stub.stop_api(Empty())