Module connpy.grpc_layer.stubs
Functions
def handle_errors(func)-
Expand source code
def handle_errors(func): @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except grpc.RpcError as e: # Re-raise gRPC errors as native ConnpyError to keep CLI handlers agnostic details = e.details() # Identify the host if available on the instance instance = args[0] if args else None host = getattr(instance, "remote_host", "remote host") # Make common gRPC errors more readable if "failed to connect to all addresses" in details: simplified = f"Failed to connect to remote host at {host} (Connection refused)" elif "Method not found" in details: simplified = f"Remote server at {host} is using an incompatible version" elif "Deadline Exceeded" in details: simplified = f"Request to {host} timed out" else: simplified = details raise ConnpyError(simplified) return wrapper
Classes
class AIStub (channel, remote_host)-
Expand source code
class AIStub: def __init__(self, channel, remote_host): self.stub = connpy_pb2_grpc.AIServiceStub(channel) self.remote_host = remote_host @handle_errors def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debug=False, status=None, **overrides): import queue from rich.prompt import Prompt from rich.text import Text from rich.panel import Panel from rich.markdown import Markdown req_queue = queue.Queue() initial_req = connpy_pb2.AskRequest( input_text=input_text, dryrun=dryrun, session_id=session_id or "", debug=debug, engineer_model=overrides.get("engineer_model", ""), engineer_api_key=overrides.get("engineer_api_key", ""), architect_model=overrides.get("architect_model", ""), architect_api_key=overrides.get("architect_api_key", ""), trust=overrides.get("trust", False) ) if chat_history is not None: initial_req.chat_history.CopyFrom(to_value(chat_history)) if "engineer_auth" in overrides and overrides["engineer_auth"]: initial_req.engineer_auth.CopyFrom(to_struct(overrides["engineer_auth"])) if "architect_auth" in overrides and overrides["architect_auth"]: initial_req.architect_auth.CopyFrom(to_struct(overrides["architect_auth"])) req_queue.put(initial_req) def request_generator(): while True: req = req_queue.get() if req is None: break yield req responses = self.stub.ask(request_generator()) full_content = "" header_printed = False current_responder = "engineer" final_result = {"response": "", "chat_history": []} # Background thread to pull responses from gRPC into a local queue # This prevents KeyboardInterrupt from corrupting the gRPC iterator state response_queue = queue.Queue() def pull_responses(): try: for response in responses: response_queue.put(("data", response)) except Exception as e: response_queue.put(("error", e)) finally: response_queue.put((None, None)) threading.Thread(target=pull_responses, daemon=True).start() try: while True: try: # BLOCKING GET from local queue (interruptible by signal) msg_type, response = response_queue.get() except KeyboardInterrupt: # Signal interruption to the server if status: status.update("[error]Interrupted! Closing pending tasks...") # Send the interrupt signal to the server req_queue.put(connpy_pb2.AskRequest(interrupt=True)) # CONTINUE the loop to receive remaining data and summary from the queue continue if msg_type is None: # Sentinel break if msg_type == "error": # Re-raise or handle gRPC error from background thread if isinstance(response, grpc.RpcError): raise response printer.warning(f"Stream interrupted: {response}") break if response.status_update: if response.status_update.startswith("__RESPONDER__:"): current_responder = response.status_update.split(":")[1].lower() continue if response.requires_confirmation: if status: status.stop() # Show prompt and wait for answer prompt_text = Text.from_ansi(response.status_update) ans = Prompt.ask(prompt_text) if status: status.update("[ai_status]Agent: Resuming...") status.start() req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans)) continue if status: status.update(response.status_update) continue if response.debug_message: if debug: if status: try: status.stop() except: pass printer.console.print(Text.from_ansi(response.debug_message)) if status: try: status.start() except: pass continue if response.important_message: if status: try: status.stop() except: pass printer.console.print(Text.from_ansi(response.important_message)) if status: try: status.start() except: pass continue if not response.is_final: if response.text_chunk: if not header_printed: if status: try: status.stop() except: pass from rich.console import Console as RichConsole from rich.rule import Rule from ..printer import connpy_theme, get_original_stdout, IncrementalMarkdownParser stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout()) # Print header on first chunk alias = "architect" if current_responder == "architect" else "engineer" role_label = "Network Architect" if current_responder == "architect" else "Network Engineer" stable_console.print(Rule(f"[bold {alias}]{role_label}[/bold {alias}]", style=alias)) header_printed = True # Initialize parser md_parser = IncrementalMarkdownParser(console=stable_console) full_content += response.text_chunk md_parser.feed(response.text_chunk) continue if response.is_final: if header_printed: from rich.rule import Rule md_parser.flush() if status: try: status.stop() except: pass final_result = from_struct(response.full_result) responder = final_result.get("responder", "engineer") alias = "architect" if responder == "architect" else "engineer" role_label = "Network Architect" if responder == "architect" else "Network Engineer" title = f"[bold {alias}]{role_label}[/bold {alias}]" if header_printed: from rich.console import Console as RichConsole from ..printer import connpy_theme, get_original_stdout stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout()) stable_console.print(Rule(style=alias)) break except Exception as e: # Check if it was a gRPC error that we should let handle_errors catch if isinstance(e, grpc.RpcError): raise printer.warning(f"Stream interrupted: {e}") finally: req_queue.put(None) if full_content: final_result["streamed"] = True return final_result @handle_errors def confirm(self, input_text, console=None): return self.stub.confirm(connpy_pb2.StringRequest(value=input_text)).value @handle_errors def list_sessions(self, limit=None): from .utils import from_value res = self.stub.list_sessions(Empty()) sessions = from_value(res.data) or [] if limit and len(sessions) > limit: return sessions[:limit], len(sessions) return sessions, len(sessions) @handle_errors def delete_session(self, session_id): self.stub.delete_session(connpy_pb2.StringRequest(value=session_id)) @handle_errors def configure_provider(self, provider, model=None, api_key=None, auth=None): req = connpy_pb2.ProviderRequest(provider=provider, model=model or "", api_key=api_key or "") if auth: req.auth.CopyFrom(to_struct(auth)) self.stub.configure_provider(req) @handle_errors def configure_mcp(self, name, url=None, enabled=True, auto_load_on_os=None, remove=False): req = connpy_pb2.MCPRequest( name=name, url=url or "", enabled=enabled, auto_load_on_os=auto_load_on_os or "", remove=remove ) self.stub.configure_mcp(req) @handle_errors def list_mcp_servers(self): res = self.stub.list_mcp_servers(Empty()) return from_value(res.data) or {} @handle_errors def load_session_data(self, session_id): return from_struct(self.stub.load_session_data(connpy_pb2.StringRequest(value=session_id)).data)Methods
def ask(self,
input_text,
dryrun=False,
chat_history=None,
session_id=None,
debug=False,
status=None,
**overrides)-
Expand source code
@handle_errors def ask(self, input_text, dryrun=False, chat_history=None, session_id=None, debug=False, status=None, **overrides): import queue from rich.prompt import Prompt from rich.text import Text from rich.panel import Panel from rich.markdown import Markdown req_queue = queue.Queue() initial_req = connpy_pb2.AskRequest( input_text=input_text, dryrun=dryrun, session_id=session_id or "", debug=debug, engineer_model=overrides.get("engineer_model", ""), engineer_api_key=overrides.get("engineer_api_key", ""), architect_model=overrides.get("architect_model", ""), architect_api_key=overrides.get("architect_api_key", ""), trust=overrides.get("trust", False) ) if chat_history is not None: initial_req.chat_history.CopyFrom(to_value(chat_history)) if "engineer_auth" in overrides and overrides["engineer_auth"]: initial_req.engineer_auth.CopyFrom(to_struct(overrides["engineer_auth"])) if "architect_auth" in overrides and overrides["architect_auth"]: initial_req.architect_auth.CopyFrom(to_struct(overrides["architect_auth"])) req_queue.put(initial_req) def request_generator(): while True: req = req_queue.get() if req is None: break yield req responses = self.stub.ask(request_generator()) full_content = "" header_printed = False current_responder = "engineer" final_result = {"response": "", "chat_history": []} # Background thread to pull responses from gRPC into a local queue # This prevents KeyboardInterrupt from corrupting the gRPC iterator state response_queue = queue.Queue() def pull_responses(): try: for response in responses: response_queue.put(("data", response)) except Exception as e: response_queue.put(("error", e)) finally: response_queue.put((None, None)) threading.Thread(target=pull_responses, daemon=True).start() try: while True: try: # BLOCKING GET from local queue (interruptible by signal) msg_type, response = response_queue.get() except KeyboardInterrupt: # Signal interruption to the server if status: status.update("[error]Interrupted! Closing pending tasks...") # Send the interrupt signal to the server req_queue.put(connpy_pb2.AskRequest(interrupt=True)) # CONTINUE the loop to receive remaining data and summary from the queue continue if msg_type is None: # Sentinel break if msg_type == "error": # Re-raise or handle gRPC error from background thread if isinstance(response, grpc.RpcError): raise response printer.warning(f"Stream interrupted: {response}") break if response.status_update: if response.status_update.startswith("__RESPONDER__:"): current_responder = response.status_update.split(":")[1].lower() continue if response.requires_confirmation: if status: status.stop() # Show prompt and wait for answer prompt_text = Text.from_ansi(response.status_update) ans = Prompt.ask(prompt_text) if status: status.update("[ai_status]Agent: Resuming...") status.start() req_queue.put(connpy_pb2.AskRequest(confirmation_answer=ans)) continue if status: status.update(response.status_update) continue if response.debug_message: if debug: if status: try: status.stop() except: pass printer.console.print(Text.from_ansi(response.debug_message)) if status: try: status.start() except: pass continue if response.important_message: if status: try: status.stop() except: pass printer.console.print(Text.from_ansi(response.important_message)) if status: try: status.start() except: pass continue if not response.is_final: if response.text_chunk: if not header_printed: if status: try: status.stop() except: pass from rich.console import Console as RichConsole from rich.rule import Rule from ..printer import connpy_theme, get_original_stdout, IncrementalMarkdownParser stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout()) # Print header on first chunk alias = "architect" if current_responder == "architect" else "engineer" role_label = "Network Architect" if current_responder == "architect" else "Network Engineer" stable_console.print(Rule(f"[bold {alias}]{role_label}[/bold {alias}]", style=alias)) header_printed = True # Initialize parser md_parser = IncrementalMarkdownParser(console=stable_console) full_content += response.text_chunk md_parser.feed(response.text_chunk) continue if response.is_final: if header_printed: from rich.rule import Rule md_parser.flush() if status: try: status.stop() except: pass final_result = from_struct(response.full_result) responder = final_result.get("responder", "engineer") alias = "architect" if responder == "architect" else "engineer" role_label = "Network Architect" if responder == "architect" else "Network Engineer" title = f"[bold {alias}]{role_label}[/bold {alias}]" if header_printed: from rich.console import Console as RichConsole from ..printer import connpy_theme, get_original_stdout stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout()) stable_console.print(Rule(style=alias)) break except Exception as e: # Check if it was a gRPC error that we should let handle_errors catch if isinstance(e, grpc.RpcError): raise printer.warning(f"Stream interrupted: {e}") finally: req_queue.put(None) if full_content: final_result["streamed"] = True return final_result def configure_mcp(self, name, url=None, enabled=True, auto_load_on_os=None, remove=False)-
Expand source code
@handle_errors def configure_mcp(self, name, url=None, enabled=True, auto_load_on_os=None, remove=False): req = connpy_pb2.MCPRequest( name=name, url=url or "", enabled=enabled, auto_load_on_os=auto_load_on_os or "", remove=remove ) self.stub.configure_mcp(req) def configure_provider(self, provider, model=None, api_key=None, auth=None)-
Expand source code
@handle_errors def configure_provider(self, provider, model=None, api_key=None, auth=None): req = connpy_pb2.ProviderRequest(provider=provider, model=model or "", api_key=api_key or "") if auth: req.auth.CopyFrom(to_struct(auth)) self.stub.configure_provider(req) def confirm(self, input_text, console=None)-
Expand source code
@handle_errors def confirm(self, input_text, console=None): return self.stub.confirm(connpy_pb2.StringRequest(value=input_text)).value def delete_session(self, session_id)-
Expand source code
@handle_errors def delete_session(self, session_id): self.stub.delete_session(connpy_pb2.StringRequest(value=session_id)) def list_mcp_servers(self)-
Expand source code
@handle_errors def list_mcp_servers(self): res = self.stub.list_mcp_servers(Empty()) return from_value(res.data) or {} def list_sessions(self, limit=None)-
Expand source code
@handle_errors def list_sessions(self, limit=None): from .utils import from_value res = self.stub.list_sessions(Empty()) sessions = from_value(res.data) or [] if limit and len(sessions) > limit: return sessions[:limit], len(sessions) return sessions, len(sessions) def load_session_data(self, session_id)-
Expand source code
@handle_errors def load_session_data(self, session_id): return from_struct(self.stub.load_session_data(connpy_pb2.StringRequest(value=session_id)).data)
class AuthClientInterceptor (token_provider)-
Expand source code
class AuthClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor, grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor): def __init__(self, token_provider): self.token_provider = token_provider def _add_metadata(self, client_call_details): token = self.token_provider() if not token: return client_call_details metadata = [] if client_call_details.metadata: metadata = list(client_call_details.metadata) # Check if already present to avoid duplicates if not any(k.lower() == "authorization" for k, v in metadata): metadata.append(("authorization", f"Bearer {token}")) return _ClientCallDetails( method=client_call_details.method, timeout=client_call_details.timeout, metadata=metadata, credentials=client_call_details.credentials, wait_for_ready=client_call_details.wait_for_ready, compression=client_call_details.compression, ) def intercept_unary_unary(self, continuation, client_call_details, request): new_details = self._add_metadata(client_call_details) return continuation(new_details, request) def intercept_unary_stream(self, continuation, client_call_details, request): new_details = self._add_metadata(client_call_details) return continuation(new_details, request) def intercept_stream_unary(self, continuation, client_call_details, request_iterator): new_details = self._add_metadata(client_call_details) return continuation(new_details, request_iterator) def intercept_stream_stream(self, continuation, client_call_details, request_iterator): new_details = self._add_metadata(client_call_details) return continuation(new_details, request_iterator)Affords intercepting unary-unary invocations.
Ancestors
- grpc.UnaryUnaryClientInterceptor
- grpc.UnaryStreamClientInterceptor
- grpc.StreamUnaryClientInterceptor
- grpc.StreamStreamClientInterceptor
- abc.ABC
Methods
def intercept_stream_stream(self, continuation, client_call_details, request_iterator)-
Expand source code
def intercept_stream_stream(self, continuation, client_call_details, request_iterator): new_details = self._add_metadata(client_call_details) return continuation(new_details, request_iterator)Intercepts a stream-stream invocation.
Args
continuation- A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
response_iterator = continuation(client_call_details, request_iterator)to continue with the RPC.continuationreturns an object that is both a Call for the RPC and an iterator for response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. client_call_details- A ClientCallDetails object describing the outgoing RPC.
request_iterator- An iterator that yields request values for the RPC.
Returns
An object that is both a Call for the RPC and an iterator of response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. This object should also fulfill the Future interface, though it may not.
def intercept_stream_unary(self, continuation, client_call_details, request_iterator)-
Expand source code
def intercept_stream_unary(self, continuation, client_call_details, request_iterator): new_details = self._add_metadata(client_call_details) return continuation(new_details, request_iterator)Intercepts a stream-unary invocation asynchronously.
Args
continuation- A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
response_future = continuation(client_call_details, request_iterator)to continue with the RPC.continuationreturns an object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError. client_call_details- A ClientCallDetails object describing the outgoing RPC.
request_iterator- An iterator that yields request values for the RPC.
Returns
An object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.
def intercept_unary_stream(self, continuation, client_call_details, request)-
Expand source code
def intercept_unary_stream(self, continuation, client_call_details, request): new_details = self._add_metadata(client_call_details) return continuation(new_details, request)Intercepts a unary-stream invocation.
Args
continuation- A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
response_iterator = continuation(client_call_details, request)to continue with the RPC.continuationreturns an object that is both a Call for the RPC and an iterator for response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. client_call_details- A ClientCallDetails object describing the outgoing RPC.
request- The request value for the RPC.
Returns
An object that is both a Call for the RPC and an iterator of response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. This object should also fulfill the Future interface, though it may not.
def intercept_unary_unary(self, continuation, client_call_details, request)-
Expand source code
def intercept_unary_unary(self, continuation, client_call_details, request): new_details = self._add_metadata(client_call_details) return continuation(new_details, request)Intercepts a unary-unary invocation asynchronously.
Args
continuation- A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
response_future = continuation(client_call_details, request)to continue with the RPC.continuationreturns an object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError. client_call_details- A ClientCallDetails object describing the outgoing RPC.
request- The request value for the RPC.
Returns
An object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.
class AuthStub (channel, remote_host)-
Expand source code
class AuthStub: def __init__(self, channel, remote_host): self.stub = connpy_pb2_grpc.AuthServiceStub(channel) self.remote_host = remote_host @handle_errors def login(self, username, password): req = connpy_pb2.LoginRequest(username=username, password=password) resp = self.stub.login(req) return { "token": resp.token, "username": resp.username, "expires_at": resp.expires_at } @handle_errors def change_password(self, old_password, new_password): req = connpy_pb2.ChangePasswordRequest(old_password=old_password, new_password=new_password) self.stub.change_password(req)Methods
def change_password(self, old_password, new_password)-
Expand source code
@handle_errors def change_password(self, old_password, new_password): req = connpy_pb2.ChangePasswordRequest(old_password=old_password, new_password=new_password) self.stub.change_password(req) def login(self, username, password)-
Expand source code
@handle_errors def login(self, username, password): req = connpy_pb2.LoginRequest(username=username, password=password) resp = self.stub.login(req) return { "token": resp.token, "username": resp.username, "expires_at": resp.expires_at }
class ConfigStub (channel, remote_host)-
Expand source code
class ConfigStub: def __init__(self, channel, remote_host): self.stub = connpy_pb2_grpc.ConfigServiceStub(channel) self.remote_host = remote_host @handle_errors def get_settings(self): return from_struct(self.stub.get_settings(Empty()).data) @handle_errors def update_setting(self, key, value): self.stub.update_setting(connpy_pb2.UpdateRequest(key=key, value=to_value(value))) @handle_errors def get_default_dir(self): return self.stub.get_default_dir(Empty()).value @handle_errors def set_config_folder(self, folder): self.stub.set_config_folder(connpy_pb2.StringRequest(value=folder)) @handle_errors def encrypt_password(self, password): return self.stub.encrypt_password(connpy_pb2.StringRequest(value=password)).valueMethods
def encrypt_password(self, password)-
Expand source code
@handle_errors def encrypt_password(self, password): return self.stub.encrypt_password(connpy_pb2.StringRequest(value=password)).value def get_default_dir(self)-
Expand source code
@handle_errors def get_default_dir(self): return self.stub.get_default_dir(Empty()).value def get_settings(self)-
Expand source code
@handle_errors def get_settings(self): return from_struct(self.stub.get_settings(Empty()).data) def set_config_folder(self, folder)-
Expand source code
@handle_errors def set_config_folder(self, folder): self.stub.set_config_folder(connpy_pb2.StringRequest(value=folder)) def update_setting(self, key, value)-
Expand source code
@handle_errors def update_setting(self, key, value): self.stub.update_setting(connpy_pb2.UpdateRequest(key=key, value=to_value(value)))
class ExecutionStub (channel, remote_host)-
Expand source code
class ExecutionStub: def __init__(self, channel, remote_host): self.stub = connpy_pb2_grpc.ExecutionServiceStub(channel) self.remote_host = remote_host @handle_errors def run_commands(self, nodes_filter, commands, variables=None, parallel=10, timeout=10, folder=None, prompt=None, **kwargs): nodes_list = [nodes_filter] if isinstance(nodes_filter, str) else list(nodes_filter) req = connpy_pb2.RunRequest( nodes=nodes_list, commands=commands, folder=folder or "", prompt=prompt or "", parallel=parallel, timeout=timeout, name=kwargs.get("name", "") ) if variables is not None: req.vars.CopyFrom(to_struct(variables)) final_results = {} on_complete = kwargs.get("on_node_complete") for response in self.stub.run_commands(req): if on_complete: on_complete(response.unique_id, response.output, response.status) final_results[response.unique_id] = { "output": response.output, "status": response.status } return final_results @handle_errors def test_commands(self, nodes_filter, commands, expected, variables=None, parallel=10, timeout=10, prompt=None, **kwargs): nodes_list = [nodes_filter] if isinstance(nodes_filter, str) else list(nodes_filter) req = connpy_pb2.TestRequest( nodes=nodes_list, commands=commands, expected=expected if isinstance(expected, list) else [expected], folder=kwargs.get("folder", ""), prompt=prompt or "", parallel=parallel, timeout=timeout, name=kwargs.get("name", "") ) if variables is not None: req.vars.CopyFrom(to_struct(variables)) final_results = {} on_complete = kwargs.get("on_node_complete") for response in self.stub.test_commands(req): result_dict = from_struct(response.test_result) if response.HasField("test_result") else {} if on_complete: on_complete(response.unique_id, response.output, response.status, result_dict) final_results[response.unique_id] = result_dict return final_results @handle_errors def run_cli_script(self, nodes_filter, script_path, parallel=10): req = connpy_pb2.ScriptRequest(param1=nodes_filter, param2=script_path, parallel=parallel) return from_struct(self.stub.run_cli_script(req).data) @handle_errors def run_yaml_playbook(self, playbook_path, parallel=10): req = connpy_pb2.ScriptRequest(param1=playbook_path, parallel=parallel) return from_struct(self.stub.run_yaml_playbook(req).data)Methods
def run_cli_script(self, nodes_filter, script_path, parallel=10)-
Expand source code
@handle_errors def run_cli_script(self, nodes_filter, script_path, parallel=10): req = connpy_pb2.ScriptRequest(param1=nodes_filter, param2=script_path, parallel=parallel) return from_struct(self.stub.run_cli_script(req).data) def run_commands(self,
nodes_filter,
commands,
variables=None,
parallel=10,
timeout=10,
folder=None,
prompt=None,
**kwargs)-
Expand source code
@handle_errors def run_commands(self, nodes_filter, commands, variables=None, parallel=10, timeout=10, folder=None, prompt=None, **kwargs): nodes_list = [nodes_filter] if isinstance(nodes_filter, str) else list(nodes_filter) req = connpy_pb2.RunRequest( nodes=nodes_list, commands=commands, folder=folder or "", prompt=prompt or "", parallel=parallel, timeout=timeout, name=kwargs.get("name", "") ) if variables is not None: req.vars.CopyFrom(to_struct(variables)) final_results = {} on_complete = kwargs.get("on_node_complete") for response in self.stub.run_commands(req): if on_complete: on_complete(response.unique_id, response.output, response.status) final_results[response.unique_id] = { "output": response.output, "status": response.status } return final_results def run_yaml_playbook(self, playbook_path, parallel=10)-
Expand source code
@handle_errors def run_yaml_playbook(self, playbook_path, parallel=10): req = connpy_pb2.ScriptRequest(param1=playbook_path, parallel=parallel) return from_struct(self.stub.run_yaml_playbook(req).data) def test_commands(self,
nodes_filter,
commands,
expected,
variables=None,
parallel=10,
timeout=10,
prompt=None,
**kwargs)-
Expand source code
@handle_errors def test_commands(self, nodes_filter, commands, expected, variables=None, parallel=10, timeout=10, prompt=None, **kwargs): nodes_list = [nodes_filter] if isinstance(nodes_filter, str) else list(nodes_filter) req = connpy_pb2.TestRequest( nodes=nodes_list, commands=commands, expected=expected if isinstance(expected, list) else [expected], folder=kwargs.get("folder", ""), prompt=prompt or "", parallel=parallel, timeout=timeout, name=kwargs.get("name", "") ) if variables is not None: req.vars.CopyFrom(to_struct(variables)) final_results = {} on_complete = kwargs.get("on_node_complete") for response in self.stub.test_commands(req): result_dict = from_struct(response.test_result) if response.HasField("test_result") else {} if on_complete: on_complete(response.unique_id, response.output, response.status, result_dict) final_results[response.unique_id] = result_dict return final_results
class ImportExportStub (channel, remote_host)-
Expand source code
class ImportExportStub: def __init__(self, channel, remote_host): self.stub = connpy_pb2_grpc.ImportExportServiceStub(channel) self.remote_host = remote_host @handle_errors def export_to_file(self, file_path, folders=None): req = connpy_pb2.ExportRequest(file_path=file_path, folders=folders or []) self.stub.export_to_file(req) @handle_errors def import_from_file(self, file_path): with open(file_path, "r") as f: content = f.read() # Marker to tell the server this is content, not a path marker_content = f"---YAML---\n{content}" self.stub.import_from_file(connpy_pb2.StringRequest(value=marker_content)) @handle_errors def set_reserved_names(self, names): self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names))Methods
def export_to_file(self, file_path, folders=None)-
Expand source code
@handle_errors def export_to_file(self, file_path, folders=None): req = connpy_pb2.ExportRequest(file_path=file_path, folders=folders or []) self.stub.export_to_file(req) def import_from_file(self, file_path)-
Expand source code
@handle_errors def import_from_file(self, file_path): with open(file_path, "r") as f: content = f.read() # Marker to tell the server this is content, not a path marker_content = f"---YAML---\n{content}" self.stub.import_from_file(connpy_pb2.StringRequest(value=marker_content)) def set_reserved_names(self, names)-
Expand source code
@handle_errors def set_reserved_names(self, names): self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names))
class NodeStub (channel, remote_host, config=None)-
Expand source code
class NodeStub: def __init__(self, channel, remote_host, config=None): self.stub = connpy_pb2_grpc.NodeServiceStub(channel) self.remote_host = remote_host self.config = config def _handle_remote_copilot(self, res, request_queue, response_queue, client_buffer_bytes, pause_generator, resume_generator, old_tty): import json, asyncio, termios, sys, tty, queue from ..core import copilot_terminal_mode from . import connpy_pb2 pause_generator() termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) node_info = json.loads(res.copilot_node_info_json) if res.copilot_node_info_json else {} blocks = node_info.get("context_blocks", []) interface = CopilotInterface( self.config, history=getattr(self, 'copilot_history', None), session_state=getattr(self, 'copilot_state', None) ) self.copilot_history = interface.history self.copilot_state = interface.session_state async def on_ai_call_remote(active_buffer, question, chunk_callback, merged_node_info): # Send request to server request_queue.put(connpy_pb2.InteractRequest( copilot_question=question, copilot_context_buffer=active_buffer, copilot_node_info_json=json.dumps(merged_node_info) )) # Wait for chunks from server while True: try: chunk_res = response_queue.get(timeout=0.1) if chunk_res is None: return {"error": "Server disconnected"} if chunk_res.copilot_stream_chunk: chunk_callback(chunk_res.copilot_stream_chunk) elif chunk_res.copilot_response_json: return json.loads(chunk_res.copilot_response_json) except queue.Empty: await asyncio.sleep(0.05) # Wrap in async loop async def run_remote_copilot(): while True: action, commands, custom_cmd = await interface.run_session( raw_bytes=bytes(client_buffer_bytes), node_info=node_info, on_ai_call=on_ai_call_remote, blocks=blocks ) if action == "continue": # Send continue signal to server to loop back for another question request_queue.put(connpy_pb2.InteractRequest(copilot_action="continue")) continue return action, commands, custom_cmd with copilot_terminal_mode(): action, commands, custom_cmd = asyncio.run(run_remote_copilot()) print("\033[2m Returning to session...\033[0m", flush=True) # Prepare final action for server action_sent = "cancel" if action == "send_all" and commands: # In remote mode, send the selected commands as a custom block # so the server executes exactly what the user picked (e.g., selection '1') action_sent = f"custom:{chr(10).join(commands)}" elif action == "custom" and custom_cmd: action_sent = f"custom:{chr(10).join(custom_cmd)}" request_queue.put(connpy_pb2.InteractRequest(copilot_action=action_sent)) resume_generator() tty.setraw(sys.stdin.fileno()) @handle_errors def connect_node(self, unique_id, sftp=False, debug=False, logger=None): import sys import select import tty import termios import queue import os import threading request_queue = queue.Queue() client_buffer_bytes = bytearray() pause_stdin = [False] wake_r, wake_w = os.pipe() def pause_generator(): pause_stdin[0] = True os.write(wake_w, b'\x00') def resume_generator(): pause_stdin[0] = False def request_generator(): cols, rows = 80, 24 try: size = os.get_terminal_size() cols, rows = size.columns, size.lines except OSError: pass yield connpy_pb2.InteractRequest( id=unique_id, sftp=sftp, debug=debug, cols=cols, rows=rows ) while True: try: while True: req = request_queue.get_nowait() if req is None: return yield req except queue.Empty: pass if pause_stdin[0]: import time time.sleep(0.05) continue r, _, _ = select.select([sys.stdin.fileno(), wake_r], [], [], 0.05) if wake_r in r: os.read(wake_r, 1) continue if sys.stdin.fileno() in r and not pause_stdin[0]: try: data = os.read(sys.stdin.fileno(), 1024) if not data: break yield connpy_pb2.InteractRequest(stdin_data=data) except OSError: break # Fetch node details for the connection message try: node_details = self.get_node_details(unique_id) host = node_details.get("host", "unknown") port = str(node_details.get("port", "")) protocol = "sftp" if sftp else node_details.get("protocol", "ssh") port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else "" conn_msg = f"Connected to {unique_id} at {host}{port_str} via: {protocol}" except Exception: conn_msg = f"Connected to {unique_id}" old_tty = termios.tcgetattr(sys.stdin) try: import time tty.setraw(sys.stdin.fileno()) response_iterator = self.stub.interact_node(request_generator()) import queue response_queue = queue.Queue() def response_consumer(): try: for r in response_iterator: response_queue.put(r) except Exception: pass response_queue.put(None) t_consumer = threading.Thread(target=response_consumer, daemon=True) t_consumer.start() # First phase: Wait for connection status, print early data try: while True: res = response_queue.get() if res is None: return if res.stdout_data: data = res.stdout_data if debug: data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'') os.write(sys.stdout.fileno(), data) if res.success: # Connection established on server, show success message termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) printer.success(conn_msg) pause_stdin[0] = False tty.setraw(sys.stdin.fileno()) break if res.error_message: # Connection failed on server termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) printer.error(f"Connection failed: {res.error_message}") return except queue.Empty: return # Second phase: Stream active session # Clear screen filter is only applied before success (Phase 1). # Once the user has a prompt, Ctrl+L must work normally. while True: res = response_queue.get() if res is None: break if res.copilot_prompt: self._handle_remote_copilot( res, request_queue, response_queue, client_buffer_bytes, pause_generator, resume_generator, old_tty ) continue if res.stdout_data: os.write(sys.stdout.fileno(), res.stdout_data) client_buffer_bytes.extend(res.stdout_data) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) os.close(wake_r) os.close(wake_w) @handle_errors def connect_dynamic(self, connection_params, debug=False): import sys import select import tty import termios import queue import os import json params_json = json.dumps(connection_params) request_queue = queue.Queue() client_buffer_bytes = bytearray() pause_stdin = [False] wake_r, wake_w = os.pipe() def pause_generator(): pause_stdin[0] = True os.write(wake_w, b'\x00') def resume_generator(): pause_stdin[0] = False def request_generator(): cols, rows = 80, 24 try: size = os.get_terminal_size() cols, rows = size.columns, size.lines except OSError: pass yield connpy_pb2.InteractRequest( id="dynamic", debug=debug, cols=cols, rows=rows, connection_params_json=params_json ) while True: try: while True: req = request_queue.get_nowait() if req is None: return yield req except queue.Empty: pass if pause_stdin[0]: import time time.sleep(0.05) continue r, _, _ = select.select([sys.stdin.fileno(), wake_r], [], [], 0.05) if wake_r in r: os.read(wake_r, 1) continue if sys.stdin.fileno() in r and not pause_stdin[0]: try: data = os.read(sys.stdin.fileno(), 1024) if not data: break yield connpy_pb2.InteractRequest(stdin_data=data) except OSError: break # Prepare connection message try: node_name = connection_params.get("name", "dynamic@remote") host = connection_params.get("host", "dynamic") port = str(connection_params.get("port", "")) protocol = connection_params.get("protocol", "ssh") port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else "" conn_msg = f"Connected to {node_name} at {host}{port_str} via: {protocol}" except Exception: node_name = connection_params.get("name", "dynamic@remote") if isinstance(connection_params, dict) else "dynamic@remote" conn_msg = f"Connected to {node_name}" old_tty = termios.tcgetattr(sys.stdin) try: import time tty.setraw(sys.stdin.fileno()) response_iterator = self.stub.interact_node(request_generator()) import queue response_queue = queue.Queue() def response_consumer(): try: for r in response_iterator: response_queue.put(r) except Exception: pass response_queue.put(None) t_consumer = threading.Thread(target=response_consumer, daemon=True) t_consumer.start() # First phase: Wait for connection status, print early data try: while True: res = response_queue.get() if res is None: return if res.stdout_data: data = res.stdout_data if debug: data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'') os.write(sys.stdout.fileno(), data) if res.success: # Connection established on server, show success message termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) printer.success(conn_msg) pause_stdin[0] = False tty.setraw(sys.stdin.fileno()) break if res.error_message: # Connection failed on server termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) printer.error(f"Connection failed: {res.error_message}") return except queue.Empty: return # Second phase: Stream active session while True: res = response_queue.get() if res is None: break if res.copilot_prompt: self._handle_remote_copilot( res, request_queue, response_queue, client_buffer_bytes, pause_generator, resume_generator, old_tty ) continue if res.stdout_data: os.write(sys.stdout.fileno(), res.stdout_data) client_buffer_bytes.extend(res.stdout_data) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) os.close(wake_r) os.close(wake_w) @MethodHook @handle_errors def list_nodes(self, filter_str=None, format_str=None): req = connpy_pb2.FilterRequest(filter_str=filter_str or "", format_str=format_str or "") return from_value(self.stub.list_nodes(req).data) or [] @MethodHook @handle_errors def list_folders(self, filter_str=None): req = connpy_pb2.FilterRequest(filter_str=filter_str or "") return from_value(self.stub.list_folders(req).data) or [] @handle_errors def get_node_details(self, unique_id): return from_struct(self.stub.get_node_details(connpy_pb2.IdRequest(id=unique_id)).data) @handle_errors def explode_unique(self, unique_id): return from_value(self.stub.explode_unique(connpy_pb2.IdRequest(id=unique_id)).data) @handle_errors def validate_parent_folder(self, unique_id): self.stub.validate_parent_folder(connpy_pb2.IdRequest(id=unique_id)) @handle_errors def generate_cache(self, nodes=None, folders=None, profiles=None): # 1. Update remote cache on server self.stub.generate_cache(Empty()) # 2. Update local fzf/text cache files # If no data provided, we fetch it all from remote to sync local files if nodes is None and folders is None and profiles is None: nodes = self.list_nodes() folders = self.list_folders() # We don't have direct access to ProfileStub here, but usually # node cache is what matters for fzf. We'll fetch profiles if we can. # For now, let's sync what we have. if nodes is not None or folders is not None or profiles is not None: self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles) def _trigger_local_cache_sync(self): """Helper to fetch remote data and update local fzf cache files after a change.""" try: nodes = self.list_nodes() folders = self.list_folders() self.generate_cache(nodes=nodes, folders=folders) except Exception: # Failure to sync cache shouldn't break the main operation's success feedback pass @handle_errors def add_node(self, unique_id, data, is_folder=False): req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=is_folder) self.stub.add_node(req) self._trigger_local_cache_sync() @handle_errors def update_node(self, unique_id, data, save=True): req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=False) self.stub.update_node(req) if save: self._trigger_local_cache_sync() @handle_errors def delete_node(self, unique_id, is_folder=False, save=True): req = connpy_pb2.DeleteRequest(id=unique_id, is_folder=is_folder) self.stub.delete_node(req) if save: self._trigger_local_cache_sync() @handle_errors def move_node(self, src_id, dst_id, copy=False): req = connpy_pb2.MoveRequest(src_id=src_id, dst_id=dst_id, copy=copy) self.stub.move_node(req) self._trigger_local_cache_sync() @handle_errors def bulk_add(self, ids, hosts, common_data): req = connpy_pb2.BulkRequest(ids=ids, hosts=hosts, common_data=to_struct(common_data)) self.stub.bulk_add(req) self._trigger_local_cache_sync() @handle_errors def set_reserved_names(self, names): self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names)) self._trigger_local_cache_sync() @handle_errors def full_replace(self, connections, profiles): req = connpy_pb2.FullReplaceRequest( connections=to_struct(connections), profiles=to_struct(profiles) ) self.stub.full_replace(req) self._trigger_local_cache_sync() @handle_errors def get_inventory(self): resp = self.stub.get_inventory(Empty()) return { "connections": from_struct(resp.connections), "profiles": from_struct(resp.profiles) }Methods
def add_node(self, unique_id, data, is_folder=False)-
Expand source code
@handle_errors def add_node(self, unique_id, data, is_folder=False): req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=is_folder) self.stub.add_node(req) self._trigger_local_cache_sync() def bulk_add(self, ids, hosts, common_data)-
Expand source code
@handle_errors def bulk_add(self, ids, hosts, common_data): req = connpy_pb2.BulkRequest(ids=ids, hosts=hosts, common_data=to_struct(common_data)) self.stub.bulk_add(req) self._trigger_local_cache_sync() def connect_dynamic(self, connection_params, debug=False)-
Expand source code
@handle_errors def connect_dynamic(self, connection_params, debug=False): import sys import select import tty import termios import queue import os import json params_json = json.dumps(connection_params) request_queue = queue.Queue() client_buffer_bytes = bytearray() pause_stdin = [False] wake_r, wake_w = os.pipe() def pause_generator(): pause_stdin[0] = True os.write(wake_w, b'\x00') def resume_generator(): pause_stdin[0] = False def request_generator(): cols, rows = 80, 24 try: size = os.get_terminal_size() cols, rows = size.columns, size.lines except OSError: pass yield connpy_pb2.InteractRequest( id="dynamic", debug=debug, cols=cols, rows=rows, connection_params_json=params_json ) while True: try: while True: req = request_queue.get_nowait() if req is None: return yield req except queue.Empty: pass if pause_stdin[0]: import time time.sleep(0.05) continue r, _, _ = select.select([sys.stdin.fileno(), wake_r], [], [], 0.05) if wake_r in r: os.read(wake_r, 1) continue if sys.stdin.fileno() in r and not pause_stdin[0]: try: data = os.read(sys.stdin.fileno(), 1024) if not data: break yield connpy_pb2.InteractRequest(stdin_data=data) except OSError: break # Prepare connection message try: node_name = connection_params.get("name", "dynamic@remote") host = connection_params.get("host", "dynamic") port = str(connection_params.get("port", "")) protocol = connection_params.get("protocol", "ssh") port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else "" conn_msg = f"Connected to {node_name} at {host}{port_str} via: {protocol}" except Exception: node_name = connection_params.get("name", "dynamic@remote") if isinstance(connection_params, dict) else "dynamic@remote" conn_msg = f"Connected to {node_name}" old_tty = termios.tcgetattr(sys.stdin) try: import time tty.setraw(sys.stdin.fileno()) response_iterator = self.stub.interact_node(request_generator()) import queue response_queue = queue.Queue() def response_consumer(): try: for r in response_iterator: response_queue.put(r) except Exception: pass response_queue.put(None) t_consumer = threading.Thread(target=response_consumer, daemon=True) t_consumer.start() # First phase: Wait for connection status, print early data try: while True: res = response_queue.get() if res is None: return if res.stdout_data: data = res.stdout_data if debug: data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'') os.write(sys.stdout.fileno(), data) if res.success: # Connection established on server, show success message termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) printer.success(conn_msg) pause_stdin[0] = False tty.setraw(sys.stdin.fileno()) break if res.error_message: # Connection failed on server termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) printer.error(f"Connection failed: {res.error_message}") return except queue.Empty: return # Second phase: Stream active session while True: res = response_queue.get() if res is None: break if res.copilot_prompt: self._handle_remote_copilot( res, request_queue, response_queue, client_buffer_bytes, pause_generator, resume_generator, old_tty ) continue if res.stdout_data: os.write(sys.stdout.fileno(), res.stdout_data) client_buffer_bytes.extend(res.stdout_data) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) os.close(wake_r) os.close(wake_w) def connect_node(self, unique_id, sftp=False, debug=False, logger=None)-
Expand source code
@handle_errors def connect_node(self, unique_id, sftp=False, debug=False, logger=None): import sys import select import tty import termios import queue import os import threading request_queue = queue.Queue() client_buffer_bytes = bytearray() pause_stdin = [False] wake_r, wake_w = os.pipe() def pause_generator(): pause_stdin[0] = True os.write(wake_w, b'\x00') def resume_generator(): pause_stdin[0] = False def request_generator(): cols, rows = 80, 24 try: size = os.get_terminal_size() cols, rows = size.columns, size.lines except OSError: pass yield connpy_pb2.InteractRequest( id=unique_id, sftp=sftp, debug=debug, cols=cols, rows=rows ) while True: try: while True: req = request_queue.get_nowait() if req is None: return yield req except queue.Empty: pass if pause_stdin[0]: import time time.sleep(0.05) continue r, _, _ = select.select([sys.stdin.fileno(), wake_r], [], [], 0.05) if wake_r in r: os.read(wake_r, 1) continue if sys.stdin.fileno() in r and not pause_stdin[0]: try: data = os.read(sys.stdin.fileno(), 1024) if not data: break yield connpy_pb2.InteractRequest(stdin_data=data) except OSError: break # Fetch node details for the connection message try: node_details = self.get_node_details(unique_id) host = node_details.get("host", "unknown") port = str(node_details.get("port", "")) protocol = "sftp" if sftp else node_details.get("protocol", "ssh") port_str = f":{port}" if port and protocol not in ["ssm", "kubectl", "docker"] else "" conn_msg = f"Connected to {unique_id} at {host}{port_str} via: {protocol}" except Exception: conn_msg = f"Connected to {unique_id}" old_tty = termios.tcgetattr(sys.stdin) try: import time tty.setraw(sys.stdin.fileno()) response_iterator = self.stub.interact_node(request_generator()) import queue response_queue = queue.Queue() def response_consumer(): try: for r in response_iterator: response_queue.put(r) except Exception: pass response_queue.put(None) t_consumer = threading.Thread(target=response_consumer, daemon=True) t_consumer.start() # First phase: Wait for connection status, print early data try: while True: res = response_queue.get() if res is None: return if res.stdout_data: data = res.stdout_data if debug: data = data.replace(b'\x1b[H\x1b[2J', b'').replace(b'\x1bc', b'').replace(b'\x1b[3J', b'') os.write(sys.stdout.fileno(), data) if res.success: # Connection established on server, show success message termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) printer.success(conn_msg) pause_stdin[0] = False tty.setraw(sys.stdin.fileno()) break if res.error_message: # Connection failed on server termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) printer.error(f"Connection failed: {res.error_message}") return except queue.Empty: return # Second phase: Stream active session # Clear screen filter is only applied before success (Phase 1). # Once the user has a prompt, Ctrl+L must work normally. while True: res = response_queue.get() if res is None: break if res.copilot_prompt: self._handle_remote_copilot( res, request_queue, response_queue, client_buffer_bytes, pause_generator, resume_generator, old_tty ) continue if res.stdout_data: os.write(sys.stdout.fileno(), res.stdout_data) client_buffer_bytes.extend(res.stdout_data) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty) os.close(wake_r) os.close(wake_w) def delete_node(self, unique_id, is_folder=False, save=True)-
Expand source code
@handle_errors def delete_node(self, unique_id, is_folder=False, save=True): req = connpy_pb2.DeleteRequest(id=unique_id, is_folder=is_folder) self.stub.delete_node(req) if save: self._trigger_local_cache_sync() def explode_unique(self, unique_id)-
Expand source code
@handle_errors def explode_unique(self, unique_id): return from_value(self.stub.explode_unique(connpy_pb2.IdRequest(id=unique_id)).data) def full_replace(self, connections, profiles)-
Expand source code
@handle_errors def full_replace(self, connections, profiles): req = connpy_pb2.FullReplaceRequest( connections=to_struct(connections), profiles=to_struct(profiles) ) self.stub.full_replace(req) self._trigger_local_cache_sync() def generate_cache(self, nodes=None, folders=None, profiles=None)-
Expand source code
@handle_errors def generate_cache(self, nodes=None, folders=None, profiles=None): # 1. Update remote cache on server self.stub.generate_cache(Empty()) # 2. Update local fzf/text cache files # If no data provided, we fetch it all from remote to sync local files if nodes is None and folders is None and profiles is None: nodes = self.list_nodes() folders = self.list_folders() # We don't have direct access to ProfileStub here, but usually # node cache is what matters for fzf. We'll fetch profiles if we can. # For now, let's sync what we have. if nodes is not None or folders is not None or profiles is not None: self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles) def get_inventory(self)-
Expand source code
@handle_errors def get_inventory(self): resp = self.stub.get_inventory(Empty()) return { "connections": from_struct(resp.connections), "profiles": from_struct(resp.profiles) } def get_node_details(self, unique_id)-
Expand source code
@handle_errors def get_node_details(self, unique_id): return from_struct(self.stub.get_node_details(connpy_pb2.IdRequest(id=unique_id)).data) def list_folders(self, filter_str=None)-
Expand source code
@MethodHook @handle_errors def list_folders(self, filter_str=None): req = connpy_pb2.FilterRequest(filter_str=filter_str or "") return from_value(self.stub.list_folders(req).data) or [] def list_nodes(self, filter_str=None, format_str=None)-
Expand source code
@MethodHook @handle_errors def list_nodes(self, filter_str=None, format_str=None): req = connpy_pb2.FilterRequest(filter_str=filter_str or "", format_str=format_str or "") return from_value(self.stub.list_nodes(req).data) or [] def move_node(self, src_id, dst_id, copy=False)-
Expand source code
@handle_errors def move_node(self, src_id, dst_id, copy=False): req = connpy_pb2.MoveRequest(src_id=src_id, dst_id=dst_id, copy=copy) self.stub.move_node(req) self._trigger_local_cache_sync() def set_reserved_names(self, names)-
Expand source code
@handle_errors def set_reserved_names(self, names): self.stub.set_reserved_names(connpy_pb2.ListRequest(items=names)) self._trigger_local_cache_sync() def update_node(self, unique_id, data, save=True)-
Expand source code
@handle_errors def update_node(self, unique_id, data, save=True): req = connpy_pb2.NodeRequest(id=unique_id, data=to_struct(data), is_folder=False) self.stub.update_node(req) if save: self._trigger_local_cache_sync() def validate_parent_folder(self, unique_id)-
Expand source code
@handle_errors def validate_parent_folder(self, unique_id): self.stub.validate_parent_folder(connpy_pb2.IdRequest(id=unique_id))
class PluginStub (channel, remote_host)-
Expand source code
class PluginStub: def __init__(self, channel, remote_host): self.stub = connpy_pb2_grpc.PluginServiceStub(channel) self.remote_stub = remote_plugin_pb2_grpc.RemotePluginServiceStub(channel) self.remote_host = remote_host @handle_errors def list_plugins(self): return from_value(self.stub.list_plugins(Empty()).data) @handle_errors def add_plugin(self, name, source_file, update=False): # Read the local file content to send it to the server with open(source_file, "r") as f: content = f.read() # Use source_file as a marker for "content-inside" marker_content = f"---CONTENT---\n{content}" req = connpy_pb2.PluginRequest(name=name, source_file=marker_content, update=update) self.stub.add_plugin(req) @handle_errors def delete_plugin(self, name): self.stub.delete_plugin(connpy_pb2.IdRequest(id=name)) @handle_errors def enable_plugin(self, name): self.stub.enable_plugin(connpy_pb2.IdRequest(id=name)) @handle_errors def disable_plugin(self, name): self.stub.disable_plugin(connpy_pb2.IdRequest(id=name)) @handle_errors def get_plugin_source(self, name): resp = self.remote_stub.get_plugin_source(remote_plugin_pb2.IdRequest(id=name)) return resp.value @handle_errors def invoke_plugin(self, name, args_namespace): import json args_dict = {k: v for k, v in vars(args_namespace).items() if isinstance(v, (str, int, float, bool, list, type(None)))} if hasattr(args_namespace, "func") and hasattr(args_namespace.func, "__name__"): args_dict["__func_name__"] = args_namespace.func.__name__ req = remote_plugin_pb2.PluginInvokeRequest(name=name, args_json=json.dumps(args_dict)) for chunk in self.remote_stub.invoke_plugin(req): yield chunk.textMethods
def add_plugin(self, name, source_file, update=False)-
Expand source code
@handle_errors def add_plugin(self, name, source_file, update=False): # Read the local file content to send it to the server with open(source_file, "r") as f: content = f.read() # Use source_file as a marker for "content-inside" marker_content = f"---CONTENT---\n{content}" req = connpy_pb2.PluginRequest(name=name, source_file=marker_content, update=update) self.stub.add_plugin(req) def delete_plugin(self, name)-
Expand source code
@handle_errors def delete_plugin(self, name): self.stub.delete_plugin(connpy_pb2.IdRequest(id=name)) def disable_plugin(self, name)-
Expand source code
@handle_errors def disable_plugin(self, name): self.stub.disable_plugin(connpy_pb2.IdRequest(id=name)) def enable_plugin(self, name)-
Expand source code
@handle_errors def enable_plugin(self, name): self.stub.enable_plugin(connpy_pb2.IdRequest(id=name)) def get_plugin_source(self, name)-
Expand source code
@handle_errors def get_plugin_source(self, name): resp = self.remote_stub.get_plugin_source(remote_plugin_pb2.IdRequest(id=name)) return resp.value def invoke_plugin(self, name, args_namespace)-
Expand source code
@handle_errors def invoke_plugin(self, name, args_namespace): import json args_dict = {k: v for k, v in vars(args_namespace).items() if isinstance(v, (str, int, float, bool, list, type(None)))} if hasattr(args_namespace, "func") and hasattr(args_namespace.func, "__name__"): args_dict["__func_name__"] = args_namespace.func.__name__ req = remote_plugin_pb2.PluginInvokeRequest(name=name, args_json=json.dumps(args_dict)) for chunk in self.remote_stub.invoke_plugin(req): yield chunk.text def list_plugins(self)-
Expand source code
@handle_errors def list_plugins(self): return from_value(self.stub.list_plugins(Empty()).data)
class ProfileStub (channel, remote_host, node_stub=None)-
Expand source code
class ProfileStub: def __init__(self, channel, remote_host, node_stub=None): self.stub = connpy_pb2_grpc.ProfileServiceStub(channel) self.remote_host = remote_host self.node_stub = node_stub @handle_errors def list_profiles(self, filter_str=None): req = connpy_pb2.FilterRequest(filter_str=filter_str or "") return from_value(self.stub.list_profiles(req).data) or [] @handle_errors def get_profile(self, name, resolve=True): req = connpy_pb2.ProfileRequest(name=name, resolve=resolve) return from_struct(self.stub.get_profile(req).data) @handle_errors def add_profile(self, name, data): req = connpy_pb2.NodeRequest(id=name, data=to_struct(data)) self.stub.add_profile(req) if self.node_stub: self.node_stub._trigger_local_cache_sync() @handle_errors def resolve_node_data(self, node_data): req = connpy_pb2.StructRequest(data=to_struct(node_data)) return from_struct(self.stub.resolve_node_data(req).data) @handle_errors def delete_profile(self, name): req = connpy_pb2.IdRequest(id=name) self.stub.delete_profile(req) if self.node_stub: self.node_stub._trigger_local_cache_sync() @handle_errors def update_profile(self, name, data): req = connpy_pb2.NodeRequest(id=name, data=to_struct(data)) self.stub.update_profile(req) if self.node_stub: self.node_stub._trigger_local_cache_sync()Methods
def add_profile(self, name, data)-
Expand source code
@handle_errors def add_profile(self, name, data): req = connpy_pb2.NodeRequest(id=name, data=to_struct(data)) self.stub.add_profile(req) if self.node_stub: self.node_stub._trigger_local_cache_sync() def delete_profile(self, name)-
Expand source code
@handle_errors def delete_profile(self, name): req = connpy_pb2.IdRequest(id=name) self.stub.delete_profile(req) if self.node_stub: self.node_stub._trigger_local_cache_sync() def get_profile(self, name, resolve=True)-
Expand source code
@handle_errors def get_profile(self, name, resolve=True): req = connpy_pb2.ProfileRequest(name=name, resolve=resolve) return from_struct(self.stub.get_profile(req).data) def list_profiles(self, filter_str=None)-
Expand source code
@handle_errors def list_profiles(self, filter_str=None): req = connpy_pb2.FilterRequest(filter_str=filter_str or "") return from_value(self.stub.list_profiles(req).data) or [] def resolve_node_data(self, node_data)-
Expand source code
@handle_errors def resolve_node_data(self, node_data): req = connpy_pb2.StructRequest(data=to_struct(node_data)) return from_struct(self.stub.resolve_node_data(req).data) def update_profile(self, name, data)-
Expand source code
@handle_errors def update_profile(self, name, data): req = connpy_pb2.NodeRequest(id=name, data=to_struct(data)) self.stub.update_profile(req) if self.node_stub: self.node_stub._trigger_local_cache_sync()
class SystemStub (channel, remote_host)-
Expand source code
class SystemStub: def __init__(self, channel, remote_host): self.stub = connpy_pb2_grpc.SystemServiceStub(channel) self.remote_host = remote_host @handle_errors def start_api(self, port=None): self.stub.start_api(connpy_pb2.IntRequest(value=port or 8048)) @handle_errors def debug_api(self, port=None): self.stub.debug_api(connpy_pb2.IntRequest(value=port or 8048)) @handle_errors def stop_api(self): self.stub.stop_api(Empty()) @handle_errors def restart_api(self, port=None): self.stub.restart_api(connpy_pb2.IntRequest(value=port or 8048)) @handle_errors def get_api_status(self): return self.stub.get_api_status(Empty()).valueMethods
def debug_api(self, port=None)-
Expand source code
@handle_errors def debug_api(self, port=None): self.stub.debug_api(connpy_pb2.IntRequest(value=port or 8048)) def get_api_status(self)-
Expand source code
@handle_errors def get_api_status(self): return self.stub.get_api_status(Empty()).value def restart_api(self, port=None)-
Expand source code
@handle_errors def restart_api(self, port=None): self.stub.restart_api(connpy_pb2.IntRequest(value=port or 8048)) def start_api(self, port=None)-
Expand source code
@handle_errors def start_api(self, port=None): self.stub.start_api(connpy_pb2.IntRequest(value=port or 8048)) def stop_api(self)-
Expand source code
@handle_errors def stop_api(self): self.stub.stop_api(Empty())