Module connpy.services
Sub-modules
connpy.services.ai_serviceconnpy.services.baseconnpy.services.config_serviceconnpy.services.context_serviceconnpy.services.exceptionsconnpy.services.execution_serviceconnpy.services.import_export_serviceconnpy.services.node_serviceconnpy.services.plugin_serviceconnpy.services.profile_serviceconnpy.services.providerconnpy.services.sync_serviceconnpy.services.system_serviceconnpy.services.user_service
Classes
class AIService (config=None)-
Expand source code
class AIService(BaseService): """Business logic for interacting with AI agents and LLM configurations.""" def _clean_cisco_scrolling(self, text: str) -> str: """Resolves horizontal scrolling artifacts (backspaces, \r, ANSI) by merging overlapping segments.""" def merge_overlapping(s1, s2): s2_clean = s2.lstrip(' $') max_overlap = min(len(s1), len(s2_clean)) for i in range(max_overlap, 0, -1): if s1[-i:] == s2_clean[:i]: return s1 + s2_clean[i:] return s1 + s2_clean scroll_re = re.compile(r'(\x08{5,}\s*\$?|\$\r|\x1b\[\d+[GD]\s*\$?)') parts = scroll_re.split(text) merged = "" for part in parts: if scroll_re.match(part): continue cleaned = log_cleaner(part) if not merged: merged = cleaned else: merged_lines = merged.split('\n') cleaned_lines = cleaned.split('\n') merged_lines[-1] = merge_overlapping(merged_lines[-1], cleaned_lines[0]) merged_lines.extend(cleaned_lines[1:]) merged = "\n".join(merged_lines) return merged def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list: """Identifies command blocks in the terminal history.""" blocks = [] if not raw_bytes: return blocks default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$' device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt prompt_re_str = re.sub(r'(?<!\\)\$', '', device_prompt) try: prompt_re = re.compile(prompt_re_str) except Exception: prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt)) parsed_positions = [] if cmd_byte_positions and len(cmd_byte_positions) >= 1: for i in range(1, len(cmd_byte_positions)): pos, known_cmd = cmd_byte_positions[i] prev_pos = cmd_byte_positions[i-1][0] if known_cmd: if known_cmd == "CANCELLED": parsed_positions.append({"pos": pos, "type": "CANCELLED", "preview": ""}) else: prev_chunk = raw_bytes[prev_pos:pos] prev_cleaned = self._clean_cisco_scrolling(prev_chunk.decode(errors='replace')) prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()] prompt_text = prev_lines[-1].strip() if prev_lines else "" preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd if len(preview) > 80: preview = preview[:77] + "..." parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview}) else: chunk = raw_bytes[prev_pos:pos] cleaned = self._clean_cisco_scrolling(chunk.decode(errors='replace')) lines = [l for l in cleaned.split('\n') if l.strip()] found_in_pass1 = False if lines: # Search backwards through the last few lines for the prompt for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1): match = prompt_re.search(lines[idx]) if match: ptxt = match.group(0).strip() cmd_first_line = lines[idx][match.end():].strip() cmd_rest = [l.strip() for l in lines[idx+1:]] cmd_text = " ".join([cmd_first_line] + cmd_rest).strip() if cmd_text: pv = f"{ptxt} {cmd_text}".strip() if len(pv) > 80: pv = pv[:77] + "..." parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv}) else: parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""}) found_in_pass1 = True break if not found_in_pass1: # Fallback: The prompt might have been isolated in the previous chunk # due to asynchronous network delays splitting the output exactly at the newline. prev_was_valid_cmd = i >= 2 and parsed_positions[i-2]["type"] == "VALID_CMD" if prev_pos > 0 and not prev_was_valid_cmd: # Fetch the very last chunk that we just processed prev_prev_pos = cmd_byte_positions[i-2][0] if i >= 2 else 0 prev_chunk_text = self._clean_cisco_scrolling(raw_bytes[prev_prev_pos:prev_pos].decode(errors='replace')) prev_lines_text = [l for l in prev_chunk_text.split('\n') if l.strip()] if prev_lines_text: prev_match = prompt_re.search(prev_lines_text[-1]) if prev_match: ptxt = prev_match.group(0).strip() cmd_text = " ".join([l.strip() for l in lines]).strip() if cmd_text: pv = f"{ptxt} {cmd_text}".strip() if len(pv) > 80: pv = pv[:77] + "..." parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv}) found_in_pass1 = True if not found_in_pass1: parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""}) else: parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""}) last_newline = raw_bytes.rfind(b'\n') current_prompt_pos = last_newline + 1 if last_newline != -1 else 0 current_end = len(raw_bytes) for i, item in enumerate(parsed_positions): if item["type"] == "VALID_CMD": start_pos = item["pos"] preview = item["preview"] # Find the end position: next VALID_CMD or EMPTY_PROMPT or CANCELLED end_pos = current_prompt_pos for j in range(i + 1, len(parsed_positions)): next_item = parsed_positions[j] if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT", "CANCELLED"): end_pos = next_item["pos"] break blocks.append((start_pos, end_pos, preview)) # Always ensure there is a final block representing the current prompt if not blocks: blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT")) elif blocks[-1][0] < current_prompt_pos: blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT")) return blocks def process_copilot_input(self, input_text: str, session_state: dict) -> dict: """Parses slash commands and manages session state. Returns directive dict.""" text = input_text.strip() if not text.startswith('/'): return {"action": "execute", "clean_prompt": text, "overrides": {}} parts = text.split(maxsplit=1) cmd = parts[0].lower() args = parts[1] if len(parts) > 1 else "" # 1. State Commands (Persistent) if cmd == "/os": if args: session_state['os'] = args return {"action": "state_update", "message": f"OS context changed to {args}"} elif cmd == "/prompt": if args: session_state['prompt'] = args return {"action": "state_update", "message": f"Prompt regex changed to {args}"} elif cmd == "/memorize": if args: session_state['memories'].append(args) return {"action": "state_update", "message": f"Memory added: {args}"} elif cmd == "/clear": session_state['memories'] = [] return {"action": "state_update", "message": "Memory cleared"} # 2. Hybrid Commands elif cmd == "/architect": if not args: session_state['persona'] = 'architect' return {"action": "state_update", "message": "Persona set to Architect"} else: return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "architect"}} elif cmd == "/engineer": if not args: session_state['persona'] = 'engineer' return {"action": "state_update", "message": "Persona set to Engineer"} else: return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "engineer"}} elif cmd == "/trust": if not args: session_state['trust_mode'] = True return {"action": "state_update", "message": "Auto-execute (trust) enabled for session"} else: return {"action": "execute", "clean_prompt": args, "overrides": {"trust": True}} elif cmd == "/untrust": if not args: session_state['trust_mode'] = False return {"action": "state_update", "message": "Auto-execute (trust) disabled for session"} else: return {"action": "execute", "clean_prompt": args, "overrides": {"trust": False}} # Unknown command, execute normally return {"action": "execute", "clean_prompt": text, "overrides": {}} def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides): """Send a prompt to the AI agent.""" from connpy.ai import ai agent = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides) return agent.ask(input_text, dryrun, chat_history, status=status, debug=debug, session_id=session_id, chunk_callback=chunk_callback) def confirm(self, input_text, console=None): """Ask for a safe confirmation of an action.""" from connpy.ai import ai agent = ai(self.config, console=console) return agent.confirm(input_text) def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None): """Ask the AI copilot for terminal assistance.""" from connpy.ai import ai, run_ai_async agent = ai(self.config) future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)) return future.result() async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None): """Ask the AI copilot for terminal assistance asynchronously.""" from connpy.ai import ai, run_ai_async import asyncio agent = ai(self.config) future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)) return await asyncio.wrap_future(future) def list_sessions(self, limit=None): """Return a list of saved AI sessions, optionally limited.""" from connpy.ai import ai agent = ai(self.config) sessions = agent._get_sessions() if limit and len(sessions) > limit: return sessions[:limit], len(sessions) return sessions, len(sessions) def delete_session(self, session_id): """Delete an AI session by ID.""" import os sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions") path = os.path.join(sessions_dir, f"{session_id}.json") if os.path.exists(path): os.remove(path) else: raise InvalidConfigurationError(f"Session '{session_id}' not found.") def configure_provider(self, provider, model=None, api_key=None, auth=None): """Update AI provider settings in the configuration.""" settings = self.config.config.get("ai", {}) if model: settings[f"{provider}_model"] = model if api_key: settings[f"{provider}_api_key"] = api_key if auth is not None: settings[f"{provider}_auth"] = auth self.config.config["ai"] = settings self.config._saveconfig(self.config.file) def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False): """Update MCP server settings in the configuration with smart merging.""" ai_settings = self.config.config.get("ai", {}) mcp_servers = ai_settings.get("mcp_servers", {}) if remove: if name in mcp_servers: del mcp_servers[name] else: # Get existing or new server_cfg = mcp_servers.get(name, {}) # Partial updates if url is not None: server_cfg["url"] = url if enabled is not None: server_cfg["enabled"] = bool(enabled) elif "enabled" not in server_cfg: server_cfg["enabled"] = True # Default for new entries if auto_load_on_os is not None: if auto_load_on_os == "": # Explicit clear if "auto_load_on_os" in server_cfg: del server_cfg["auto_load_on_os"] else: server_cfg["auto_load_on_os"] = auto_load_on_os mcp_servers[name] = server_cfg ai_settings["mcp_servers"] = mcp_servers self.config.config["ai"] = ai_settings self.config._saveconfig(self.config.file) def list_mcp_servers(self) -> dict: """Get the configured MCP servers.""" if hasattr(self.config, "get_effective_setting"): ai_settings = self.config.get_effective_setting("ai", {}) else: ai_settings = self.config.config.get("ai", {}) if hasattr(self.config, "config") else {} return ai_settings.get("mcp_servers", {}) def load_session_data(self, session_id): """Load a session's raw data by ID.""" from connpy.ai import ai agent = ai(self.config) return agent.load_session_data(session_id) def build_playbook_chat(self, user_input: str, chat_history: list = None, status=None, chunk_callback=None): """Interact with the specialized Playbook Builder Agent.""" from connpy.ai import PlaybookBuilderAgent agent = PlaybookBuilderAgent(self.config) return agent.ask(user_input, chat_history=chat_history, status=status, chunk_callback=chunk_callback) def analyze_execution_results(self, results: dict, query: str = None, status=None, chunk_callback=None): """Analyze actual command execution results using Network Architect 1-shot.""" import json results_str = json.dumps(results, indent=2) prompt = f"@architect: Please analyze the following actual execution results. Diagnose any issues, highlight successful actions, and suggest strategic remediation steps if needed." if query: prompt += f"\nSpecific user request: {query}" prompt += f"\n\nResults Data:\n{results_str}" prompt += "\n\nCRITICAL DIRECTIVE: You are running in a strictly 1-shot offline diagnostics mode (--analyze). There is no active conversation loop, and you are NOT conversing with a Network Engineer. You MUST deliver your complete strategic analysis immediately. DO NOT suggest, mention, or attempt to delegate the session back to the engineer." # Delegate to self.ask, setting stream=True and forwarding callback/status. # This will invoke standard ai.ask with '@architect:' prefix, forcing 1-shot architect brain. return self.ask(prompt, status=status, chunk_callback=chunk_callback, one_shot=True) def predict_execution_results(self, target_nodes: list, commands: list, status=None, chunk_callback=None): """Predict and simulate execution results preventively using the Preflight Simulation Agent (1-shot).""" nodes_str = ", ".join(target_nodes) commands_str = "\n".join(f"- {cmd}" for cmd in commands) prompt = f"@engineer: Act as a Preflight Simulation Agent. Simulate and predict the expected outputs and behaviors of the following commands on the target nodes. Alert about potential safety or configuration risks based on node profiles." prompt += f"\n\nTarget Nodes: {nodes_str}" prompt += f"\nCommands to simulate:\n{commands_str}" prompt += "\n\nCRITICAL SCALABILITY DIRECTIVE: If there are many target nodes, DO NOT list predictions node-by-node. Instead, group them by Operating System, vendor, or platform, and provide a highly concise Executive Summary. Detail individual risks only for nodes that present specific anomalies or security concerns. Focus on overall impact." # Delegate to self.ask, using the standard engineer brain but with the simulated preflight prompt. return self.ask(prompt, status=status, chunk_callback=chunk_callback)Business logic for interacting with AI agents and LLM configurations.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None)-
Expand source code
async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None): """Ask the AI copilot for terminal assistance asynchronously.""" from connpy.ai import ai, run_ai_async import asyncio agent = ai(self.config) future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)) return await asyncio.wrap_future(future)Ask the AI copilot for terminal assistance asynchronously.
def analyze_execution_results(self, results: dict, query: str = None, status=None, chunk_callback=None)-
Expand source code
def analyze_execution_results(self, results: dict, query: str = None, status=None, chunk_callback=None): """Analyze actual command execution results using Network Architect 1-shot.""" import json results_str = json.dumps(results, indent=2) prompt = f"@architect: Please analyze the following actual execution results. Diagnose any issues, highlight successful actions, and suggest strategic remediation steps if needed." if query: prompt += f"\nSpecific user request: {query}" prompt += f"\n\nResults Data:\n{results_str}" prompt += "\n\nCRITICAL DIRECTIVE: You are running in a strictly 1-shot offline diagnostics mode (--analyze). There is no active conversation loop, and you are NOT conversing with a Network Engineer. You MUST deliver your complete strategic analysis immediately. DO NOT suggest, mention, or attempt to delegate the session back to the engineer." # Delegate to self.ask, setting stream=True and forwarding callback/status. # This will invoke standard ai.ask with '@architect:' prefix, forcing 1-shot architect brain. return self.ask(prompt, status=status, chunk_callback=chunk_callback, one_shot=True)Analyze actual command execution results using Network Architect 1-shot.
def ask(self,
input_text,
dryrun=False,
chat_history=None,
status=None,
debug=False,
session_id=None,
console=None,
chunk_callback=None,
confirm_handler=None,
trust=False,
**overrides)-
Expand source code
def ask(self, input_text, dryrun=False, chat_history=None, status=None, debug=False, session_id=None, console=None, chunk_callback=None, confirm_handler=None, trust=False, **overrides): """Send a prompt to the AI agent.""" from connpy.ai import ai agent = ai(self.config, console=console, confirm_handler=confirm_handler, trust=trust, **overrides) return agent.ask(input_text, dryrun, chat_history, status=status, debug=debug, session_id=session_id, chunk_callback=chunk_callback)Send a prompt to the AI agent.
def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None)-
Expand source code
def ask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None): """Ask the AI copilot for terminal assistance.""" from connpy.ai import ai, run_ai_async agent = ai(self.config) future = run_ai_async(agent.aask_copilot(terminal_buffer, user_question, node_info, chunk_callback=chunk_callback)) return future.result()Ask the AI copilot for terminal assistance.
def build_context_blocks(self,
raw_bytes: bytes,
cmd_byte_positions: list,
node_info: dict,
last_line: str = '') ‑> list-
Expand source code
def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list: """Identifies command blocks in the terminal history.""" blocks = [] if not raw_bytes: return blocks default_prompt = r'>$|#$|\$$|>.$|#.$|\$.$' device_prompt = node_info.get("prompt", default_prompt) if isinstance(node_info, dict) else default_prompt prompt_re_str = re.sub(r'(?<!\\)\$', '', device_prompt) try: prompt_re = re.compile(prompt_re_str) except Exception: prompt_re = re.compile(re.sub(r'(?<!\\)\$', '', default_prompt)) parsed_positions = [] if cmd_byte_positions and len(cmd_byte_positions) >= 1: for i in range(1, len(cmd_byte_positions)): pos, known_cmd = cmd_byte_positions[i] prev_pos = cmd_byte_positions[i-1][0] if known_cmd: if known_cmd == "CANCELLED": parsed_positions.append({"pos": pos, "type": "CANCELLED", "preview": ""}) else: prev_chunk = raw_bytes[prev_pos:pos] prev_cleaned = self._clean_cisco_scrolling(prev_chunk.decode(errors='replace')) prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()] prompt_text = prev_lines[-1].strip() if prev_lines else "" preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd if len(preview) > 80: preview = preview[:77] + "..." parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview}) else: chunk = raw_bytes[prev_pos:pos] cleaned = self._clean_cisco_scrolling(chunk.decode(errors='replace')) lines = [l for l in cleaned.split('\n') if l.strip()] found_in_pass1 = False if lines: # Search backwards through the last few lines for the prompt for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1): match = prompt_re.search(lines[idx]) if match: ptxt = match.group(0).strip() cmd_first_line = lines[idx][match.end():].strip() cmd_rest = [l.strip() for l in lines[idx+1:]] cmd_text = " ".join([cmd_first_line] + cmd_rest).strip() if cmd_text: pv = f"{ptxt} {cmd_text}".strip() if len(pv) > 80: pv = pv[:77] + "..." parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv}) else: parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""}) found_in_pass1 = True break if not found_in_pass1: # Fallback: The prompt might have been isolated in the previous chunk # due to asynchronous network delays splitting the output exactly at the newline. prev_was_valid_cmd = i >= 2 and parsed_positions[i-2]["type"] == "VALID_CMD" if prev_pos > 0 and not prev_was_valid_cmd: # Fetch the very last chunk that we just processed prev_prev_pos = cmd_byte_positions[i-2][0] if i >= 2 else 0 prev_chunk_text = self._clean_cisco_scrolling(raw_bytes[prev_prev_pos:prev_pos].decode(errors='replace')) prev_lines_text = [l for l in prev_chunk_text.split('\n') if l.strip()] if prev_lines_text: prev_match = prompt_re.search(prev_lines_text[-1]) if prev_match: ptxt = prev_match.group(0).strip() cmd_text = " ".join([l.strip() for l in lines]).strip() if cmd_text: pv = f"{ptxt} {cmd_text}".strip() if len(pv) > 80: pv = pv[:77] + "..." parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv}) found_in_pass1 = True if not found_in_pass1: parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""}) else: parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""}) last_newline = raw_bytes.rfind(b'\n') current_prompt_pos = last_newline + 1 if last_newline != -1 else 0 current_end = len(raw_bytes) for i, item in enumerate(parsed_positions): if item["type"] == "VALID_CMD": start_pos = item["pos"] preview = item["preview"] # Find the end position: next VALID_CMD or EMPTY_PROMPT or CANCELLED end_pos = current_prompt_pos for j in range(i + 1, len(parsed_positions)): next_item = parsed_positions[j] if next_item["type"] in ("VALID_CMD", "EMPTY_PROMPT", "CANCELLED"): end_pos = next_item["pos"] break blocks.append((start_pos, end_pos, preview)) # Always ensure there is a final block representing the current prompt if not blocks: blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT")) elif blocks[-1][0] < current_prompt_pos: blocks.append((current_prompt_pos, current_end, last_line[:80] if last_line else "CURRENT CONTEXT")) return blocksIdentifies command blocks in the terminal history.
def build_playbook_chat(self, user_input: str, chat_history: list = None, status=None, chunk_callback=None)-
Expand source code
def build_playbook_chat(self, user_input: str, chat_history: list = None, status=None, chunk_callback=None): """Interact with the specialized Playbook Builder Agent.""" from connpy.ai import PlaybookBuilderAgent agent = PlaybookBuilderAgent(self.config) return agent.ask(user_input, chat_history=chat_history, status=status, chunk_callback=chunk_callback)Interact with the specialized Playbook Builder Agent.
def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False)-
Expand source code
def configure_mcp(self, name, url=None, enabled=None, auto_load_on_os=None, remove=False): """Update MCP server settings in the configuration with smart merging.""" ai_settings = self.config.config.get("ai", {}) mcp_servers = ai_settings.get("mcp_servers", {}) if remove: if name in mcp_servers: del mcp_servers[name] else: # Get existing or new server_cfg = mcp_servers.get(name, {}) # Partial updates if url is not None: server_cfg["url"] = url if enabled is not None: server_cfg["enabled"] = bool(enabled) elif "enabled" not in server_cfg: server_cfg["enabled"] = True # Default for new entries if auto_load_on_os is not None: if auto_load_on_os == "": # Explicit clear if "auto_load_on_os" in server_cfg: del server_cfg["auto_load_on_os"] else: server_cfg["auto_load_on_os"] = auto_load_on_os mcp_servers[name] = server_cfg ai_settings["mcp_servers"] = mcp_servers self.config.config["ai"] = ai_settings self.config._saveconfig(self.config.file)Update MCP server settings in the configuration with smart merging.
def configure_provider(self, provider, model=None, api_key=None, auth=None)-
Expand source code
def configure_provider(self, provider, model=None, api_key=None, auth=None): """Update AI provider settings in the configuration.""" settings = self.config.config.get("ai", {}) if model: settings[f"{provider}_model"] = model if api_key: settings[f"{provider}_api_key"] = api_key if auth is not None: settings[f"{provider}_auth"] = auth self.config.config["ai"] = settings self.config._saveconfig(self.config.file)Update AI provider settings in the configuration.
def confirm(self, input_text, console=None)-
Expand source code
def confirm(self, input_text, console=None): """Ask for a safe confirmation of an action.""" from connpy.ai import ai agent = ai(self.config, console=console) return agent.confirm(input_text)Ask for a safe confirmation of an action.
def delete_session(self, session_id)-
Expand source code
def delete_session(self, session_id): """Delete an AI session by ID.""" import os sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions") path = os.path.join(sessions_dir, f"{session_id}.json") if os.path.exists(path): os.remove(path) else: raise InvalidConfigurationError(f"Session '{session_id}' not found.")Delete an AI session by ID.
def list_mcp_servers(self) ‑> dict-
Expand source code
def list_mcp_servers(self) -> dict: """Get the configured MCP servers.""" if hasattr(self.config, "get_effective_setting"): ai_settings = self.config.get_effective_setting("ai", {}) else: ai_settings = self.config.config.get("ai", {}) if hasattr(self.config, "config") else {} return ai_settings.get("mcp_servers", {})Get the configured MCP servers.
def list_sessions(self, limit=None)-
Expand source code
def list_sessions(self, limit=None): """Return a list of saved AI sessions, optionally limited.""" from connpy.ai import ai agent = ai(self.config) sessions = agent._get_sessions() if limit and len(sessions) > limit: return sessions[:limit], len(sessions) return sessions, len(sessions)Return a list of saved AI sessions, optionally limited.
def load_session_data(self, session_id)-
Expand source code
def load_session_data(self, session_id): """Load a session's raw data by ID.""" from connpy.ai import ai agent = ai(self.config) return agent.load_session_data(session_id)Load a session's raw data by ID.
def predict_execution_results(self, target_nodes: list, commands: list, status=None, chunk_callback=None)-
Expand source code
def predict_execution_results(self, target_nodes: list, commands: list, status=None, chunk_callback=None): """Predict and simulate execution results preventively using the Preflight Simulation Agent (1-shot).""" nodes_str = ", ".join(target_nodes) commands_str = "\n".join(f"- {cmd}" for cmd in commands) prompt = f"@engineer: Act as a Preflight Simulation Agent. Simulate and predict the expected outputs and behaviors of the following commands on the target nodes. Alert about potential safety or configuration risks based on node profiles." prompt += f"\n\nTarget Nodes: {nodes_str}" prompt += f"\nCommands to simulate:\n{commands_str}" prompt += "\n\nCRITICAL SCALABILITY DIRECTIVE: If there are many target nodes, DO NOT list predictions node-by-node. Instead, group them by Operating System, vendor, or platform, and provide a highly concise Executive Summary. Detail individual risks only for nodes that present specific anomalies or security concerns. Focus on overall impact." # Delegate to self.ask, using the standard engineer brain but with the simulated preflight prompt. return self.ask(prompt, status=status, chunk_callback=chunk_callback)Predict and simulate execution results preventively using the Preflight Simulation Agent (1-shot).
def process_copilot_input(self, input_text: str, session_state: dict) ‑> dict-
Expand source code
def process_copilot_input(self, input_text: str, session_state: dict) -> dict: """Parses slash commands and manages session state. Returns directive dict.""" text = input_text.strip() if not text.startswith('/'): return {"action": "execute", "clean_prompt": text, "overrides": {}} parts = text.split(maxsplit=1) cmd = parts[0].lower() args = parts[1] if len(parts) > 1 else "" # 1. State Commands (Persistent) if cmd == "/os": if args: session_state['os'] = args return {"action": "state_update", "message": f"OS context changed to {args}"} elif cmd == "/prompt": if args: session_state['prompt'] = args return {"action": "state_update", "message": f"Prompt regex changed to {args}"} elif cmd == "/memorize": if args: session_state['memories'].append(args) return {"action": "state_update", "message": f"Memory added: {args}"} elif cmd == "/clear": session_state['memories'] = [] return {"action": "state_update", "message": "Memory cleared"} # 2. Hybrid Commands elif cmd == "/architect": if not args: session_state['persona'] = 'architect' return {"action": "state_update", "message": "Persona set to Architect"} else: return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "architect"}} elif cmd == "/engineer": if not args: session_state['persona'] = 'engineer' return {"action": "state_update", "message": "Persona set to Engineer"} else: return {"action": "execute", "clean_prompt": args, "overrides": {"persona": "engineer"}} elif cmd == "/trust": if not args: session_state['trust_mode'] = True return {"action": "state_update", "message": "Auto-execute (trust) enabled for session"} else: return {"action": "execute", "clean_prompt": args, "overrides": {"trust": True}} elif cmd == "/untrust": if not args: session_state['trust_mode'] = False return {"action": "state_update", "message": "Auto-execute (trust) disabled for session"} else: return {"action": "execute", "clean_prompt": args, "overrides": {"trust": False}} # Unknown command, execute normally return {"action": "execute", "clean_prompt": text, "overrides": {}}Parses slash commands and manages session state. Returns directive dict.
Inherited members
class ConfigService (config=None)-
Expand source code
class ConfigService(BaseService): """Business logic for general application settings and state configuration.""" def get_settings(self) -> Dict[str, Any]: """Get the global configuration settings block.""" settings = self.config.config.copy() settings["configfolder"] = self.config.defaultdir return settings def get_default_dir(self) -> str: """Get the default configuration directory.""" return self.config.defaultdir def set_config_folder(self, folder_path: str): """Set the default location for config file by writing to ~/.config/conn/.folder""" if not os.path.isdir(folder_path): raise ConnpyError(f"readable_dir:{folder_path} is not a valid path") pathfile = os.path.join(self.config.anchor_path, ".folder") folder = os.path.abspath(folder_path).rstrip('/') try: with open(pathfile, "w") as f: f.write(str(folder)) except Exception as e: raise ConnpyError(f"Failed to save config folder: {e}") def update_setting(self, key, value): """Update a setting in the configuration file.""" self.config.config[key] = value self.config._saveconfig(self.config.file) def encrypt_password(self, password): """Encrypt a password using the application's configuration encryption key.""" return self.config.encrypt(password) def apply_theme_from_file(self, theme_input): """Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration.""" import yaml from ..printer import STYLES, LIGHT_THEME if theme_input == "dark": valid_styles = {} self.update_setting("theme", valid_styles) return valid_styles elif theme_input == "light": valid_styles = LIGHT_THEME.copy() self.update_setting("theme", valid_styles) return valid_styles if not os.path.exists(theme_input): raise InvalidConfigurationError(f"Theme file '{theme_input}' not found.") try: with open(theme_input, 'r') as f: user_styles = yaml.safe_load(f) except Exception as e: raise InvalidConfigurationError(f"Failed to parse theme file: {e}") if not isinstance(user_styles, dict): raise InvalidConfigurationError("Theme file must be a YAML dictionary.") # Support both direct styles and nested under 'theme' key if "theme" in user_styles and isinstance(user_styles["theme"], dict): user_styles = user_styles["theme"] # Filter for valid styles only (prevent junk in config) valid_styles = {k: v for k, v in user_styles.items() if k in STYLES} if not valid_styles: raise InvalidConfigurationError("No valid style keys found in theme file.") # Persist and return merged styles self.update_setting("theme", valid_styles) return valid_stylesBusiness logic for general application settings and state configuration.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def apply_theme_from_file(self, theme_input)-
Expand source code
def apply_theme_from_file(self, theme_input): """Apply 'dark', 'light' theme or load a YAML theme file and save it to the configuration.""" import yaml from ..printer import STYLES, LIGHT_THEME if theme_input == "dark": valid_styles = {} self.update_setting("theme", valid_styles) return valid_styles elif theme_input == "light": valid_styles = LIGHT_THEME.copy() self.update_setting("theme", valid_styles) return valid_styles if not os.path.exists(theme_input): raise InvalidConfigurationError(f"Theme file '{theme_input}' not found.") try: with open(theme_input, 'r') as f: user_styles = yaml.safe_load(f) except Exception as e: raise InvalidConfigurationError(f"Failed to parse theme file: {e}") if not isinstance(user_styles, dict): raise InvalidConfigurationError("Theme file must be a YAML dictionary.") # Support both direct styles and nested under 'theme' key if "theme" in user_styles and isinstance(user_styles["theme"], dict): user_styles = user_styles["theme"] # Filter for valid styles only (prevent junk in config) valid_styles = {k: v for k, v in user_styles.items() if k in STYLES} if not valid_styles: raise InvalidConfigurationError("No valid style keys found in theme file.") # Persist and return merged styles self.update_setting("theme", valid_styles) return valid_stylesApply 'dark', 'light' theme or load a YAML theme file and save it to the configuration.
def encrypt_password(self, password)-
Expand source code
def encrypt_password(self, password): """Encrypt a password using the application's configuration encryption key.""" return self.config.encrypt(password)Encrypt a password using the application's configuration encryption key.
def get_default_dir(self) ‑> str-
Expand source code
def get_default_dir(self) -> str: """Get the default configuration directory.""" return self.config.defaultdirGet the default configuration directory.
def get_settings(self) ‑> Dict[str, Any]-
Expand source code
def get_settings(self) -> Dict[str, Any]: """Get the global configuration settings block.""" settings = self.config.config.copy() settings["configfolder"] = self.config.defaultdir return settingsGet the global configuration settings block.
def set_config_folder(self, folder_path: str)-
Expand source code
def set_config_folder(self, folder_path: str): """Set the default location for config file by writing to ~/.config/conn/.folder""" if not os.path.isdir(folder_path): raise ConnpyError(f"readable_dir:{folder_path} is not a valid path") pathfile = os.path.join(self.config.anchor_path, ".folder") folder = os.path.abspath(folder_path).rstrip('/') try: with open(pathfile, "w") as f: f.write(str(folder)) except Exception as e: raise ConnpyError(f"Failed to save config folder: {e}")Set the default location for config file by writing to ~/.config/conn/.folder
def update_setting(self, key, value)-
Expand source code
def update_setting(self, key, value): """Update a setting in the configuration file.""" self.config.config[key] = value self.config._saveconfig(self.config.file)Update a setting in the configuration file.
Inherited members
class ConnpyError (*args, **kwargs)-
Expand source code
class ConnpyError(Exception): """Base exception for all connpy services.""" passBase exception for all connpy services.
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
class ExecutionError (*args, **kwargs)-
Expand source code
class ExecutionError(ConnpyError): """Raised when an execution fails or returns error.""" passRaised when an execution fails or returns error.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class ExecutionService (config=None)-
Expand source code
class ExecutionService(BaseService): """Business logic for executing commands on nodes and running automation scripts.""" def run_commands( self, nodes_filter: str, commands: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 20, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, str]: """Execute commands on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.run( commands=commands, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) # Combine output and status for the caller full_results = {} for unique in results: full_results[unique] = { "output": results[unique], "status": executor.status.get(unique, 1) } return full_results except Exception as e: raise ConnpyError(f"Execution failed: {e}") def test_commands( self, nodes_filter: str, commands: List[str], expected: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 20, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, Dict[str, bool]]: """Run commands and verify expected output on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.test( commands=commands, expected=expected, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) return results except Exception as e: raise ConnpyError(f"Testing failed: {e}") def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]: """Run a plain-text script containing one command per line.""" if not os.path.exists(script_path): raise ConnpyError(f"Script file not found: {script_path}") try: with open(script_path, "r") as f: commands = [line.strip() for line in f if line.strip()] except Exception as e: raise ConnpyError(f"Failed to read script {script_path}: {e}") return self.run_commands(nodes_filter, commands, parallel=parallel)Business logic for executing commands on nodes and running automation scripts.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) ‑> Dict[str, str]-
Expand source code
def run_cli_script(self, nodes_filter: str, script_path: str, parallel: int = 10) -> Dict[str, str]: """Run a plain-text script containing one command per line.""" if not os.path.exists(script_path): raise ConnpyError(f"Script file not found: {script_path}") try: with open(script_path, "r") as f: commands = [line.strip() for line in f if line.strip()] except Exception as e: raise ConnpyError(f"Failed to read script {script_path}: {e}") return self.run_commands(nodes_filter, commands, parallel=parallel)Run a plain-text script containing one command per line.
def run_commands(self,
nodes_filter: str,
commands: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 20,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, str]-
Expand source code
def run_commands( self, nodes_filter: str, commands: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 20, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, str]: """Execute commands on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.run( commands=commands, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) # Combine output and status for the caller full_results = {} for unique in results: full_results[unique] = { "output": results[unique], "status": executor.status.get(unique, 1) } return full_results except Exception as e: raise ConnpyError(f"Execution failed: {e}")Execute commands on a set of nodes.
def test_commands(self,
nodes_filter: str,
commands: List[str],
expected: List[str],
variables: Dict[str, Any] | None = None,
parallel: int = 10,
timeout: int = 20,
folder: str | None = None,
prompt: str | None = None,
on_node_complete: Callable | None = None,
logger: Callable | None = None,
name: str | None = None) ‑> Dict[str, Dict[str, bool]]-
Expand source code
def test_commands( self, nodes_filter: str, commands: List[str], expected: List[str], variables: Optional[Dict[str, Any]] = None, parallel: int = 10, timeout: int = 20, folder: Optional[str] = None, prompt: Optional[str] = None, on_node_complete: Optional[Callable] = None, logger: Optional[Callable] = None, name: Optional[str] = None ) -> Dict[str, Dict[str, bool]]: """Run commands and verify expected output on a set of nodes.""" try: matched_names = self.config._getallnodes(nodes_filter) if not matched_names: raise ConnpyError(f"No nodes found matching filter: {nodes_filter}") node_data = self.config.getitems(matched_names, extract=True) executor = Nodes(node_data, config=self.config) self.last_executor = executor results = executor.test( commands=commands, expected=expected, vars=variables, parallel=parallel, timeout=timeout, folder=folder, prompt=prompt, on_complete=on_node_complete, logger=logger ) return results except Exception as e: raise ConnpyError(f"Testing failed: {e}")Run commands and verify expected output on a set of nodes.
Inherited members
class ImportExportService (config=None)-
Expand source code
class ImportExportService(BaseService): """Business logic for YAML/JSON inventory import and export.""" def export_to_file(self, file_path, folders=None): """Export nodes/folders to a YAML file.""" if os.path.exists(file_path): raise InvalidConfigurationError(f"File '{file_path}' already exists.") data = self.export_to_dict(folders) try: with open(file_path, "w") as f: yaml.dump(data, f, Dumper=NoAliasDumper, default_flow_style=False) except OSError as e: raise InvalidConfigurationError(f"Failed to export to '{file_path}': {e}") def export_to_dict(self, folders=None): """Export nodes/folders to a dictionary.""" if not folders: return deepcopy(self.config.connections) else: # Validate folders exist for f in folders: if f != "@" and f not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{f}' not found.") flat = self.config._getallnodesfull(folders, extract=False) nested = {} for k, v in flat.items(): uniques = self.config._explode_unique(k) if not uniques: continue if "folder" in uniques and "subfolder" in uniques: f_name = uniques["folder"] s_name = uniques["subfolder"] i_name = uniques["id"] if f_name not in nested: nested[f_name] = {"type": "folder"} if s_name not in nested[f_name]: nested[f_name][s_name] = {"type": "subfolder"} nested[f_name][s_name][i_name] = v elif "folder" in uniques: f_name = uniques["folder"] i_name = uniques["id"] if f_name not in nested: nested[f_name] = {"type": "folder"} nested[f_name][i_name] = v else: i_name = uniques["id"] nested[i_name] = v return nested def import_from_file(self, file_path): """Import nodes/folders from a YAML file.""" if not os.path.exists(file_path): raise InvalidConfigurationError(f"File '{file_path}' does not exist.") try: with open(file_path, "r") as f: data = yaml.load(f, Loader=yaml.FullLoader) self.import_from_dict(data) except Exception as e: raise InvalidConfigurationError(f"Failed to read/parse import file: {e}") def import_from_dict(self, data): """Import nodes/folders from a dictionary.""" if not isinstance(data, dict): raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.") def _traverse_import(node_data, current_folder='', current_subfolder=''): for k, v in node_data.items(): if k == "type": continue if isinstance(v, dict): node_type = v.get("type", "connection") if node_type == "folder": self.config._folder_add(folder=k) _traverse_import(v, current_folder=k, current_subfolder='') elif node_type == "subfolder": self.config._folder_add(folder=current_folder, subfolder=k) _traverse_import(v, current_folder=current_folder, current_subfolder=k) elif node_type == "connection": unique_id = k if current_subfolder: unique_id = f"{k}@{current_subfolder}@{current_folder}" elif current_folder: unique_id = f"{k}@{current_folder}" self._validate_node_name(unique_id) kwargs = deepcopy(v) kwargs['id'] = k kwargs['folder'] = current_folder kwargs['subfolder'] = current_subfolder self.config._connections_add(**kwargs) else: # Invalid format skip pass _traverse_import(data) self.config._saveconfig(self.config.file)Business logic for YAML/JSON inventory import and export.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def export_to_dict(self, folders=None)-
Expand source code
def export_to_dict(self, folders=None): """Export nodes/folders to a dictionary.""" if not folders: return deepcopy(self.config.connections) else: # Validate folders exist for f in folders: if f != "@" and f not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{f}' not found.") flat = self.config._getallnodesfull(folders, extract=False) nested = {} for k, v in flat.items(): uniques = self.config._explode_unique(k) if not uniques: continue if "folder" in uniques and "subfolder" in uniques: f_name = uniques["folder"] s_name = uniques["subfolder"] i_name = uniques["id"] if f_name not in nested: nested[f_name] = {"type": "folder"} if s_name not in nested[f_name]: nested[f_name][s_name] = {"type": "subfolder"} nested[f_name][s_name][i_name] = v elif "folder" in uniques: f_name = uniques["folder"] i_name = uniques["id"] if f_name not in nested: nested[f_name] = {"type": "folder"} nested[f_name][i_name] = v else: i_name = uniques["id"] nested[i_name] = v return nestedExport nodes/folders to a dictionary.
def export_to_file(self, file_path, folders=None)-
Expand source code
def export_to_file(self, file_path, folders=None): """Export nodes/folders to a YAML file.""" if os.path.exists(file_path): raise InvalidConfigurationError(f"File '{file_path}' already exists.") data = self.export_to_dict(folders) try: with open(file_path, "w") as f: yaml.dump(data, f, Dumper=NoAliasDumper, default_flow_style=False) except OSError as e: raise InvalidConfigurationError(f"Failed to export to '{file_path}': {e}")Export nodes/folders to a YAML file.
def import_from_dict(self, data)-
Expand source code
def import_from_dict(self, data): """Import nodes/folders from a dictionary.""" if not isinstance(data, dict): raise InvalidConfigurationError("Invalid import data format: expected a dictionary of nodes.") def _traverse_import(node_data, current_folder='', current_subfolder=''): for k, v in node_data.items(): if k == "type": continue if isinstance(v, dict): node_type = v.get("type", "connection") if node_type == "folder": self.config._folder_add(folder=k) _traverse_import(v, current_folder=k, current_subfolder='') elif node_type == "subfolder": self.config._folder_add(folder=current_folder, subfolder=k) _traverse_import(v, current_folder=current_folder, current_subfolder=k) elif node_type == "connection": unique_id = k if current_subfolder: unique_id = f"{k}@{current_subfolder}@{current_folder}" elif current_folder: unique_id = f"{k}@{current_folder}" self._validate_node_name(unique_id) kwargs = deepcopy(v) kwargs['id'] = k kwargs['folder'] = current_folder kwargs['subfolder'] = current_subfolder self.config._connections_add(**kwargs) else: # Invalid format skip pass _traverse_import(data) self.config._saveconfig(self.config.file)Import nodes/folders from a dictionary.
def import_from_file(self, file_path)-
Expand source code
def import_from_file(self, file_path): """Import nodes/folders from a YAML file.""" if not os.path.exists(file_path): raise InvalidConfigurationError(f"File '{file_path}' does not exist.") try: with open(file_path, "r") as f: data = yaml.load(f, Loader=yaml.FullLoader) self.import_from_dict(data) except Exception as e: raise InvalidConfigurationError(f"Failed to read/parse import file: {e}")Import nodes/folders from a YAML file.
Inherited members
class InvalidConfigurationError (*args, **kwargs)-
Expand source code
class InvalidConfigurationError(ConnpyError): """Raised when data or configuration input is invalid.""" passRaised when data or configuration input is invalid.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class NodeAlreadyExistsError (*args, **kwargs)-
Expand source code
class NodeAlreadyExistsError(ConnpyError): """Raised when a node or folder already exists.""" passRaised when a node or folder already exists.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class NodeNotFoundError (*args, **kwargs)-
Expand source code
class NodeNotFoundError(ConnpyError): """Raised when a connection or folder is not found.""" passRaised when a connection or folder is not found.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class NodeService (config=None)-
Expand source code
class NodeService(BaseService): def __init__(self, config=None): super().__init__(config) def list_nodes(self, filter_str=None, format_str=None): """Return a listed filtered by regex match and formatted if needed.""" nodes = self.config._getallnodes() case_sensitive = self.config.config.get("case", False) if filter_str: flags = re.IGNORECASE if not case_sensitive else 0 nodes = [n for n in nodes if re.search(filter_str, n, flags)] if not format_str: return nodes from .profile_service import ProfileService profile_service = ProfileService(self.config) formatted_nodes = [] for n_id in nodes: # Use ProfileService to resolve profiles for dynamic formatting details = self.config.getitem(n_id, extract=False) if details: details = profile_service.resolve_node_data(details) name = n_id.split("@")[0] location = n_id.partition("@")[2] or "root" # Prepare context for .format() with all details context = details.copy() context.update({ "name": name, "NAME": name.upper(), "location": location, "LOCATION": location.upper(), }) # Add exploded uniques (id, folder, subfolder) uniques = self.config._explode_unique(n_id) if uniques: context.update(uniques) # Add uppercase versions of all keys for convenience for k, v in list(context.items()): if isinstance(v, str): context[k.upper()] = v.upper() try: formatted_nodes.append(format_str.format(**context)) except (KeyError, IndexError, ValueError): # Fallback to original string if format fails formatted_nodes.append(n_id) return formatted_nodes def list_folders(self, filter_str=None): """Return all unique folders, optionally filtered by regex.""" folders = self.config._getallfolders() case_sensitive = self.config.config.get("case", False) if filter_str: if filter_str.startswith("@"): if not case_sensitive: folders = [f for f in folders if f.lower() == filter_str.lower()] else: folders = [f for f in folders if f == filter_str] else: flags = re.IGNORECASE if not case_sensitive else 0 folders = [f for f in folders if re.search(filter_str, f, flags)] return folders def get_node_details(self, unique_id): """Return full configuration dictionary for a specific node.""" try: details = self.config.getitem(unique_id) if not details: raise NodeNotFoundError(f"Node '{unique_id}' not found.") return details except (KeyError, TypeError): raise NodeNotFoundError(f"Node '{unique_id}' not found.") def explode_unique(self, unique_id): """Explode a unique ID into a dictionary of its parts.""" return self.config._explode_unique(unique_id) def generate_cache(self, nodes=None, folders=None, profiles=None): """Generate and update the internal nodes cache.""" self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles) def validate_parent_folder(self, unique_id, is_folder=False): """Check if parent folder exists for a given node unique ID.""" if is_folder: uniques = self.config._explode_unique(unique_id) if uniques and "subfolder" in uniques and "folder" in uniques: parent_folder = f"@{uniques['folder']}" if parent_folder not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{parent_folder}' not found.") else: node_folder = unique_id.partition("@")[2] if node_folder: parent_folder = f"@{node_folder}" if parent_folder not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{parent_folder}' not found.") def add_node(self, unique_id, data, is_folder=False): """Logic for adding a new node or folder to configuration.""" if not is_folder: self._validate_node_name(unique_id) all_nodes = self.config._getallnodes() all_folders = self.config._getallfolders() if is_folder: if unique_id in all_folders: raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.") uniques = self.config._explode_unique(unique_id) if not uniques: raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.") # Check if parent folder exists when creating a subfolder if "subfolder" in uniques: self.validate_parent_folder(unique_id, is_folder=True) self.config._folder_add(**uniques) self.config._saveconfig(self.config.file) else: if unique_id in all_nodes: raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.") # Check if parent folder exists when creating a node in a folder self.validate_parent_folder(unique_id) # Ensure 'id' is in data for config._connections_add if "id" not in data: uniques = self.config._explode_unique(unique_id) if uniques and "id" in uniques: data["id"] = uniques["id"] self.config._connections_add(**data) self.config._saveconfig(self.config.file) def update_node(self, unique_id, data, save=True): """Explicitly update an existing node.""" all_nodes = self.config._getallnodes() if unique_id not in all_nodes: raise NodeNotFoundError(f"Node '{unique_id}' not found.") # Ensure 'id' is in data for config._connections_add if "id" not in data: uniques = self.config._explode_unique(unique_id) if uniques: data["id"] = uniques["id"] # config._connections_add actually handles updates if ID exists correctly self.config._connections_add(**data) if save: self.config._saveconfig(self.config.file) def delete_node(self, unique_id, is_folder=False, save=True): """Logic for deleting a node or folder.""" if is_folder: uniques = self.config._explode_unique(unique_id) if not uniques: raise NodeNotFoundError(f"Folder '{unique_id}' not found or invalid.") self.config._folder_del(**uniques) else: uniques = self.config._explode_unique(unique_id) if not uniques: raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.") self.config._connections_del(**uniques) if save: self.config._saveconfig(self.config.file) def connect_node(self, unique_id, sftp=False, debug=False, logger=None): """Interact with a node directly.""" from connpy.core import node from .profile_service import ProfileService node_data = self.config.getitem(unique_id, extract=False) if not node_data: raise NodeNotFoundError(f"Node '{unique_id}' not found.") # Resolve profiles profile_service = ProfileService(self.config) resolved_data = profile_service.resolve_node_data(node_data) n = node(unique_id, **resolved_data, config=self.config) if sftp: n.protocol = "sftp" n.interact(debug=debug, logger=logger) def move_node(self, src_id, dst_id, copy=False): """Move or copy a node.""" self._validate_node_name(dst_id) node_data = self.config.getitem(src_id) if not node_data: raise NodeNotFoundError(f"Source node '{src_id}' not found.") if dst_id in self.config._getallnodes(): raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.") new_uniques = self.config._explode_unique(dst_id) if not new_uniques: raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.") new_node_data = node_data.copy() new_node_data.update(new_uniques) self.config._connections_add(**new_node_data) if not copy: src_uniques = self.config._explode_unique(src_id) self.config._connections_del(**src_uniques) self.config._saveconfig(self.config.file) def bulk_add(self, ids, hosts, common_data): """Add multiple nodes with shared common configuration.""" count = 0 all_nodes = self.config._getallnodes() for i, uid in enumerate(ids): if uid in all_nodes: continue try: self._validate_node_name(uid) except ReservedNameError: # For bulk, we might want to just skip or log. # CLI caller will handle if it wants to be strict. continue host = hosts[i] if i < len(hosts) else hosts[0] uniques = self.config._explode_unique(uid) if not uniques: continue node_data = common_data.copy() node_data.pop("ids", None) node_data.pop("location", None) node_data.update(uniques) node_data["host"] = host node_data["type"] = "connection" self.config._connections_add(**node_data) count += 1 if count > 0: self.config._saveconfig(self.config.file) return count def full_replace(self, connections, profiles): """Replace all connections and profiles with new data.""" self.config.connections = connections self.config.profiles = profiles self.config._saveconfig(self.config.file) def get_inventory(self): """Return a full snapshot of connections and profiles.""" return { "connections": self.config.connections, "profiles": self.config.profiles }Base class for all connpy services, providing common configuration access.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def add_node(self, unique_id, data, is_folder=False)-
Expand source code
def add_node(self, unique_id, data, is_folder=False): """Logic for adding a new node or folder to configuration.""" if not is_folder: self._validate_node_name(unique_id) all_nodes = self.config._getallnodes() all_folders = self.config._getallfolders() if is_folder: if unique_id in all_folders: raise NodeAlreadyExistsError(f"Folder '{unique_id}' already exists.") uniques = self.config._explode_unique(unique_id) if not uniques: raise InvalidConfigurationError(f"Invalid folder name '{unique_id}'.") # Check if parent folder exists when creating a subfolder if "subfolder" in uniques: self.validate_parent_folder(unique_id, is_folder=True) self.config._folder_add(**uniques) self.config._saveconfig(self.config.file) else: if unique_id in all_nodes: raise NodeAlreadyExistsError(f"Node '{unique_id}' already exists.") # Check if parent folder exists when creating a node in a folder self.validate_parent_folder(unique_id) # Ensure 'id' is in data for config._connections_add if "id" not in data: uniques = self.config._explode_unique(unique_id) if uniques and "id" in uniques: data["id"] = uniques["id"] self.config._connections_add(**data) self.config._saveconfig(self.config.file)Logic for adding a new node or folder to configuration.
def bulk_add(self, ids, hosts, common_data)-
Expand source code
def bulk_add(self, ids, hosts, common_data): """Add multiple nodes with shared common configuration.""" count = 0 all_nodes = self.config._getallnodes() for i, uid in enumerate(ids): if uid in all_nodes: continue try: self._validate_node_name(uid) except ReservedNameError: # For bulk, we might want to just skip or log. # CLI caller will handle if it wants to be strict. continue host = hosts[i] if i < len(hosts) else hosts[0] uniques = self.config._explode_unique(uid) if not uniques: continue node_data = common_data.copy() node_data.pop("ids", None) node_data.pop("location", None) node_data.update(uniques) node_data["host"] = host node_data["type"] = "connection" self.config._connections_add(**node_data) count += 1 if count > 0: self.config._saveconfig(self.config.file) return countAdd multiple nodes with shared common configuration.
def connect_node(self, unique_id, sftp=False, debug=False, logger=None)-
Expand source code
def connect_node(self, unique_id, sftp=False, debug=False, logger=None): """Interact with a node directly.""" from connpy.core import node from .profile_service import ProfileService node_data = self.config.getitem(unique_id, extract=False) if not node_data: raise NodeNotFoundError(f"Node '{unique_id}' not found.") # Resolve profiles profile_service = ProfileService(self.config) resolved_data = profile_service.resolve_node_data(node_data) n = node(unique_id, **resolved_data, config=self.config) if sftp: n.protocol = "sftp" n.interact(debug=debug, logger=logger)Interact with a node directly.
def delete_node(self, unique_id, is_folder=False, save=True)-
Expand source code
def delete_node(self, unique_id, is_folder=False, save=True): """Logic for deleting a node or folder.""" if is_folder: uniques = self.config._explode_unique(unique_id) if not uniques: raise NodeNotFoundError(f"Folder '{unique_id}' not found or invalid.") self.config._folder_del(**uniques) else: uniques = self.config._explode_unique(unique_id) if not uniques: raise NodeNotFoundError(f"Node '{unique_id}' not found or invalid.") self.config._connections_del(**uniques) if save: self.config._saveconfig(self.config.file)Logic for deleting a node or folder.
def explode_unique(self, unique_id)-
Expand source code
def explode_unique(self, unique_id): """Explode a unique ID into a dictionary of its parts.""" return self.config._explode_unique(unique_id)Explode a unique ID into a dictionary of its parts.
def full_replace(self, connections, profiles)-
Expand source code
def full_replace(self, connections, profiles): """Replace all connections and profiles with new data.""" self.config.connections = connections self.config.profiles = profiles self.config._saveconfig(self.config.file)Replace all connections and profiles with new data.
def generate_cache(self, nodes=None, folders=None, profiles=None)-
Expand source code
def generate_cache(self, nodes=None, folders=None, profiles=None): """Generate and update the internal nodes cache.""" self.config._generate_nodes_cache(nodes=nodes, folders=folders, profiles=profiles)Generate and update the internal nodes cache.
def get_inventory(self)-
Expand source code
def get_inventory(self): """Return a full snapshot of connections and profiles.""" return { "connections": self.config.connections, "profiles": self.config.profiles }Return a full snapshot of connections and profiles.
def get_node_details(self, unique_id)-
Expand source code
def get_node_details(self, unique_id): """Return full configuration dictionary for a specific node.""" try: details = self.config.getitem(unique_id) if not details: raise NodeNotFoundError(f"Node '{unique_id}' not found.") return details except (KeyError, TypeError): raise NodeNotFoundError(f"Node '{unique_id}' not found.")Return full configuration dictionary for a specific node.
def list_folders(self, filter_str=None)-
Expand source code
def list_folders(self, filter_str=None): """Return all unique folders, optionally filtered by regex.""" folders = self.config._getallfolders() case_sensitive = self.config.config.get("case", False) if filter_str: if filter_str.startswith("@"): if not case_sensitive: folders = [f for f in folders if f.lower() == filter_str.lower()] else: folders = [f for f in folders if f == filter_str] else: flags = re.IGNORECASE if not case_sensitive else 0 folders = [f for f in folders if re.search(filter_str, f, flags)] return foldersReturn all unique folders, optionally filtered by regex.
def list_nodes(self, filter_str=None, format_str=None)-
Expand source code
def list_nodes(self, filter_str=None, format_str=None): """Return a listed filtered by regex match and formatted if needed.""" nodes = self.config._getallnodes() case_sensitive = self.config.config.get("case", False) if filter_str: flags = re.IGNORECASE if not case_sensitive else 0 nodes = [n for n in nodes if re.search(filter_str, n, flags)] if not format_str: return nodes from .profile_service import ProfileService profile_service = ProfileService(self.config) formatted_nodes = [] for n_id in nodes: # Use ProfileService to resolve profiles for dynamic formatting details = self.config.getitem(n_id, extract=False) if details: details = profile_service.resolve_node_data(details) name = n_id.split("@")[0] location = n_id.partition("@")[2] or "root" # Prepare context for .format() with all details context = details.copy() context.update({ "name": name, "NAME": name.upper(), "location": location, "LOCATION": location.upper(), }) # Add exploded uniques (id, folder, subfolder) uniques = self.config._explode_unique(n_id) if uniques: context.update(uniques) # Add uppercase versions of all keys for convenience for k, v in list(context.items()): if isinstance(v, str): context[k.upper()] = v.upper() try: formatted_nodes.append(format_str.format(**context)) except (KeyError, IndexError, ValueError): # Fallback to original string if format fails formatted_nodes.append(n_id) return formatted_nodesReturn a listed filtered by regex match and formatted if needed.
def move_node(self, src_id, dst_id, copy=False)-
Expand source code
def move_node(self, src_id, dst_id, copy=False): """Move or copy a node.""" self._validate_node_name(dst_id) node_data = self.config.getitem(src_id) if not node_data: raise NodeNotFoundError(f"Source node '{src_id}' not found.") if dst_id in self.config._getallnodes(): raise NodeAlreadyExistsError(f"Destination node '{dst_id}' already exists.") new_uniques = self.config._explode_unique(dst_id) if not new_uniques: raise InvalidConfigurationError(f"Invalid destination format '{dst_id}'.") new_node_data = node_data.copy() new_node_data.update(new_uniques) self.config._connections_add(**new_node_data) if not copy: src_uniques = self.config._explode_unique(src_id) self.config._connections_del(**src_uniques) self.config._saveconfig(self.config.file)Move or copy a node.
def update_node(self, unique_id, data, save=True)-
Expand source code
def update_node(self, unique_id, data, save=True): """Explicitly update an existing node.""" all_nodes = self.config._getallnodes() if unique_id not in all_nodes: raise NodeNotFoundError(f"Node '{unique_id}' not found.") # Ensure 'id' is in data for config._connections_add if "id" not in data: uniques = self.config._explode_unique(unique_id) if uniques: data["id"] = uniques["id"] # config._connections_add actually handles updates if ID exists correctly self.config._connections_add(**data) if save: self.config._saveconfig(self.config.file)Explicitly update an existing node.
def validate_parent_folder(self, unique_id, is_folder=False)-
Expand source code
def validate_parent_folder(self, unique_id, is_folder=False): """Check if parent folder exists for a given node unique ID.""" if is_folder: uniques = self.config._explode_unique(unique_id) if uniques and "subfolder" in uniques and "folder" in uniques: parent_folder = f"@{uniques['folder']}" if parent_folder not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{parent_folder}' not found.") else: node_folder = unique_id.partition("@")[2] if node_folder: parent_folder = f"@{node_folder}" if parent_folder not in self.config._getallfolders(): raise NodeNotFoundError(f"Folder '{parent_folder}' not found.")Check if parent folder exists for a given node unique ID.
Inherited members
class PluginService (config=None)-
Expand source code
class PluginService(BaseService): """Business logic for enabling, disabling, and listing plugins.""" def _get_plugin_path(self, name, include_disabled=True): """Resolves the physical path of a plugin by name. Priority: user, shared/global, core.""" import os # 1. User directory user_dir = os.path.join(self.config.defaultdir, "plugins") if os.path.exists(user_dir): p_file = os.path.join(user_dir, f"{name}.py") if os.path.exists(p_file): return p_file, "user", True if include_disabled: bkp_file = os.path.join(user_dir, f"{name}.py.bkp") if os.path.exists(bkp_file): return bkp_file, "user", False # 2. Shared/Global directory if hasattr(self.config, "_shared_config") and self.config._shared_config: shared_dir = os.path.join(self.config._shared_config.defaultdir, "plugins") if os.path.exists(shared_dir): p_file = os.path.join(shared_dir, f"{name}.py") if os.path.exists(p_file): return p_file, "shared", True if include_disabled: bkp_file = os.path.join(shared_dir, f"{name}.py.bkp") if os.path.exists(bkp_file): return bkp_file, "shared", False # 3. Core plugins core_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins") p_file = os.path.join(core_dir, f"{name}.py") if os.path.exists(p_file): return p_file, "core", True return None, None, False def list_plugins(self): """List all core and user-defined plugins with their status and hash.""" import os import hashlib all_plugin_info = {} def get_hash(path): try: with open(path, "rb") as f: return hashlib.md5(f.read()).hexdigest() except Exception: return "" # 1. Scan core plugins (lowest priority) core_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins") if os.path.exists(core_dir): for f in os.listdir(core_dir): if f.endswith(".py"): name = f[:-3] path = os.path.join(core_dir, f) all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)} # 2. Scan shared plugins (medium priority) if hasattr(self.config, "_shared_config") and self.config._shared_config: shared_dir = os.path.join(self.config._shared_config.defaultdir, "plugins") if os.path.exists(shared_dir): for f in os.listdir(shared_dir): if f.endswith(".py"): name = f[:-3] path = os.path.join(shared_dir, f) all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)} elif f.endswith(".py.bkp"): name = f[:-7] all_plugin_info[name] = {"enabled": False} # 3. Scan user plugins (highest priority) user_dir = os.path.join(self.config.defaultdir, "plugins") if os.path.exists(user_dir): for f in os.listdir(user_dir): if f.endswith(".py"): name = f[:-3] path = os.path.join(user_dir, f) all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)} elif f.endswith(".py.bkp"): name = f[:-7] all_plugin_info[name] = {"enabled": False} return all_plugin_info def add_plugin(self, name, source_file, update=False): """Add or update a plugin from a local file.""" import os import shutil from connpy.plugins import Plugins if not name.isalpha() or not name.islower() or len(name) > 15: raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.") p_manager = Plugins() # Check for bad script error = p_manager.verify_script(source_file) if error: raise InvalidConfigurationError(f"Invalid plugin script: {error}") self._save_plugin_file(name, source_file, update, is_path=True) def add_plugin_from_bytes(self, name, content, update=False): """Add or update a plugin from bytes (gRPC).""" import tempfile import os if not name.isalpha() or not name.islower() or len(name) > 15: raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.") # Write to temp file to verify script with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp: tmp.write(content) tmp_path = tmp.name try: from connpy.plugins import Plugins p_manager = Plugins() error = p_manager.verify_script(tmp_path) if error: raise InvalidConfigurationError(f"Invalid plugin script: {error}") self._save_plugin_file(name, tmp_path, update, is_path=True) finally: if os.path.exists(tmp_path): os.remove(tmp_path) def _save_plugin_file(self, name, source, update=False, is_path=True): import os import shutil plugin_dir = os.path.join(self.config.defaultdir, "plugins") os.makedirs(plugin_dir, exist_ok=True) target_file = os.path.join(plugin_dir, f"{name}.py") backup_file = f"{target_file}.bkp" if not update and (os.path.exists(target_file) or os.path.exists(backup_file)): raise InvalidConfigurationError(f"Plugin '{name}' already exists.") try: if is_path: shutil.copy2(source, target_file) else: with open(target_file, "wb") as f: f.write(source) except OSError as e: raise InvalidConfigurationError(f"Failed to save plugin file: {e}") def delete_plugin(self, name): """Remove a plugin file permanently.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" deleted = False for f in [plugin_file, disabled_file]: if os.path.exists(f): try: os.remove(f) deleted = True except OSError as e: raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}") if not deleted: # If not deleted from user directory, check if it's in shared or core path, origin, enabled = self._get_plugin_path(name, include_disabled=True) if origin in ["shared", "core"]: raise InvalidConfigurationError("Global and core plugins are read-only and cannot be deleted by users.") raise InvalidConfigurationError(f"Plugin '{name}' not found.") def enable_plugin(self, name): """Activate a plugin by renaming its backup file.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" if os.path.exists(disabled_file): # Check if it is a shadow bkp file (0 bytes shadowing shared/core) is_shadow = False if os.path.getsize(disabled_file) == 0: # Resolve without the local bkp file to verify if shared/core has it path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if origin in ["shared", "core"]: is_shadow = True if is_shadow: # Remove shadow file to restore inheritance try: os.remove(disabled_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to remove shadow file '{disabled_file}': {e}") else: try: os.rename(disabled_file, plugin_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}") if os.path.exists(plugin_file): return False # Already enabled # If it doesn't exist locally, check if it's already an active shared/core plugin path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if origin in ["shared", "core"]: return False # Already active/enabled through inheritance raise InvalidConfigurationError(f"Plugin '{name}' not found.") def disable_plugin(self, name): """Deactivate a plugin by renaming it to a backup file.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" if os.path.exists(plugin_file): # Regular user-level plugin exists. Rename to bkp try: os.rename(plugin_file, disabled_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}") if os.path.exists(disabled_file): return False # Already disabled # Check if it exists in shared or core path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if origin in ["shared", "core"]: # Shadow disable it by creating an empty .py.bkp in user plugins dir plugin_dir = os.path.dirname(plugin_file) os.makedirs(plugin_dir, exist_ok=True) try: with open(disabled_file, "w") as f: f.write("") return True except OSError as e: raise InvalidConfigurationError(f"Failed to create shadow disable file: {e}") raise InvalidConfigurationError(f"Plugin '{name}' not found or is already disabled.") def get_plugin_source(self, name): import os from ..services.exceptions import InvalidConfigurationError path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if not path: raise InvalidConfigurationError(f"Plugin '{name}' not found") with open(path, "r") as f: return f.read() def invoke_plugin(self, name, args_dict): import sys, io from argparse import Namespace from ..services.exceptions import InvalidConfigurationError from connpy.plugins import Plugins class MockApp: is_mock = True def __init__(self, config): from ..core import node, nodes from ..ai import ai from ..services.provider import ServiceProvider self.config = config self.node = node self.nodes = nodes self.ai = ai self.services = ServiceProvider(config, mode="local") # Get settings for CLI behavior settings = self.services.config_svc.get_settings() self.case = settings.get("case", False) self.fzf = settings.get("fzf", False) try: self.nodes_list = self.services.nodes.list_nodes() self.folders = self.services.nodes.list_folders() self.profiles = self.services.profiles.list_profiles() except Exception: self.nodes_list = [] self.folders = [] self.profiles = [] args = Namespace(**args_dict) p_manager = Plugins() import os path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if not path: raise InvalidConfigurationError(f"Plugin '{name}' not found") module = p_manager._import_from_path(path) parser = module.Parser().parser if hasattr(module, "Parser") else None if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]): args.func = getattr(module, args_dict["__func_name__"]) app = MockApp(self.config) from .. import printer from rich.console import Console from rich.console import Console import queue import threading q = queue.Queue() class QueueIO(io.StringIO): def write(self, s): q.put(s) return len(s) def flush(self): pass buf = QueueIO() old_console = printer._get_console() old_err_console = printer._get_err_console() def run_plugin(): printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) printer.set_thread_stream(buf) try: if hasattr(module, "Entrypoint"): module.Entrypoint(args, parser, app) except BaseException as e: if not isinstance(e, SystemExit): import traceback printer.err_console.print(traceback.format_exc()) finally: printer.set_thread_console(old_console) printer.set_thread_err_console(old_err_console) printer.set_thread_stream(None) q.put(None) t = threading.Thread(target=run_plugin, daemon=True) t.start() while True: item = q.get() if item is None: break yield itemBusiness logic for enabling, disabling, and listing plugins.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def add_plugin(self, name, source_file, update=False)-
Expand source code
def add_plugin(self, name, source_file, update=False): """Add or update a plugin from a local file.""" import os import shutil from connpy.plugins import Plugins if not name.isalpha() or not name.islower() or len(name) > 15: raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.") p_manager = Plugins() # Check for bad script error = p_manager.verify_script(source_file) if error: raise InvalidConfigurationError(f"Invalid plugin script: {error}") self._save_plugin_file(name, source_file, update, is_path=True)Add or update a plugin from a local file.
def add_plugin_from_bytes(self, name, content, update=False)-
Expand source code
def add_plugin_from_bytes(self, name, content, update=False): """Add or update a plugin from bytes (gRPC).""" import tempfile import os if not name.isalpha() or not name.islower() or len(name) > 15: raise InvalidConfigurationError("Plugin name should be lowercase letters up to 15 characters.") # Write to temp file to verify script with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as tmp: tmp.write(content) tmp_path = tmp.name try: from connpy.plugins import Plugins p_manager = Plugins() error = p_manager.verify_script(tmp_path) if error: raise InvalidConfigurationError(f"Invalid plugin script: {error}") self._save_plugin_file(name, tmp_path, update, is_path=True) finally: if os.path.exists(tmp_path): os.remove(tmp_path)Add or update a plugin from bytes (gRPC).
def delete_plugin(self, name)-
Expand source code
def delete_plugin(self, name): """Remove a plugin file permanently.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" deleted = False for f in [plugin_file, disabled_file]: if os.path.exists(f): try: os.remove(f) deleted = True except OSError as e: raise InvalidConfigurationError(f"Failed to delete plugin file '{f}': {e}") if not deleted: # If not deleted from user directory, check if it's in shared or core path, origin, enabled = self._get_plugin_path(name, include_disabled=True) if origin in ["shared", "core"]: raise InvalidConfigurationError("Global and core plugins are read-only and cannot be deleted by users.") raise InvalidConfigurationError(f"Plugin '{name}' not found.")Remove a plugin file permanently.
def disable_plugin(self, name)-
Expand source code
def disable_plugin(self, name): """Deactivate a plugin by renaming it to a backup file.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" if os.path.exists(plugin_file): # Regular user-level plugin exists. Rename to bkp try: os.rename(plugin_file, disabled_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to disable plugin '{name}': {e}") if os.path.exists(disabled_file): return False # Already disabled # Check if it exists in shared or core path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if origin in ["shared", "core"]: # Shadow disable it by creating an empty .py.bkp in user plugins dir plugin_dir = os.path.dirname(plugin_file) os.makedirs(plugin_dir, exist_ok=True) try: with open(disabled_file, "w") as f: f.write("") return True except OSError as e: raise InvalidConfigurationError(f"Failed to create shadow disable file: {e}") raise InvalidConfigurationError(f"Plugin '{name}' not found or is already disabled.")Deactivate a plugin by renaming it to a backup file.
def enable_plugin(self, name)-
Expand source code
def enable_plugin(self, name): """Activate a plugin by renaming its backup file.""" import os plugin_file = os.path.join(self.config.defaultdir, "plugins", f"{name}.py") disabled_file = f"{plugin_file}.bkp" if os.path.exists(disabled_file): # Check if it is a shadow bkp file (0 bytes shadowing shared/core) is_shadow = False if os.path.getsize(disabled_file) == 0: # Resolve without the local bkp file to verify if shared/core has it path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if origin in ["shared", "core"]: is_shadow = True if is_shadow: # Remove shadow file to restore inheritance try: os.remove(disabled_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to remove shadow file '{disabled_file}': {e}") else: try: os.rename(disabled_file, plugin_file) return True except OSError as e: raise InvalidConfigurationError(f"Failed to enable plugin '{name}': {e}") if os.path.exists(plugin_file): return False # Already enabled # If it doesn't exist locally, check if it's already an active shared/core plugin path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if origin in ["shared", "core"]: return False # Already active/enabled through inheritance raise InvalidConfigurationError(f"Plugin '{name}' not found.")Activate a plugin by renaming its backup file.
def get_plugin_source(self, name)-
Expand source code
def get_plugin_source(self, name): import os from ..services.exceptions import InvalidConfigurationError path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if not path: raise InvalidConfigurationError(f"Plugin '{name}' not found") with open(path, "r") as f: return f.read() def invoke_plugin(self, name, args_dict)-
Expand source code
def invoke_plugin(self, name, args_dict): import sys, io from argparse import Namespace from ..services.exceptions import InvalidConfigurationError from connpy.plugins import Plugins class MockApp: is_mock = True def __init__(self, config): from ..core import node, nodes from ..ai import ai from ..services.provider import ServiceProvider self.config = config self.node = node self.nodes = nodes self.ai = ai self.services = ServiceProvider(config, mode="local") # Get settings for CLI behavior settings = self.services.config_svc.get_settings() self.case = settings.get("case", False) self.fzf = settings.get("fzf", False) try: self.nodes_list = self.services.nodes.list_nodes() self.folders = self.services.nodes.list_folders() self.profiles = self.services.profiles.list_profiles() except Exception: self.nodes_list = [] self.folders = [] self.profiles = [] args = Namespace(**args_dict) p_manager = Plugins() import os path, origin, enabled = self._get_plugin_path(name, include_disabled=False) if not path: raise InvalidConfigurationError(f"Plugin '{name}' not found") module = p_manager._import_from_path(path) parser = module.Parser().parser if hasattr(module, "Parser") else None if "__func_name__" in args_dict and hasattr(module, args_dict["__func_name__"]): args.func = getattr(module, args_dict["__func_name__"]) app = MockApp(self.config) from .. import printer from rich.console import Console from rich.console import Console import queue import threading q = queue.Queue() class QueueIO(io.StringIO): def write(self, s): q.put(s) return len(s) def flush(self): pass buf = QueueIO() old_console = printer._get_console() old_err_console = printer._get_err_console() def run_plugin(): printer.set_thread_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) printer.set_thread_err_console(Console(file=buf, theme=printer.connpy_theme, force_terminal=True)) printer.set_thread_stream(buf) try: if hasattr(module, "Entrypoint"): module.Entrypoint(args, parser, app) except BaseException as e: if not isinstance(e, SystemExit): import traceback printer.err_console.print(traceback.format_exc()) finally: printer.set_thread_console(old_console) printer.set_thread_err_console(old_err_console) printer.set_thread_stream(None) q.put(None) t = threading.Thread(target=run_plugin, daemon=True) t.start() while True: item = q.get() if item is None: break yield item def list_plugins(self)-
Expand source code
def list_plugins(self): """List all core and user-defined plugins with their status and hash.""" import os import hashlib all_plugin_info = {} def get_hash(path): try: with open(path, "rb") as f: return hashlib.md5(f.read()).hexdigest() except Exception: return "" # 1. Scan core plugins (lowest priority) core_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "core_plugins") if os.path.exists(core_dir): for f in os.listdir(core_dir): if f.endswith(".py"): name = f[:-3] path = os.path.join(core_dir, f) all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)} # 2. Scan shared plugins (medium priority) if hasattr(self.config, "_shared_config") and self.config._shared_config: shared_dir = os.path.join(self.config._shared_config.defaultdir, "plugins") if os.path.exists(shared_dir): for f in os.listdir(shared_dir): if f.endswith(".py"): name = f[:-3] path = os.path.join(shared_dir, f) all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)} elif f.endswith(".py.bkp"): name = f[:-7] all_plugin_info[name] = {"enabled": False} # 3. Scan user plugins (highest priority) user_dir = os.path.join(self.config.defaultdir, "plugins") if os.path.exists(user_dir): for f in os.listdir(user_dir): if f.endswith(".py"): name = f[:-3] path = os.path.join(user_dir, f) all_plugin_info[name] = {"enabled": True, "hash": get_hash(path)} elif f.endswith(".py.bkp"): name = f[:-7] all_plugin_info[name] = {"enabled": False} return all_plugin_infoList all core and user-defined plugins with their status and hash.
Inherited members
class ProfileAlreadyExistsError (*args, **kwargs)-
Expand source code
class ProfileAlreadyExistsError(ConnpyError): """Raised when a profile with the same name already exists.""" passRaised when a profile with the same name already exists.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class ProfileNotFoundError (*args, **kwargs)-
Expand source code
class ProfileNotFoundError(ConnpyError): """Raised when a profile is not found.""" passRaised when a profile is not found.
Ancestors
- ConnpyError
- builtins.Exception
- builtins.BaseException
class ProfileService (config=None)-
Expand source code
class ProfileService(BaseService): """Business logic for node profiles management.""" def list_profiles(self, filter_str=None): """List all profile names, optionally filtered.""" profiles = list(self.config.profiles.keys()) case_sensitive = self.config.config.get("case", False) if filter_str: if not case_sensitive: f_str = filter_str.lower() return [p for p in profiles if f_str in p.lower()] else: return [p for p in profiles if filter_str in p] return profiles def get_profile(self, name, resolve=True): """Get the profile dictionary, optionally resolved.""" profile = self.config.profiles.get(name) if not profile: raise ProfileNotFoundError(f"Profile '{name}' not found.") if resolve: return self.resolve_node_data(profile) return profile def add_profile(self, name, data): """Add a new profile.""" if name in self.config.profiles: raise ProfileAlreadyExistsError(f"Profile '{name}' already exists.") # Filter data to match _profiles_add signature and ensure id is passed allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"} filtered_data = {k: v for k, v in data.items() if k in allowed_keys} self.config._profiles_add(id=name, **filtered_data) self.config._saveconfig(self.config.file) def resolve_node_data(self, node_data): """Resolve profile references (@profile) in node data and handle inheritance.""" resolved = node_data.copy() # 1. Identify all referenced profiles to support inheritance referenced_profiles = [] for value in resolved.values(): if isinstance(value, str) and value.startswith("@"): referenced_profiles.append(value[1:]) elif isinstance(value, list): for item in value: if isinstance(item, str) and item.startswith("@"): referenced_profiles.append(item[1:]) # 2. Resolve explicit references for key, value in resolved.items(): if isinstance(value, str) and value.startswith("@"): profile_name = value[1:] try: profile = self.get_profile(profile_name, resolve=True) resolved[key] = profile.get(key, "") except ProfileNotFoundError: resolved[key] = "" elif isinstance(value, list): resolved_list = [] for item in value: if isinstance(item, str) and item.startswith("@"): profile_name = item[1:] try: profile = self.get_profile(profile_name, resolve=True) if "password" in profile: resolved_list.append(profile["password"]) except ProfileNotFoundError: pass else: resolved_list.append(item) resolved[key] = resolved_list # 3. Inheritance: Fill empty keys from the first referenced profile if referenced_profiles: base_profile_name = referenced_profiles[0] try: base_profile = self.get_profile(base_profile_name, resolve=True) for key, value in base_profile.items(): # Fill if key is missing or empty if key not in resolved or resolved[key] == "" or resolved[key] == [] or resolved[key] is None: resolved[key] = value except ProfileNotFoundError: pass # 4. Handle default protocol if resolved.get("protocol") == "" or resolved.get("protocol") is None: try: default_profile = self.get_profile("default", resolve=True) resolved["protocol"] = default_profile.get("protocol", "ssh") except ProfileNotFoundError: resolved["protocol"] = "ssh" return resolved def delete_profile(self, name): """Delete an existing profile, with safety checks.""" if name not in self.config.profiles: raise ProfileNotFoundError(f"Profile '{name}' not found.") if name == "default": raise InvalidConfigurationError("Cannot delete the 'default' profile.") used_by = self.config._profileused(name) if used_by: # We return the list of nodes using it so the UI can inform the user raise InvalidConfigurationError(f"Profile '{name}' is used by nodes: {', '.join(used_by)}") self.config._profiles_del(id=name) self.config._saveconfig(self.config.file) def update_profile(self, name, data): """Update an existing profile.""" if name not in self.config.profiles: raise ProfileNotFoundError(f"Profile '{name}' not found.") # Merge with existing data existing = self.get_profile(name, resolve=False) updated_data = existing.copy() updated_data.update(data) # Filter data to match _profiles_add signature allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"} filtered_data = {k: v for k, v in updated_data.items() if k in allowed_keys} self.config._profiles_add(id=name, **filtered_data) self.config._saveconfig(self.config.file)Business logic for node profiles management.
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def add_profile(self, name, data)-
Expand source code
def add_profile(self, name, data): """Add a new profile.""" if name in self.config.profiles: raise ProfileAlreadyExistsError(f"Profile '{name}' already exists.") # Filter data to match _profiles_add signature and ensure id is passed allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"} filtered_data = {k: v for k, v in data.items() if k in allowed_keys} self.config._profiles_add(id=name, **filtered_data) self.config._saveconfig(self.config.file)Add a new profile.
def delete_profile(self, name)-
Expand source code
def delete_profile(self, name): """Delete an existing profile, with safety checks.""" if name not in self.config.profiles: raise ProfileNotFoundError(f"Profile '{name}' not found.") if name == "default": raise InvalidConfigurationError("Cannot delete the 'default' profile.") used_by = self.config._profileused(name) if used_by: # We return the list of nodes using it so the UI can inform the user raise InvalidConfigurationError(f"Profile '{name}' is used by nodes: {', '.join(used_by)}") self.config._profiles_del(id=name) self.config._saveconfig(self.config.file)Delete an existing profile, with safety checks.
def get_profile(self, name, resolve=True)-
Expand source code
def get_profile(self, name, resolve=True): """Get the profile dictionary, optionally resolved.""" profile = self.config.profiles.get(name) if not profile: raise ProfileNotFoundError(f"Profile '{name}' not found.") if resolve: return self.resolve_node_data(profile) return profileGet the profile dictionary, optionally resolved.
def list_profiles(self, filter_str=None)-
Expand source code
def list_profiles(self, filter_str=None): """List all profile names, optionally filtered.""" profiles = list(self.config.profiles.keys()) case_sensitive = self.config.config.get("case", False) if filter_str: if not case_sensitive: f_str = filter_str.lower() return [p for p in profiles if f_str in p.lower()] else: return [p for p in profiles if filter_str in p] return profilesList all profile names, optionally filtered.
def resolve_node_data(self, node_data)-
Expand source code
def resolve_node_data(self, node_data): """Resolve profile references (@profile) in node data and handle inheritance.""" resolved = node_data.copy() # 1. Identify all referenced profiles to support inheritance referenced_profiles = [] for value in resolved.values(): if isinstance(value, str) and value.startswith("@"): referenced_profiles.append(value[1:]) elif isinstance(value, list): for item in value: if isinstance(item, str) and item.startswith("@"): referenced_profiles.append(item[1:]) # 2. Resolve explicit references for key, value in resolved.items(): if isinstance(value, str) and value.startswith("@"): profile_name = value[1:] try: profile = self.get_profile(profile_name, resolve=True) resolved[key] = profile.get(key, "") except ProfileNotFoundError: resolved[key] = "" elif isinstance(value, list): resolved_list = [] for item in value: if isinstance(item, str) and item.startswith("@"): profile_name = item[1:] try: profile = self.get_profile(profile_name, resolve=True) if "password" in profile: resolved_list.append(profile["password"]) except ProfileNotFoundError: pass else: resolved_list.append(item) resolved[key] = resolved_list # 3. Inheritance: Fill empty keys from the first referenced profile if referenced_profiles: base_profile_name = referenced_profiles[0] try: base_profile = self.get_profile(base_profile_name, resolve=True) for key, value in base_profile.items(): # Fill if key is missing or empty if key not in resolved or resolved[key] == "" or resolved[key] == [] or resolved[key] is None: resolved[key] = value except ProfileNotFoundError: pass # 4. Handle default protocol if resolved.get("protocol") == "" or resolved.get("protocol") is None: try: default_profile = self.get_profile("default", resolve=True) resolved["protocol"] = default_profile.get("protocol", "ssh") except ProfileNotFoundError: resolved["protocol"] = "ssh" return resolvedResolve profile references (@profile) in node data and handle inheritance.
def update_profile(self, name, data)-
Expand source code
def update_profile(self, name, data): """Update an existing profile.""" if name not in self.config.profiles: raise ProfileNotFoundError(f"Profile '{name}' not found.") # Merge with existing data existing = self.get_profile(name, resolve=False) updated_data = existing.copy() updated_data.update(data) # Filter data to match _profiles_add signature allowed_keys = {"host", "options", "logs", "password", "port", "protocol", "user", "tags", "jumphost"} filtered_data = {k: v for k, v in updated_data.items() if k in allowed_keys} self.config._profiles_add(id=name, **filtered_data) self.config._saveconfig(self.config.file)Update an existing profile.
Inherited members
class SystemService (config=None)-
Expand source code
class SystemService(BaseService): """Business logic for application lifecycle (API, processes).""" def start_api(self, port=None): """Start the Connpy REST API.""" from connpy.api import start_api try: start_api(port, config=self.config) except Exception as e: raise ConnpyError(f"Failed to start API: {e}") def debug_api(self, port=None): """Start the Connpy REST API in debug mode.""" from connpy.api import debug_api try: debug_api(port, config=self.config) except Exception as e: raise ConnpyError(f"Failed to start API in debug mode: {e}") def stop_api(self): """Stop the Connpy REST API.""" try: import os import signal pids = ["/run/connpy.pid", "/tmp/connpy.pid"] stopped = False for pid_file in pids: if os.path.exists(pid_file): try: with open(pid_file, "r") as f: # Read only the first line (PID) line = f.readline().strip() if not line: continue pid = int(line) os.kill(pid, signal.SIGTERM) # Remove the PID file after successful kill os.remove(pid_file) stopped = True except (ValueError, OSError, ProcessLookupError): # If process is already dead, just remove the stale PID file try: os.remove(pid_file) except OSError: pass continue return stopped except Exception as e: raise ConnpyError(f"Failed to stop API: {e}") def restart_api(self, port=None): """Restart the Connpy REST API, maintaining the current port if none provided.""" if port is None: status = self.get_api_status() if status["running"] and status.get("port"): port = status["port"] self.stop_api() import time time.sleep(1) self.start_api(port) def get_api_status(self): """Check if the API is currently running.""" import os pids = ["/run/connpy.pid", "/tmp/connpy.pid"] for pid_file in pids: if os.path.exists(pid_file): try: with open(pid_file, "r") as f: pid_line = f.readline().strip() port_line = f.readline().strip() if not pid_line: continue pid = int(pid_line) port = int(port_line) if port_line else None # Signal 0 checks for process existence without killing it os.kill(pid, 0) return {"running": True, "pid": pid, "port": port, "pid_file": pid_file} except (ValueError, OSError, ProcessLookupError): continue return {"running": False}Business logic for application lifecycle (API, processes).
Initialize the service.
Args
config- An instance of configfile (or None to instantiate a new one/use global context).
Ancestors
Methods
def debug_api(self, port=None)-
Expand source code
def debug_api(self, port=None): """Start the Connpy REST API in debug mode.""" from connpy.api import debug_api try: debug_api(port, config=self.config) except Exception as e: raise ConnpyError(f"Failed to start API in debug mode: {e}")Start the Connpy REST API in debug mode.
def get_api_status(self)-
Expand source code
def get_api_status(self): """Check if the API is currently running.""" import os pids = ["/run/connpy.pid", "/tmp/connpy.pid"] for pid_file in pids: if os.path.exists(pid_file): try: with open(pid_file, "r") as f: pid_line = f.readline().strip() port_line = f.readline().strip() if not pid_line: continue pid = int(pid_line) port = int(port_line) if port_line else None # Signal 0 checks for process existence without killing it os.kill(pid, 0) return {"running": True, "pid": pid, "port": port, "pid_file": pid_file} except (ValueError, OSError, ProcessLookupError): continue return {"running": False}Check if the API is currently running.
def restart_api(self, port=None)-
Expand source code
def restart_api(self, port=None): """Restart the Connpy REST API, maintaining the current port if none provided.""" if port is None: status = self.get_api_status() if status["running"] and status.get("port"): port = status["port"] self.stop_api() import time time.sleep(1) self.start_api(port)Restart the Connpy REST API, maintaining the current port if none provided.
def start_api(self, port=None)-
Expand source code
def start_api(self, port=None): """Start the Connpy REST API.""" from connpy.api import start_api try: start_api(port, config=self.config) except Exception as e: raise ConnpyError(f"Failed to start API: {e}")Start the Connpy REST API.
def stop_api(self)-
Expand source code
def stop_api(self): """Stop the Connpy REST API.""" try: import os import signal pids = ["/run/connpy.pid", "/tmp/connpy.pid"] stopped = False for pid_file in pids: if os.path.exists(pid_file): try: with open(pid_file, "r") as f: # Read only the first line (PID) line = f.readline().strip() if not line: continue pid = int(line) os.kill(pid, signal.SIGTERM) # Remove the PID file after successful kill os.remove(pid_file) stopped = True except (ValueError, OSError, ProcessLookupError): # If process is already dead, just remove the stale PID file try: os.remove(pid_file) except OSError: pass continue return stopped except Exception as e: raise ConnpyError(f"Failed to stop API: {e}")Stop the Connpy REST API.
Inherited members