# Source file: connpy/connpy/ai.py (1940 lines, 100 KiB, Python, executable file)

import os
import secrets
import sys
import json
import re
import datetime
import threading
import asyncio
from textwrap import dedent
from .core import nodes
from .mcp_client import MCPClientManager
_litellm_initialized = False


def _init_litellm():
    """Import litellm on first use and switch off its debug output.

    The module-level ``_litellm_initialized`` flag makes every call after the
    first one a no-op.
    """
    global _litellm_initialized
    if _litellm_initialized:
        return
    import litellm

    # Silence litellm's own feedback/debug output.
    litellm.suppress_debug_info = True
    litellm.set_verbose = False
    _litellm_initialized = True
def completion(*args, **kwargs):
    """Call ``litellm.completion`` after the one-time litellm setup.

    All positional and keyword arguments are forwarded unchanged and the
    litellm result is returned as-is.
    """
    _init_litellm()
    import litellm

    return litellm.completion(*args, **kwargs)
def stream_chunk_builder(*args, **kwargs):
    """Call ``litellm.stream_chunk_builder`` after the one-time litellm setup.

    All positional and keyword arguments are forwarded unchanged and the
    litellm result is returned as-is.
    """
    _init_litellm()
    import litellm

    return litellm.stream_chunk_builder(*args, **kwargs)
from .hooks import ClassHook, MethodHook
from . import printer
from rich.markdown import Markdown
from rich.panel import Panel
from rich.text import Text
from rich.console import Group
from rich.rule import Rule
console = printer.console
_ai_loop = None
_ai_thread = None
_ai_lock = threading.Lock()


def _get_ai_loop():
    """Return the shared AI event loop, creating and starting it on first use.

    The first call creates a new asyncio event loop and runs it forever in a
    daemon thread named "ConnpyAILoop"; later calls return that same loop.
    Creation is guarded by ``_ai_lock``.
    """
    global _ai_loop, _ai_thread
    with _ai_lock:
        if _ai_loop is not None:
            return _ai_loop
        _ai_loop = asyncio.new_event_loop()
        _ai_thread = threading.Thread(
            target=_ai_loop.run_forever,
            name="ConnpyAILoop",
            daemon=True,
        )
        _ai_thread.start()
        return _ai_loop
def run_ai_async(coro):
    """Schedule ``coro`` on the dedicated AI background loop.

    Returns the object produced by ``asyncio.run_coroutine_threadsafe``; call
    ``.result()`` on it to wait for the coroutine's outcome.
    """
    return asyncio.run_coroutine_threadsafe(coro, _get_ai_loop())
def cleanup():
    """Safely close any global litellm sessions in the dedicated AI loop.

    Does nothing when the AI loop was never created. Otherwise schedules
    ``_async_cleanup()`` on that loop and waits up to 5 seconds for it.
    Cleanup is best-effort: ordinary failures (including the timeout) are
    swallowed, while ``KeyboardInterrupt``/``SystemExit`` still propagate.
    """
    loop = _ai_loop
    if not loop:
        return
    coro = _async_cleanup()
    try:
        future = asyncio.run_coroutine_threadsafe(coro, loop)
    except Exception:
        # Scheduling failed (e.g. the loop is already closed): close the
        # coroutine so it is not reported as "never awaited".
        coro.close()
        return
    try:
        future.result(timeout=5)
    except Exception:
        # Timed out or the cleanup itself failed: stop waiting and move on.
        future.cancel()
async def _async_cleanup():
    """Internal async cleanup for litellm sessions.

    Closes litellm's module-level ``client_session`` and ``aclient_session``
    when they are set and clears the references. Returns silently when
    litellm cannot be imported. Errors raised while closing a session are
    ignored, but task cancellation is not swallowed.
    """
    try:
        import litellm
    except ImportError:
        return

    # 1. Close synchronous session
    if hasattr(litellm, "client_session") and litellm.client_session:
        try:
            if hasattr(litellm.client_session, "close"):
                res = litellm.client_session.close()
                # Only await when close() actually handed back a coroutine.
                if asyncio.iscoroutine(res):
                    await res
        except Exception:
            # Best-effort close. Catching Exception (not a bare except) lets
            # asyncio.CancelledError and KeyboardInterrupt propagate.
            pass
        litellm.client_session = None

    # 2. Close asynchronous session
    if hasattr(litellm, "aclient_session") and litellm.aclient_session:
        try:
            session = litellm.aclient_session
            # Clear the global reference before closing.
            litellm.aclient_session = None
            if hasattr(session, "close"):
                await session.close()
        except Exception:
            pass
@ClassHook
class ai:
"""Hybrid Multi-Agent System: Selective Escalation with Role Persistence."""
# Built-in regex allow-list of commands treated as safe to run without asking.
# _is_safe_command applies each pattern with re.match (case-insensitive) to the
# stripped command, so a pattern only constrains how the command STARTS.
# NOTE(review): several patterns are bare prefixes (r'^ls\s*' also matches
# "lsblk", r'^uname' matches anything beginning with "uname") — confirm this
# breadth is intended.
SAFE_COMMANDS = [
    r'^show\s+', r'^ls\s*', r'^cat\s+', r'^ip\s+', r'^pwd$', r'^hostname$', r'^uname',
    r'^df\s*', r'^free\s*', r'^ps\s*', r'^ping\s+', r'^traceroute\s+', r'^whois\s+',
    r'^kubectl\s+(get|describe|version|logs|top|explain|cluster-info|api-resources|api-versions)\s+',
    r'^systemctl\s+status\s+', r'^journalctl\s+'
]
def __init__(self, config, org=None, api_key=None, engineer_model=None, architect_model=None, engineer_api_key=None, architect_api_key=None, console=None, confirm_handler=None, trust=False, engineer_auth=None, architect_auth=None, **kwargs):
    """Configure models, credentials, limits, plugin registries, memory and prompts.

    Args:
        config: Application configuration object. ``defaultdir`` is read for
            the memory and session paths; AI settings come from
            ``get_effective_setting("ai", {})`` when that method exists,
            otherwise from ``config.config["ai"]``.
        org: Not used by this constructor.
        api_key: Not used by this constructor.
        engineer_model: Engineer model id (argument -> config -> default).
        architect_model: Architect model id (argument -> config -> default).
        engineer_api_key: Engineer API key (argument -> config).
        architect_api_key: Architect API key (argument -> config).
        console: Console used for output; defaults to ``printer.console``.
        confirm_handler: Callable ``(prompt, default) -> str`` used to confirm
            unsafe commands; defaults to ``_local_confirm_handler``.
        trust: When true, the whole session skips command confirmation.
        engineer_auth: Dict of extra completion kwargs for the Engineer
            (argument -> config). Non-dict values are ignored.
        architect_auth: Same as ``engineer_auth`` for the Architect.
        **kwargs: Only ``one_shot`` is read here (default ``False``).
    """
    self.config = config
    self.console = console or printer.console
    self.confirm_handler = confirm_handler or self._local_confirm_handler
    self.trusted_session = trust  # Trust mode for the entire session
    self.interrupted = False
    self.one_shot = kwargs.get("one_shot", False)

    # 1. Load the generic configuration with global inheritance/merge
    if hasattr(self.config, "get_effective_setting"):
        aiconfig = self.config.get_effective_setting("ai", {})
    else:
        aiconfig = self.config.config.get("ai", {}) if hasattr(self.config, "config") else {}

    # Models (priority: argument -> config -> default)
    self.engineer_model = engineer_model or aiconfig.get("engineer_model") or "gemini/gemini-3.1-flash-lite"
    self.architect_model = architect_model or aiconfig.get("architect_model") or "anthropic/claude-sonnet-4-6"

    # API keys (priority: argument -> config)
    self.engineer_key = engineer_api_key or aiconfig.get("engineer_api_key")
    self.architect_key = architect_api_key or aiconfig.get("architect_api_key")

    def _auth_dict(explicit, configured):
        """Pick argument over config and return a private dict ({} if unusable)."""
        chosen = explicit if explicit is not None else configured
        # Copy so the api_key fallback below never mutates a dict owned by
        # the caller or by the loaded configuration.
        return dict(chosen) if isinstance(chosen, dict) else {}

    # Auth configurations (priority: argument -> config)
    self.engineer_auth = _auth_dict(engineer_auth, aiconfig.get("engineer_auth"))
    self.architect_auth = _auth_dict(architect_auth, aiconfig.get("architect_auth"))

    # Backward compatibility fallbacks: only inject api_key if the auth dict is empty/not configured
    if self.engineer_key and not self.engineer_auth:
        self.engineer_auth["api_key"] = self.engineer_key
    if self.architect_key and not self.architect_auth:
        self.architect_auth["api_key"] = self.architect_key

    # Strategic Reasoning Engine (Architect) availability
    architect_model_lower = self.architect_model.lower()
    is_architect_keyless = any(marker in architect_model_lower for marker in ("vertex", "ollama", "local"))
    self.has_architect = bool(self.architect_key or self.architect_auth or is_architect_keyless)

    # Custom Trusted Commands Regexes
    custom_trusted = aiconfig.get("trusted_commands", [])
    if isinstance(custom_trusted, str):
        custom_trusted = [c.strip() for c in custom_trusted.split(",") if c.strip()]
    self.safe_commands = list(self.SAFE_COMMANDS) + (custom_trusted if isinstance(custom_trusted, list) else [])

    # Limits
    self.max_history = 30
    self.max_truncate = 50000
    self.soft_limit_iterations = 20  # Show warning and suggest Ctrl+C
    self.hard_limit_iterations = 50  # Force stop

    # External tool registry (populated by plugins via ClassHook.modify)
    self.external_engineer_tools = []  # Tool defs for Engineer LLM
    self.external_architect_tools = []  # Tool defs for Architect LLM
    self.external_tool_handlers = {}  # {"tool_name": handler_callable}
    self.tool_status_formatters = {}  # {"tool_name": formatter_callable}
    self.engineer_prompt_extensions = []  # Extra text for engineer prompt
    self.architect_prompt_extensions = []  # Extra text for architect prompt

    # MCP Manager
    self.mcp_manager = MCPClientManager(self.config)

    # Long-term memory
    self.memory_path = os.path.join(self.config.defaultdir, "ai_memory.md")
    self.long_term_memory = ""
    if os.path.exists(self.memory_path):
        try:
            with open(self.memory_path, "r") as f:
                self.long_term_memory = f.read()
        except FileNotFoundError:
            # The file vanished between the exists() check and open().
            self.long_term_memory = ""
        except PermissionError as e:
            self.console.print(f"[warning]Warning: Cannot read AI memory file: {e}[/warning]")
        except Exception as e:
            self.console.print(f"[warning]Warning: Failed to load AI memory: {e}[/warning]")

    # Session Management
    self.sessions_dir = os.path.join(self.config.defaultdir, "ai_sessions")
    os.makedirs(self.sessions_dir, exist_ok=True)
    self.session_id = getattr(self.config, "session_id", None)
    self.session_path = os.path.join(self.sessions_dir, f"{self.session_id}.json") if self.session_id else None

    # Agnostic base prompts
    if self.has_architect:
        architect_instructions = """
CRITICAL - CONSULT vs ESCALATE:
- ALWAYS use 'consult_architect' for: Configuration planning, design decisions, complex troubleshooting.
Examples: "consultalo con el arquitecto", "preguntale al arquitecto", "que opina el arquitecto"
You stay in control and present the advice to the user.
- ONLY use 'escalate_to_architect' when user EXPLICITLY asks to TALK to the Architect:
Examples: "quiero hablar con el arquitecto", "pasame con el arquitecto", "que me atienda el arquitecto"
After escalation, you hand over control completely.
- DEFAULT: When in doubt, use 'consult_architect'. Escalation is rare.
"""
    else:
        architect_instructions = """
CRITICAL - ARCHITECT UNAVAILABLE:
- The Strategic Reasoning Engine (Architect) is currently UNAVAILABLE because its API key or authentication is not configured.
- DO NOT attempt to consult or escalate to the architect.
- If the user asks to consult the architect, inform them that the Architect is offline and offer to help them directly to the best of your abilities.
"""

    # Resolved once and interpolated into BOTH prompts. The engineer prompt
    # previously wrapped this expression in doubled braces, which an f-string
    # emits as literal text instead of evaluating it.
    memory_context = self.long_term_memory if self.long_term_memory else "Empty."

    self._engineer_base_prompt = dedent(f"""
Role: TECHNICAL EXECUTION ENGINE.
Expertise: Universal Networking (Cisco, Nokia, Juniper, 6wind, etc.).
Rules:
- BE FAST AND EXTREMELY CONCISE: Provide direct answers. No filler words, no decorative language, no polite pleasantries. Save output tokens at all costs.
- KNOWLEDGE FIRST: For general networking questions (AS numbers, protocol details, standards, generic commands), use your internal knowledge. ONLY use tools when the user's specific infrastructure data is required.
- INVENTORY ONLY: 'run_commands', 'list_nodes', and 'get_node_info' are ONLY for interacting with the user's inventory.
- BROADCAST RESTRICTION: Avoid using filter '.*' in 'run_commands' unless the user explicitly requests a global action. Try to target specific nodes or groups based on the conversation.
- AUTONOMY: Proactively use iterative tool calls to find the root cause of infrastructure issues.
- BATCH OPERATIONS: When working on multiple devices, call tools in parallel.
- COMPLETE MISSIONS: Execute ALL steps of a mission before reporting back.
- DIAGRAM: Use ASCII art or Unicode box-drawing characters directly in your responses to visualize topologies or paths when helpful.
- EVIDENCE: Include 'Key Snippets' from tool outputs. Be token-efficient.
- LANGUAGE: You MUST respond in the same language used by the user in their question or instruction.
- NO WANDERING: Do not speculate. If stuck, report attempts.
- SAFETY: When you use 'run_commands' with configuration commands, the system automatically prompts the user for confirmation. Just execute - don't ask permission first.
{architect_instructions}
Network Context: {memory_context}
""").strip()

    self._architect_base_prompt = dedent(f"""
Role: STRATEGIC REASONING ENGINE.
Expertise: Network Architecture, Complex Troubleshooting, and Design Validation.
Rules:
- CONCISENESS IS MANDATORY: Strip out fluff, decorative language, and filler words. Provide direct, tactical instructions and analysis to save output tokens.
- STRATEGY: Define technical missions for the Engineer.
- DIAGRAM: Use ASCII art or Unicode box-drawing characters in your responses to visualize topologies, traffic paths, or logic flows.
- ENGINEER CAPABILITIES: Your Engineer can:
* Filter nodes (list_nodes), Run CLI commands (run_commands), Get metadata (get_node_info).
- ANALYSIS: Review technical findings to identify patterns or design failures.
- LANGUAGE: You MUST respond in the same language used by the user in their question or instruction.
- MEMORY: Update long-term facts ONLY when the user explicitly requests it.
CRITICAL - EFFICIENT DELEGATION:
- Plan ALL tasks upfront before delegating.
- Delegate ONCE with a complete, detailed mission including ALL steps.
- Example: "List all routers matching 'border.*', then run 'show ip bgp summary' and 'show ip route' on each, then analyze the outputs."
- DO NOT delegate multiple times for the same goal. Batch everything into ONE mission.
- Wait for Engineer's complete report before responding to user.
CRITICAL - RETURNING CONTROL:
- When your strategic analysis is complete and no further architectural decisions are needed, use 'return_to_engineer' to hand control back.
- The Engineer is better suited for ongoing technical execution and troubleshooting.
- Only stay in control if the user explicitly needs strategic oversight for multiple interactions.
Network Context: {memory_context}
""").strip()
def _local_confirm_handler(self, prompt, default="n"):
    """Ask the question on the local terminal and return the typed answer.

    Delegates to ``rich.prompt.Prompt.ask`` with the given default.
    """
    import rich.prompt

    return rich.prompt.Prompt.ask(prompt, default=default)
@property
def engineer_system_prompt(self):
"""Build engineer system prompt with plugin extensions."""
if self.engineer_prompt_extensions:
extensions = "\n".join(self.engineer_prompt_extensions)
return self._engineer_base_prompt + f"\n\nPlugin Capabilities:\n{extensions}"
return self._engineer_base_prompt
@property
def architect_system_prompt(self):
"""Build architect system prompt with plugin extensions."""
prompt = self._architect_base_prompt
if getattr(self, "one_shot", False):
prompt += "\n\nCRITICAL 1-SHOT DIAGNOSTICS DIRECTIVE:\nYou are running in a 1-shot offline diagnostics mode. There is no active conversation loop, and you are NOT conversing with a Network Engineer. You MUST deliver your complete strategic analysis immediately and directly to the user. Do not suggest or attempt to delegate/return control to the engineer."
if self.architect_prompt_extensions:
extensions = "\n".join(self.architect_prompt_extensions)
return prompt + f"\n\nPlugin Capabilities:\n{extensions}"
return prompt
def register_ai_tool(self, tool_definition, handler, target="engineer", engineer_prompt=None, architect_prompt=None, status_formatter=None):
"""Register an external tool for the AI system.
Args:
tool_definition (dict): OpenAI-compatible tool definition.
handler (callable): Function(ai_instance, **tool_args) -> str.
target (str): 'engineer', 'architect', or 'both'.
engineer_prompt (str): Extra text for engineer system prompt.
architect_prompt (str): Extra text for architect system prompt.
status_formatter (callable): Function(args_dict) -> status string.
"""
name = tool_definition["function"]["name"]
# Check if already registered to prevent duplicates
if target in ("engineer", "both"):
if not any(t["function"]["name"] == name for t in self.external_engineer_tools):
self.external_engineer_tools.append(tool_definition)
if target in ("architect", "both"):
if not any(t["function"]["name"] == name for t in self.external_architect_tools):
self.external_architect_tools.append(tool_definition)
self.external_tool_handlers[name] = handler
if engineer_prompt and engineer_prompt not in self.engineer_prompt_extensions:
self.engineer_prompt_extensions.append(engineer_prompt)
if architect_prompt and architect_prompt not in self.architect_prompt_extensions:
self.architect_prompt_extensions.append(architect_prompt)
if status_formatter:
self.tool_status_formatters[name] = status_formatter
def _stream_completion(self, model, messages, tools, api_key=None, status=None, label="", debug=False, chunk_callback=None, auth=None, **kwargs):
    """Stream a completion call, rendering styled Markdown in real-time.

    Args:
        model: Model identifier forwarded to ``completion``.
        messages: Chat messages forwarded to ``completion`` and to
            ``stream_chunk_builder``.
        tools: Tool definitions forwarded to ``completion``.
        api_key: Added to the auth kwargs as ``api_key`` unless ``auth``
            already has that key.
        status: Optional spinner-like object; ``stop()`` is called on it
            before the first text is rendered locally.
        label: When it contains "architect" (case-insensitive) the output is
            styled as the Network Architect, otherwise as the Network Engineer.
        debug: Not referenced in this method body.
        chunk_callback: When given, each text delta is passed to it and
            nothing is rendered to the local console.
        auth: Dict of extra keyword arguments splatted into ``completion``.
        **kwargs: Forwarded to ``completion``.

    Returns (response, streamed) where:
    - response: reconstructed ModelResponse (same as non-streaming)
    - streamed: True if text was rendered to console during streaming

    Raises:
        KeyboardInterrupt: when ``self.interrupted`` is set while a
            ``chunk_callback`` is in use.
        Exception: any streaming error that occurs before a single chunk
            has been received.
    """
    auth_dict = auth if auth is not None else {}
    if api_key and "api_key" not in auth_dict:
        # Copy first so the caller's auth dict is never mutated.
        auth_dict = auth_dict.copy()
        auth_dict["api_key"] = api_key
    stream_resp = completion(model=model, messages=messages, tools=tools, stream=True, **auth_dict, **kwargs)
    chunks = []
    full_content = ""
    is_streaming_text = False
    has_tool_calls = False
    header_printed = False
    # Determine styling based on current brain
    role_label = "Network Architect" if "architect" in label.lower() else "Network Engineer"
    alias = "architect" if "architect" in label.lower() else "engineer"
    title = f"[bold {alias}]{role_label}[/bold {alias}]"
    border = alias
    try:
        for chunk in stream_resp:
            chunks.append(chunk)
            # NOTE(review): assumes every chunk carries at least one choice; a
            # chunk with an empty ``choices`` list would raise IndexError here
            # and be swallowed by the handler below, ending the stream early —
            # confirm against the providers in use.
            delta = chunk.choices[0].delta
            # Detect tool calls
            if hasattr(delta, 'tool_calls') and delta.tool_calls:
                has_tool_calls = True
            # Stream text content with styled rendering
            if hasattr(delta, 'content') and delta.content:
                full_content += delta.content
                if chunk and chunk_callback:
                    # Check for remote interruption during streaming
                    if hasattr(self, "interrupted") and self.interrupted:
                        raise KeyboardInterrupt
                    chunk_callback(delta.content)
                if not chunk_callback:
                    if not is_streaming_text:
                        # First text delta: stop the spinner and print the header once.
                        if status:
                            try:
                                status.stop()
                            except Exception:
                                pass
                        # Create a stable, direct Console to bypass _ConsoleProxy recreation bugs
                        from rich.console import Console as RichConsole
                        from rich.rule import Rule
                        from .printer import connpy_theme, get_original_stdout, IncrementalMarkdownParser
                        stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout())
                        stable_console.print(Rule(f"[bold {border}]{title}[/bold {border}]", style=border))
                        header_printed = True
                        md_parser = IncrementalMarkdownParser(console=stable_console)
                        is_streaming_text = True
                    md_parser.feed(delta.content)
    except Exception as e:
        # Mid-stream failures are tolerated when some chunks were received
        # (the partial response is rebuilt below); with no chunks at all the
        # error is re-raised. KeyboardInterrupt is not an Exception subclass,
        # so it always propagates.
        if not chunks:
            raise
    finally:
        if header_printed:
            # Flush buffered Markdown and draw the closing rule; best-effort.
            try:
                md_parser.flush()
                from rich.console import Console as RichConsole
                from rich.rule import Rule
                from .printer import connpy_theme, get_original_stdout
                stable_console = RichConsole(theme=connpy_theme, file=get_original_stdout())
                stable_console.print(Rule(style=border))
            except Exception:
                pass
    # Rebuild complete response from chunks
    try:
        response = stream_chunk_builder(chunks, messages=messages)
    except Exception:
        # Fallback: manual reconstruction if stream_chunk_builder fails
        full_content_rebuilt = ""
        tool_calls_map = {}
        for c in chunks:
            d = c.choices[0].delta
            if hasattr(d, 'content') and d.content:
                full_content_rebuilt += d.content
            if hasattr(d, 'tool_calls') and d.tool_calls:
                for tc in d.tool_calls:
                    # Tool-call fragments are merged per ``index``: id and name
                    # are overwritten, argument fragments are concatenated.
                    idx = tc.index
                    if idx not in tool_calls_map:
                        tool_calls_map[idx] = {"id": tc.id or "", "type": "function", "function": {"name": getattr(tc.function, 'name', '') or '', "arguments": getattr(tc.function, 'arguments', '') or ''}}
                    else:
                        if tc.id: tool_calls_map[idx]["id"] = tc.id
                        if tc.function:
                            if tc.function.name: tool_calls_map[idx]["function"]["name"] = tc.function.name
                            if tc.function.arguments: tool_calls_map[idx]["function"]["arguments"] += tc.function.arguments
        # Build a minimal response-like object
        # (only the attributes/methods defined below are provided).
        class FakeFunc:
            def __init__(self, name, arguments): self.name = name; self.arguments = arguments
        class FakeTC:
            def __init__(self, d): self.id = d["id"]; self.function = FakeFunc(d["function"]["name"], d["function"]["arguments"])
            def model_dump(self, **kw): return {"id": self.id, "type": "function", "function": {"name": self.function.name, "arguments": self.function.arguments}}
        class FakeMsg:
            def __init__(self, content, tcs): self.content = content or None; self.tool_calls = tcs if tcs else None; self.role = "assistant"
            def model_dump(self, **kw):
                d = {"role": "assistant", "content": self.content}
                if self.tool_calls: d["tool_calls"] = [tc.model_dump() for tc in self.tool_calls]
                return d
        class FakeChoice:
            def __init__(self, msg): self.message = msg
        class FakeResp:
            def __init__(self, choice): self.choices = [choice]; self.usage = None
        tcs = [FakeTC(tool_calls_map[i]) for i in sorted(tool_calls_map)] if tool_calls_map else None
        response = FakeResp(FakeChoice(FakeMsg(full_content_rebuilt or full_content, tcs)))
    # Only count as "streamed" if we rendered text AND it was the final response (no tool calls)
    # (is_streaming_text is only set on the local-render path, so this is
    # always False when a chunk_callback is used.)
    streamed = is_streaming_text and not has_tool_calls
    return response, streamed
def _sanitize_messages(self, messages):
"""Sanitize message list for strict providers like Gemini.
Ensures that:
1. Every assistant message with tool_calls is followed by ALL its tool responses
2. No user/system messages appear between tool_calls and tool responses
3. Orphaned tool_calls at the end are removed
4. Orphaned tool responses without a preceding tool_call are removed
5. Incompatible metadata like cache_control is stripped for non-Anthropic models
6. Enforces strict alternating history to prevent BadRequestError on Gemini.
"""
if not messages:
return messages
# Pre-process messages to pull text from list contents (Anthropic cache format)
# and remove explicit cache keys.
pre_sanitized = []
for msg in messages:
m = msg.copy() if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
# Convert content list to plain string if it's a system message with caching metadata
if m.get('role') == 'system' and isinstance(m.get('content'), list):
if m['content'] and isinstance(m['content'][0], dict) and m['content'][0].get('text'):
m['content'] = m['content'][0]['text']
else:
m['content'] = ""
# Remove any explicit cache_control key anywhere
if 'cache_control' in m: del m['cache_control']
if isinstance(m.get('content'), list):
for item in m['content']:
if isinstance(item, dict) and 'cache_control' in item: del item['cache_control']
pre_sanitized.append(m)
sanitized = []
last_role = None
i = 0
while i < len(pre_sanitized):
msg = pre_sanitized[i]
role = msg.get('role', '')
if role == 'system':
sanitized.append(msg)
last_role = 'system'
i += 1
elif role == 'user':
if last_role == 'user' and sanitized:
# Combine consecutive user messages
sanitized[-1]['content'] = str(sanitized[-1].get('content', '') or '') + '\n' + str(msg.get('content', '') or '')
else:
sanitized.append(msg)
last_role = 'user'
i += 1
elif role == 'assistant':
has_tools = bool(msg.get('tool_calls'))
# Gemini strict sequence: Assistant MUST be preceded by user or tool.
# If preceded by system, assistant, or if it's the very first message...
if last_role not in ('user', 'tool'):
sanitized.append({"role": "user", "content": "[System sequence separator: History Truncated/Merged]"})
last_role = 'user'
if has_tools:
# Look ahead for matching tool responses
tool_responses = []
j = i + 1
while j < len(pre_sanitized):
next_msg = pre_sanitized[j]
if next_msg.get('role') == 'tool':
tool_responses.append(next_msg)
j += 1
else:
break
if tool_responses:
sanitized.append(msg)
sanitized.extend(tool_responses)
last_role = 'tool'
i = j
else:
# Orphaned tool_calls with no responses - skip the assistant message
# If we just added a dummy user message for this assistant, remove it too
if sanitized and sanitized[-1].get('content') == "[System sequence separator: History Truncated/Merged]":
sanitized.pop()
last_role = sanitized[-1].get('role', '') if sanitized else None
i += 1
else:
sanitized.append(msg)
last_role = 'assistant'
i += 1
elif role == 'tool':
# Orphaned tool response (no preceding assistant with tool_calls) - skip
i += 1
else:
sanitized.append(msg)
last_role = role
i += 1
return sanitized
def _truncate(self, text, limit=None):
"""Truncate text to specified limit, keeping head (60%) and tail (40%)."""
if not isinstance(text, str): return str(text)
final_limit = limit or self.max_truncate
if len(text) <= final_limit: return text
head_limit = int(final_limit * 0.6)
tail_limit = int(final_limit * 0.4)
return (text[:head_limit] + f"\n\n[... OUTPUT TRUNCATED ...]\n\n" + text[-tail_limit:])
def _print_debug_observation(self, fn, obs, status=None):
    """Prints a tool observation in a readable way during debug mode.

    Args:
        fn: Label shown (in bold) as the panel title.
        obs: Observation to display. Strings are parsed as JSON when
            possible; dicts render as key/value lines, lists as one item per
            line, anything else via ``str()``.
        status: Optional spinner-like object; it is stopped while the panel
            is printed and started again afterwards (both best-effort).
    """
    # Try to parse as JSON if it's a string
    obs_data = obs
    if isinstance(obs, str):
        try:
            obs_data = json.loads(obs)
        except (ValueError, RecursionError):
            # Not JSON (or too deeply nested): show the raw string.
            obs_data = obs

    if isinstance(obs_data, dict):
        elements = []
        for k, v in obs_data.items():
            elements.append(Text(f"{k}:", style="key"))
            # Use Text for values to ensure newlines are rendered
            elements.append(Text(str(v)))
        content = Group(*elements) if elements else Text("Empty data set")
    elif isinstance(obs_data, list):
        content = Text("\n".join(f"{item}" for item in obs_data))
    else:
        content = Text(str(obs_data))

    title = f"[bold]{fn}[/bold]"

    # Stop status before printing panel to avoid ghosting
    if status:
        try:
            status.stop()
        except Exception:
            # Best-effort; Exception (not bare except) keeps Ctrl+C working.
            pass
    self.console.print(Panel(content, title=title, border_style="ai_status"))
    # Resume status
    if status:
        try:
            status.start()
        except Exception:
            pass
def manage_memory_tool(self, content, action="append"):
"""Save or update long-term memory. Only use when user explicitly requests it."""
if not content or not content.strip():
return "Error: Cannot save empty content to memory."
try:
mode = "a" if action == "append" else "w"
os.makedirs(os.path.dirname(self.memory_path), exist_ok=True)
with open(self.memory_path, mode) as f:
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M')
f.write(f"\n\n## {timestamp}\n{content.strip()}\n" if action == "append" else content)
# Reload memory after update
with open(self.memory_path, "r") as f:
self.long_term_memory = f.read()
return "Memory updated successfully."
except PermissionError as e:
return f"Error: Permission denied writing to memory file: {e}"
except Exception as e:
return f"Error updating memory: {str(e)}"
def list_nodes_tool(self, filter_pattern=".*"):
"""List nodes matching the filter pattern. Returns metadata for <=5 nodes, names only for more."""
try:
matched_names = self.config._getallnodes(filter_pattern)
if not matched_names: return "No nodes found."
if len(matched_names) <= 5:
matched_data = self.config.getitems(matched_names, extract=True)
res = {}
for name, data in matched_data.items():
os_tag = "unknown"
if isinstance(data, dict):
ts = data.get("tags")
if isinstance(ts, dict): os_tag = ts.get("os", "unknown")
res[name] = {"os": os_tag}
return res
return {"count": len(matched_names), "nodes": matched_names, "note": "Use 'get_node_info' for details."}
except Exception as e:
return f"Error listing nodes: {str(e)}"
def _is_safe_command(self, cmd):
"""Check if a command matches safe patterns."""
return any(re.match(pattern, cmd.strip(), re.IGNORECASE) for pattern in self.safe_commands)
def run_commands_tool(self, nodes_filter, commands, status=None):
    """Execute commands on nodes matching the filter. Native interactive confirmation for unsafe commands.

    Args:
        nodes_filter: Filter passed to ``self.config._getallnodes`` to select
            the target nodes.
        commands: A list of commands, a JSON string, or a newline-separated
            string. Every entry is additionally split on newlines.
        status: Optional spinner-like object; stopped before prompting and
            updated afterwards.

    Returns:
        The result of ``nodes(...).run(commands)`` on success, or a string
        describing why nothing ran (no matching nodes, user rejection, user
        feedback, or an execution error).

    Raises:
        KeyboardInterrupt: re-raised when the user interrupts the
            confirmation prompt.
    """
    # Handle if commands is a JSON string
    if isinstance(commands, str):
        try:
            commands = json.loads(commands)
        except ValueError:
            # Not JSON: treat the string as one command per line.
            commands = [c.strip() for c in commands.split('\n') if c.strip()]
    # Expand multi-line commands within a list (in case the AI packs them)
    if isinstance(commands, list):
        expanded_commands = []
        for cmd in commands:
            expanded_commands.extend([c.strip() for c in str(cmd).split('\n') if c.strip()])
        commands = expanded_commands
    else:
        commands = [str(commands)]
    # Check command safety natively
    if not self.trusted_session:
        unsafe_commands = [cmd for cmd in commands if not self._is_safe_command(cmd)]
        if unsafe_commands:
            # Stop the spinner so prompt doesn't get messed up
            if status: status.stop()
            # Show ALL commands with unsafe ones highlighted
            formatted_cmds = []
            for cmd in commands:
                if cmd in unsafe_commands:
                    formatted_cmds.append(f" • [warning]{cmd}[/warning]")
                else:
                    formatted_cmds.append(f"{cmd}")
            panel_content = f"Target: {nodes_filter}\nCommands:\n" + "\n".join(formatted_cmds)
            # Use print_important if available (for remote bridges) fallback to standard print
            print_fn = getattr(self.console, "print_important", self.console.print)
            print_fn(Panel(panel_content, title="[bold warning]⚠️ UNSAFE COMMANDS DETECTED[/bold warning]", border_style="warning"))
            try:
                user_resp = self.confirm_handler("[bold warning]Execute? (y: yes / n: no / a: allow all this session / <text>: feedback)[/bold warning]", default="n")
            except KeyboardInterrupt:
                if status: status.update("[ai_status]Engineer: Resuming...")
                self.console.print("[fail]✗ Aborted by user (Ctrl+C).[/fail]")
                raise
            # Resume the spinner
            # NOTE(review): only status.update() is called after the earlier
            # status.stop(); presumably update() makes the spinner visible
            # again — confirm for the status implementation in use.
            if status: status.update("[ai_status]Engineer: Processing user response...")
            user_resp_lower = user_resp.strip().lower()
            if user_resp_lower in ['a', 'allow']:
                # Trust is granted for the rest of the session, not just this call.
                self.trusted_session = True
                self.console.print("[pass]✓ Trust Mode Enabled. All future commands in this session will execute without confirmation.[/pass]")
            elif user_resp_lower in ['y', 'yes']:
                self.console.print("[pass]✓ Executing...[/pass]")
            elif user_resp_lower in ['n', 'no', '', 'cancel']:
                self.console.print("[fail]✗ Execution rejected by user.[/fail]")
                return "Error: User rejected execution."
            else:
                # Any other answer is returned as free-text feedback instead of executing.
                self.console.print(f"[user_prompt]User feedback: [/user_prompt]{user_resp}")
                return f"User requested changes: {user_resp}. Please adjust the commands based on this feedback and try again."
    try:
        matched_names = self.config._getallnodes(nodes_filter)
        if not matched_names: return "No nodes found matching filter."
        thisnodes_dict = self.config.getitems(matched_names, extract=True)
        result = nodes(thisnodes_dict, config=self.config).run(commands)
        return result
    except Exception as e:
        return f"Error executing commands: {str(e)}"
def get_node_info_tool(self, node_name):
"""Get detailed metadata for a specific node. Passwords are masked."""
try:
d = self.config.getitem(node_name, extract=True)
if 'password' in d: d['password'] = '***'
return d
except Exception as e:
return f"Error getting node info: {str(e)}"
def _engineer_loop(self, task, status=None, debug=False, chat_history=None):
    """Internal loop where the Engineer executes technical tasks for the Architect.

    Repeatedly calls the Engineer model with the mission and the accumulated
    tool results until the model answers without tool calls or
    ``self.hard_limit_iterations`` is reached.

    Args:
        task: Mission text; sent as a user message prefixed with "MISSION: ".
        status: Optional spinner-like object. Progress text is only pushed to
            it when ``chat_history`` is empty/None.
        debug: When true, tool decisions, observations and the final report
            are printed.
        chat_history: Optional prior messages; only the last 5 are included.

    Returns:
        ``(content, usage)`` where ``content`` is the final assistant message
        text (or "Engineer failed: ..." if an Exception occurred) and
        ``usage`` is a dict with accumulated "input", "output" and "total"
        token counts.

    Raises:
        KeyboardInterrupt: when ``self.interrupted`` is set, or when a tool
            re-raises it (it is not caught by the ``except Exception`` below).
    """
    # Cache optimization for the Engineer (direct Anthropic only; Vertex has different rules)
    if "claude" in self.engineer_model.lower() and "vertex" not in self.engineer_model.lower():
        messages = [{"role": "system", "content": [{"type": "text", "text": self.engineer_system_prompt, "cache_control": {"type": "ephemeral"}}]}]
    else:
        messages = [{"role": "system", "content": self.engineer_system_prompt}]
    if chat_history:
        # Clean chat history from caching metadata if engineer is not a compatible Claude model
        if "claude" not in self.engineer_model.lower() or "vertex" in self.engineer_model.lower():
            messages.extend(self._sanitize_messages(chat_history[-5:]))
        else:
            messages.extend(chat_history[-5:])
    messages.append({"role": "user", "content": f"MISSION: {task}"})
    tools = self._get_engineer_tools()
    usage = {"input": 0, "output": 0, "total": 0}
    iteration = 0
    # NOTE(review): soft_limit_warned is assigned but never read in this method.
    soft_limit_warned = False
    try:
        # Set up remote interrupt callback if bridge is provided
        if status and hasattr(status, "on_interrupt"):
            status.on_interrupt = lambda: setattr(self, "interrupted", True)
        while iteration < self.hard_limit_iterations:
            iteration += 1
            # Check for interruption
            if self.interrupted:
                raise KeyboardInterrupt
            if status and not chat_history:
                status_text = f"[ai_status]Engineer: Analyzing mission... (step {iteration})"
                if iteration >= self.soft_limit_iterations:
                    status_text += " [warning]⚠ Taking longer than expected (Ctrl+C to interrupt)[/warning]"
                status.update(status_text)
            try:
                # NOTE(review): every request goes through _sanitize_messages,
                # which flattens list-form system content — so the
                # cache_control marker set above may never reach the
                # provider; confirm whether that is intended.
                safe_messages = self._sanitize_messages(messages)
                response = completion(model=self.engineer_model, messages=safe_messages, tools=tools, **self.engineer_auth)
            except Exception as e:
                if status: status.stop()
                # Caught by the outer handler and returned as "Engineer failed: ...".
                raise ValueError(f"Engineer failed to connect: {str(e)}")
            if hasattr(response, "usage") and response.usage:
                usage["input"] += getattr(response.usage, "prompt_tokens", 0)
                usage["output"] += getattr(response.usage, "completion_tokens", 0)
                usage["total"] += getattr(response.usage, "total_tokens", 0)
            resp_msg = response.choices[0].message
            msg_dict = resp_msg.model_dump(exclude_none=True)
            # Normalize empty-string content to None when tool calls are present.
            if msg_dict.get("tool_calls") and msg_dict.get("content") == "": msg_dict["content"] = None
            messages.append(msg_dict)
            # No tool calls means the model produced its final answer.
            if not resp_msg.tool_calls: break
            for tc in resp_msg.tool_calls:
                # Malformed JSON arguments raise here and end the whole loop
                # via the outer ``except Exception``.
                fn, args = tc.function.name, json.loads(tc.function.arguments)
                # Real-time notification of the technical task (only if not in Architect loop)
                if status and not chat_history:
                    s_text = ""
                    if fn == "list_nodes": s_text = f"[ai_status]Engineer: [SEARCH] {args.get('filter_pattern','.*')}"
                    elif fn == "run_commands":
                        cmds = args.get('commands', [])
                        cmd_str = cmds[0] if cmds else ""
                        s_text = f"[ai_status]Engineer: [CMD] {cmd_str}"
                    elif fn == "get_node_info": s_text = f"[ai_status]Engineer: [INSPECT] {args.get('node_name','')}"
                    elif fn.startswith("mcp_"):
                        # Tool names of the form "mcp_<server>__<tool>" are split for display.
                        server = fn.split("__")[0].replace("mcp_", "")
                        tool = fn.split("__")[1] if "__" in fn else fn
                        s_text = f"[ai_status]Engineer: [MCP:{server}] {tool}"
                    elif fn in self.tool_status_formatters: s_text = self.tool_status_formatters[fn](args)
                    if s_text:
                        if iteration >= self.soft_limit_iterations:
                            s_text += " [warning]⚠ Taking longer than expected (Ctrl+C to interrupt)[/warning]"
                        status.update(s_text)
                if debug:
                    self._print_debug_observation(f"Decision: {fn}", args, status=status)
                # Dispatch: built-in tools, then MCP tools, then plugin handlers.
                if fn == "list_nodes": obs = self.list_nodes_tool(**args)
                elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
                elif fn == "get_node_info": obs = self.get_node_info_tool(**args)
                elif fn.startswith("mcp_"):
                    # MCP calls run on the background AI loop; wait up to 60 seconds.
                    obs = run_ai_async(self.mcp_manager.call_tool(fn, args)).result(timeout=60)
                elif fn in self.external_tool_handlers: obs = self.external_tool_handlers[fn](self, **args)
                else: obs = f"Error: Unknown tool '{fn}'."
                if debug:
                    self._print_debug_observation(f"Observation: {fn}", obs, status=status)
                # Ensure observation is a string and truncated for the LLM
                obs_str = obs if isinstance(obs, str) else json.dumps(obs)
                messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": self._truncate(obs_str)})
        # NOTE(review): this also fires when the model finished normally on
        # exactly the last allowed iteration — confirm that is acceptable.
        if iteration >= self.hard_limit_iterations:
            self.console.print(f"[error]⛔ Engineer reached hard limit ({self.hard_limit_iterations} steps). Forcing stop.[/error]")
        if debug and resp_msg.content:
            self.console.print(Panel(Text(resp_msg.content), title="[bold engineer]Engineer Final Report to Architect[/bold engineer]", border_style="engineer"))
        return resp_msg.content, usage
    except Exception as e:
        return f"Engineer failed: {str(e)}", usage
def _get_engineer_tools(self, os_filter: str = None):
    """Assemble the tool schemas offered to the Engineer.

    The list is built from the three built-in inventory tools, any tools the
    MCP manager reports for ``os_filter`` (failures there are ignored), the
    two Architect hand-off tools when ``self.architect_key`` is set, and
    finally ``self.external_engineer_tools``.

    Entries are de-duplicated by function name, keeping the first occurrence,
    to prevent Gemini BadRequestError on repeated names.
    """
    def fn_tool(name, description, parameters):
        # Wrap one function schema in the {"type": "function", ...} envelope.
        return {"type": "function", "function": {"name": name, "description": description, "parameters": parameters}}

    catalog = [
        fn_tool(
            "list_nodes",
            "[Universal Platform] Lists available nodes in the inventory.",
            {"type": "object", "properties": {"filter_pattern": {"type": "string", "description": "Regex to filter nodes (e.g. '.*', 'border.*')."}}},
        ),
        fn_tool(
            "run_commands",
            "[Universal Platform] Runs one or more commands on matched nodes. MANDATORY: You MUST call 'list_nodes' first to verify the target list.",
            {"type": "object", "properties": {"nodes_filter": {"type": "string", "description": "Exact node name or verified filter pattern."}, "commands": {"type": "array", "items": {"type": "string"}, "description": "List of commands (e.g. ['show ip route', 'show int desc'])."}}, "required": ["nodes_filter", "commands"]},
        ),
        fn_tool(
            "get_node_info",
            "[Universal Platform] Gets full metadata for a specific node.",
            {"type": "object", "properties": {"node_name": {"type": "string"}}, "required": ["node_name"]},
        ),
    ]

    # Dynamic MCP tools are optional: any failure while fetching them is ignored.
    try:
        catalog += run_ai_async(self.mcp_manager.get_tools_for_llm(os_filter=os_filter)).result(timeout=10)
    except Exception:
        pass

    if self.architect_key:
        catalog.append(fn_tool(
            "consult_architect",
            "Ask the Strategic Reasoning Engine for advice on complex design, architecture, or troubleshooting decisions. You remain in control and will present the response to the user. Use this for: configuration planning, design validation, complex troubleshooting.",
            {"type": "object", "properties": {"question": {"type": "string", "description": "Strategic question or decision needed."}, "technical_summary": {"type": "string", "description": "Technical findings and context gathered so far."}}, "required": ["question", "technical_summary"]},
        ))
        catalog.append(fn_tool(
            "escalate_to_architect",
            "Transfer full control to the Strategic Reasoning Engine. Use ONLY when the user explicitly requests the Architect or when the problem requires strategic oversight beyond consultation. After escalation, the Architect takes over the conversation.",
            {"type": "object", "properties": {"reason": {"type": "string", "description": "Why you're escalating (e.g. 'User requested Architect', 'Complex multi-site design needed')."}, "context": {"type": "string", "description": "Full context and findings to hand over."}}, "required": ["reason", "context"]},
        ))

    # A dict keeps insertion order, so setdefault preserves the first schema per name.
    first_by_name = {}
    for spec in catalog + self.external_engineer_tools:
        first_by_name.setdefault(spec["function"]["name"], spec)
    return list(first_by_name.values())
def _get_architect_tools(self):
    """Assemble the tool schemas offered to the Strategic Reasoning Engine.

    The memory tool is always present. The two Engineer hand-off tools
    (``delegate_to_engineer`` and ``return_to_engineer``) are left out when
    the instance has a truthy ``one_shot`` attribute. Tools from
    ``self.external_architect_tools`` follow, and the result is de-duplicated
    by function name, keeping the first occurrence.
    """
    handoff_tools = [
        {"type": "function", "function": {"name": "delegate_to_engineer", "description": "Delegates a technical mission to the Engineer.", "parameters": {"type": "object", "properties": {"task": {"type": "string", "description": "Detailed technical mission or goal."}}, "required": ["task"]}}},
        {"type": "function", "function": {"name": "return_to_engineer", "description": "Return control to the Engineer. Use this when your strategic analysis is complete and the Engineer should handle the rest of the conversation.", "parameters": {"type": "object", "properties": {"summary": {"type": "string", "description": "Brief summary of your analysis to hand over to the Engineer."}}, "required": ["summary"]}}},
    ]
    memory_tool = {"type": "function", "function": {"name": "manage_memory_tool", "description": "Saves information to long-term memory. MANDATORY: Only use this if the user explicitly asks to remember or save something.", "parameters": {"type": "object", "properties": {"content": {"type": "string"}, "action": {"type": "string", "enum": ["append", "replace"]}}, "required": ["content"]}}}

    if getattr(self, "one_shot", False):
        catalog = [memory_tool]
    else:
        catalog = handoff_tools + [memory_tool]

    # A dict keeps insertion order, so setdefault preserves the first schema per name.
    first_by_name = {}
    for spec in catalog + self.external_architect_tools:
        first_by_name.setdefault(spec["function"]["name"], spec)
    return list(first_by_name.values())
def _get_sessions(self):
    """Return metadata for every readable session file, newest first.

    Scans ``self.sessions_dir`` for ``*.json`` files and returns one dict per
    file with the keys ``id`` (file name without ``.json``), ``title``,
    ``created_at``, ``model`` and ``path``. Files that cannot be read or
    parsed, or whose top-level JSON value is not an object, are skipped.

    Sessions are ordered by ``created_at`` descending. A missing or
    non-string timestamp is reported as ``"Unknown"`` and sorted last, so an
    undated file is never presented as the most recent session and a
    ``null`` timestamp can no longer break the sort with ``TypeError``.

    Returns an empty list when the directory is missing or unreadable.
    """
    try:
        filenames = os.listdir(self.sessions_dir)
    except OSError:
        return []

    sessions = []
    for filename in filenames:
        if not filename.endswith(".json"):
            continue
        path = os.path.join(self.sessions_dir, filename)
        try:
            with open(path, "r", encoding="utf-8") as fs:
                data = json.load(fs)
        except (OSError, ValueError):
            # Unreadable or malformed session file: best-effort listing skips it.
            continue
        if not isinstance(data, dict):
            continue

        created_at = data.get("created_at", "Unknown")
        if not isinstance(created_at, str):
            # Keep the sort key homogeneous; mixed types cannot be compared.
            created_at = "Unknown"

        sessions.append({
            "id": filename[:-5],
            "title": data.get("title", "Untitled Session"),
            "created_at": created_at,
            "model": data.get("model", "Unknown"),
            "path": path,
        })

    # With reverse=True, dated entries (True) come before undated ones (False),
    # and within the dated group the newest ISO timestamp comes first.
    return sorted(
        sessions,
        key=lambda s: (s["created_at"] != "Unknown", s["created_at"]),
        reverse=True,
    )
def list_sessions(self, limit=20):
    """Print the saved sessions as a table via ``printer.table``.

    Shows at most ``limit`` sessions (a falsy ``limit`` shows all). When the
    list is cut, the table title says how many are shown and a hint about
    viewing the rest is printed afterwards. Prints an informational message
    and returns when there are no sessions.
    """
    found = self._get_sessions()
    if not found:
        printer.info("No saved AI sessions found.")
        return

    total = len(found)
    truncated = limit and total > limit
    shown = found[:limit] if truncated else found

    rows = [[entry[field] for field in ("id", "title", "created_at", "model")] for entry in shown]
    heading = "AI Persisted Sessions"
    if truncated:
        heading += f" (Showing last {limit} of {total})"

    printer.table(heading, ["ID", "Title", "Created At", "Model"], rows)
    if truncated:
        printer.info(f"Use '--list --all' (if supported) or check the sessions directory to see all {total} sessions.")
def load_session_data(self, session_id):
    """Load and return the raw JSON data of the session ``session_id``.

    On success the instance's ``session_id`` and ``session_path`` are pointed
    at the loaded session. Returns ``None`` when the file does not exist, or
    when reading/parsing fails (the failure is reported through
    ``printer.error``).
    """
    target = os.path.join(self.sessions_dir, "{}.json".format(session_id))
    if not os.path.exists(target):
        return None
    try:
        with open(target, "r") as handle:
            payload = json.load(handle)
        self.session_id = session_id
        self.session_path = target
        return payload
    except Exception as exc:
        printer.error(f"Failed to load session {session_id}: {exc}")
        return None
def delete_session(self, session_id):
    """Remove the session file for ``session_id`` and report the outcome.

    Prints a success message after deleting the file, or an error message
    when no file exists for that ID.
    """
    target = os.path.join(self.sessions_dir, "{}.json".format(session_id))
    if not os.path.exists(target):
        printer.error(f"Session {session_id} not found.")
        return
    os.remove(target)
    printer.success(f"Session {session_id} deleted.")
def get_last_session_id(self):
    """Return the ID of the first session reported by ``_get_sessions``.

    ``_get_sessions`` orders sessions newest first, so this is the most
    recent one. Returns ``None`` when there are no sessions.
    """
    ordered = self._get_sessions()
    if not ordered:
        return None
    return ordered[0]["id"]
def _generate_session_id(self, query):
    """Return a new session ID of the form ``YYYYMMDD-HHMMSS-xxxx``.

    The prefix is the current local time and ``xxxx`` is four random hex
    digits. ``query`` is accepted for the caller's convenience but does not
    influence the result.
    """
    return f"{datetime.datetime.now():%Y%m%d-%H%M%S}-{secrets.token_hex(2)}"
def save_session(self, history, title=None, model=None):
    """Persist ``history`` and its metadata to the current session file.

    If the instance has no session yet, a new ID is generated and the file
    path is derived from ``self.sessions_dir``. When the file does not exist
    and no ``title`` is given, a title is derived from the first user message
    that carries non-blank text: whitespace is collapsed and anything longer
    than 40 characters is cut to 37 characters plus ``...``.

    Existing metadata in the file is preserved: ``created_at`` is kept, and
    ``title`` / ``model`` are only replaced when new values are supplied.
    ``model`` falls back to ``self.engineer_model``.

    The JSON is serialized before the file is touched and then swapped in
    with ``os.replace``, so a failed save never truncates a previously saved
    session. Any failure is reported through ``printer.error``; nothing is
    raised to the caller.
    """
    def first_user_text(default):
        # First user message with usable text; content may be None or a list of parts.
        for message in history:
            if isinstance(message, dict) and message.get("role") == "user":
                content = message.get("content")
                if isinstance(content, str) and content.strip():
                    return content
        return default

    if not self.session_id:
        self.session_id = self._generate_session_id(first_user_text("new-session"))
        self.session_path = os.path.join(self.sessions_dir, f"{self.session_id}.json")
    elif not self.session_path:
        self.session_path = os.path.join(self.sessions_dir, f"{self.session_id}.json")

    # A brand-new file gets a title derived from the conversation.
    if not title and not os.path.exists(self.session_path):
        clean_title = " ".join(first_user_text("New Session").split())
        if len(clean_title) > 40:
            title = clean_title[:37].strip() + "..."
        else:
            title = clean_title

    try:
        # Start from the stored metadata so fields we do not set survive.
        metadata = {}
        if os.path.exists(self.session_path):
            with open(self.session_path, "r", encoding="utf-8") as f:
                metadata = json.load(f)

        now = datetime.datetime.now().isoformat()
        metadata.update({
            "id": self.session_id,
            "title": title or metadata.get("title", "New Session"),
            "created_at": metadata.get("created_at", now),
            "updated_at": now,
            "model": model or metadata.get("model", self.engineer_model),
            "history": history,
        })

        # Serialize first: if history is not JSON-serializable we fail here,
        # before the existing file has been opened for writing.
        payload = json.dumps(metadata, indent=4)
        tmp_path = f"{self.session_path}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, self.session_path)
    except Exception as e:
        printer.error(f"Failed to save session: {e}")
@MethodHook
def ask(self, user_input, dryrun=False, chat_history=None, status=None, debug=False, stream=True, session_id=None, chunk_callback=None):
# Handle one user turn: pick the Architect or Engineer "brain", build the message
# list (system prompt + trimmed history + cleaned user input), then alternate LLM
# completions and tool calls until the model answers without tool calls, the hard
# iteration limit is reached, or the user interrupts.
#
# Parameters:
#   user_input     - raw user text; a leading "architect"/"engineer" tag (also the
#                    Spanish and "@" forms) selects the brain explicitly and is stripped.
#   dryrun         - accepted but not referenced anywhere in this method body.
#   chat_history   - prior messages (dicts, or objects exposing model_dump); defaults to [].
#   status         - optional status object; update()/stop()/start() are called on it and
#                    on_interrupt is assigned when the attribute exists.
#   debug          - when true, prints decisions, observations and reasoning panels.
#   stream         - when true uses self._stream_completion, otherwise completion().
#   session_id     - forces the session id/path; if chat_history is empty the history
#                    stored in that session file is loaded.
#   chunk_callback - forwarded to _stream_completion; also called directly with the
#                    summary text produced after an interruption.
#
# Returns a dict with "response", "chat_history" and "usage"; the final return also
# carries "app_related", "responder" and "streamed".
# Raises ValueError when no Engineer key/auth is configured and the Engineer model
# name does not contain "vertex", "ollama" or "local".
is_engineer_keyless = "vertex" in self.engineer_model.lower() or "ollama" in self.engineer_model.lower() or "local" in self.engineer_model.lower()
if not self.engineer_key and not self.engineer_auth and not is_engineer_keyless:
raise ValueError("Engineer API key or authentication not configured. Use 'connpy config --engineer-auth <auth>' to set it.")
# Status helper: no-op without a status object; appends the soft-limit warning once.
# It reads `iteration` from the enclosing scope, which is only assigned further down
# (section 3), so it must not be called before the execution loop is set up.
def update_status(text):
if not status:
return
if iteration >= self.soft_limit_iterations:
warning_suffix = " [warning]⚠ Taking longer than expected (Ctrl+C to interrupt)[/warning]"
if warning_suffix not in text:
text += warning_suffix
status.update(text)
if chat_history is None: chat_history = []
# Load session if provided and history is empty
if session_id:
# Force the session_id even if it doesn't exist yet
self.session_id = session_id
self.session_path = os.path.join(self.sessions_dir, f"{session_id}.json")
if not chat_history:
session_data = self.load_session_data(session_id)
if session_data:
chat_history = session_data.get("history", [])
# If we loaded history, the caller might need it back
# But typically ask() is called in a loop with an external history object
# Token counters accumulated across every completion made during this turn.
usage = {"input": 0, "output": 0, "total": 0}
# 1. Initial role selector (Sticky Brain)
explicit_architect = re.match(r'^(architect|arquitecto|@architect)[:\s]', user_input, re.I)
explicit_engineer = re.match(r'^(engineer|ingeniero|@engineer)[:\s]', user_input, re.I)
if explicit_architect:
current_brain = "architect"
elif explicit_engineer:
current_brain = "engineer"
else:
# Sticky Brain: detect whether the Architect was in charge in the recent history
is_architect_active = False
# Only the last five history entries are inspected, newest first.
for msg in reversed(chat_history[-5:]):
tcs = msg.get('tool_calls') if isinstance(msg, dict) else getattr(msg, 'tool_calls', None)
if tcs:
for tc in tcs:
fn = tc.get('function', {}).get('name') if isinstance(tc, dict) else getattr(getattr(tc, 'function', None), 'name', '')
# Architect stays in control if delegating tasks or if Engineer escalated to them
# consult_architect is just Engineer asking for advice - Engineer keeps control
if fn in ['delegate_to_engineer', 'escalate_to_architect']:
is_architect_active = True; break
if is_architect_active: break
current_brain = "architect" if is_architect_active else "engineer"
# 2. Message preparation and cleanup
# Strip the optional role tag so the model only sees the actual request.
clean_input = re.sub(r'^(architect|arquitecto|engineer|ingeniero|@architect|@engineer)[:\s]+', '', user_input, flags=re.IGNORECASE).strip()
system_prompt = self.architect_system_prompt if current_brain == "architect" else self.engineer_system_prompt
tools = self._get_architect_tools() if current_brain == "architect" else self._get_engineer_tools()
model = self.architect_model if current_brain == "architect" else self.engineer_model
# NOTE(review): `key` is assigned here and on every brain switch below, but it is
# never read in this method; requests are authenticated via `current_auth`.
key = self.architect_key if current_brain == "architect" else self.engineer_key
current_auth = self.architect_auth if current_brain == "architect" else self.engineer_auth
# Structure optimized for prompt caching (direct Anthropic only; Vertex has different rules)
if "claude" in model.lower() and "vertex" not in model.lower():
messages = [{"role": "system", "content": [{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}]}]
else:
messages = [{"role": "system", "content": system_prompt}]
# History interleaving
last_role = "system"
# Sanitize history if the current target model is not compatible with cache_control
history_to_process = chat_history[-self.max_history:]
if "claude" not in model.lower() or "vertex" in model.lower():
history_to_process = self._sanitize_messages(history_to_process)
for msg in history_to_process:
m = msg if isinstance(msg, dict) else msg.model_dump(exclude_none=True)
role = m.get('role')
# Consecutive user messages are merged into one, joined by a newline.
if role == last_role and role == 'user':
messages[-1]['content'] += "\n" + (m.get('content') or "")
continue
# An assistant message that carries tool calls gets None instead of an empty string as content.
if role == 'assistant' and m.get('tool_calls') and m.get('content') == "": m['content'] = None
messages.append(m)
last_role = role
# The new request is merged into a trailing user message, or appended as a new one.
if last_role == 'user': messages[-1]['content'] += "\n" + clean_input
else: messages.append({"role": "user", "content": clean_input})
# 3. Execution loop
iteration = 0
try:
# Set up remote interrupt callback if bridge is provided
if status and hasattr(status, "on_interrupt"):
status.on_interrupt = lambda: setattr(self, "interrupted", True)
while iteration < self.hard_limit_iterations:
iteration += 1
# Check for interruption
if self.interrupted:
raise KeyboardInterrupt
# Soft limit warning - handled inline within update_status
label = "[architect][bold]Architect[/bold][/architect]" if current_brain == "architect" else "[engineer][bold]Engineer[/bold][/engineer]"
if status:
# Notify responder identity for web/remote clients
if getattr(status, "is_web", False) or getattr(status, "is_remote", False):
status.update(f"__RESPONDER__:{current_brain}")
update_status(f"{label} is thinking... (step {iteration})")
streamed_response = False
try:
safe_messages = self._sanitize_messages(messages)
if stream:
response, streamed_response = self._stream_completion(
model=model, messages=safe_messages, tools=tools, auth=current_auth,
status=status, label=label, debug=debug, num_retries=3,
chunk_callback=chunk_callback
)
else:
response = completion(model=model, messages=safe_messages, tools=tools, num_retries=3, **current_auth)
except Exception as e:
# A failed Architect call falls back to the Engineer and retries the step;
# a failed Engineer call ends the turn with an error response.
if current_brain == "architect":
if status: update_status("[unavailable]Architect unavailable! Falling back to Engineer...")
# Preserve context when falling back - use clean_input directly
current_brain = "engineer"
model = self.engineer_model
tools = self._get_engineer_tools()
key = self.engineer_key
current_auth = self.engineer_auth
# Rebuild messages with Engineer system prompt and original user request
messages = [{"role": "system", "content": self.engineer_system_prompt}]
# Add chat history if exists (excluding system prompt)
if chat_history:
for msg in chat_history[-self.max_history:]:
if msg.get('role') != 'system':
messages.append(msg)
# Add current user request
messages.append({"role": "user", "content": clean_input})
continue
else:
return {"response": f"Error: Both engines failed. {str(e)}", "chat_history": messages[1:], "usage": usage}
# Accumulate token usage when the response reports it.
if hasattr(response, "usage") and response.usage:
usage["input"] += getattr(response.usage, "prompt_tokens", 0)
usage["output"] += getattr(response.usage, "completion_tokens", 0)
usage["total"] += getattr(response.usage, "total_tokens", 0)
resp_msg = response.choices[0].message
msg_dict = resp_msg.model_dump(exclude_none=True)
if msg_dict.get("tool_calls") and msg_dict.get("content") == "": msg_dict["content"] = None
messages.append(msg_dict)
if debug and resp_msg.content and not streamed_response:
# In CLI debug mode, only print intermediate reasoning if there are tool calls AND it wasn't already streamed.
# If there are no tool calls, this content is the final answer and will be printed by the caller.
if resp_msg.tool_calls:
# The status display is stopped around the panel print and restarted afterwards.
if status:
try: status.stop()
except: pass
self.console.print(Panel(Markdown(resp_msg.content), title=f"[{current_brain}][bold]{label} Reasoning[/bold][/{current_brain}]", border_style="architect" if current_brain == "architect" else "engineer"))
if status:
try: status.start()
except: pass
# No tool calls means the model produced its final answer for this turn.
if not resp_msg.tool_calls: break
# Track if we need to inject a user message after all tool responses
pending_user_message = None
for tc in resp_msg.tool_calls:
fn, args = tc.function.name, json.loads(tc.function.arguments)
# Validate tool access based on current brain
if fn in ['delegate_to_engineer'] and current_brain != "architect":
obs = f"Error: Tool '{fn}' is only available to the Architect (Architect). You are the Engineer (Engineer). Use 'run_commands' directly to execute configuration."
messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": obs})
continue
if status:
if fn == "delegate_to_engineer": update_status(f"[architect]Architect: [DELEGATING MISSION] {args.get('task','')[:40]}...")
elif fn == "manage_memory_tool": update_status(f"[architect]Architect: [UPDATING MEMORY]")
if debug:
self._print_debug_observation(f"Decision: {fn}", args, status=status)
# Tool dispatch: each branch leaves its result in `obs`.
if fn == "delegate_to_engineer":
# The Engineer sub-loop receives the conversation without the assistant message just appended.
obs, eng_usage = self._engineer_loop(args["task"], status=status, debug=debug, chat_history=messages[:-1])
usage["input"] += eng_usage["input"]; usage["output"] += eng_usage["output"]; usage["total"] += eng_usage["total"]
elif fn == "consult_architect":
if status: update_status("[architect]Engineer consulting Architect...")
try:
# Consultation only - Engineer stays in control
# NOTE(review): this call authenticates with api_key=self.architect_key, whereas the
# other completion calls in this method pass **current_auth - confirm this is intended.
# Its token usage is also not added to `usage`.
claude_resp = completion(
model=self.architect_model,
messages=[
{"role": "system", "content": self.architect_system_prompt},
{"role": "user", "content": f"The Engineer needs your strategic advice.\n\nTECHNICAL SUMMARY: {args['technical_summary']}\n\nQUESTION: {args['question']}\n\nProvide strategic guidance. The Engineer will continue handling the user."}
],
api_key=self.architect_key,
num_retries=3
)
obs = claude_resp.choices[0].message.content
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Markdown(obs), title="[architect]Architect Consultation[/architect]", border_style="architect"))
if status:
try: status.start()
except: pass
except Exception as e:
if status: update_status("[unavailable]Architect unavailable! Engineer continuing alone...")
obs = f"Architect unavailable ({str(e)}). Proceeding with your best technical judgment."
elif fn == "escalate_to_architect":
if status: update_status("[architect]Transferring control to Architect...")
# Full escalation - Architect takes over
current_brain = "architect"
model = self.architect_model
tools = self._get_architect_tools()
key = self.architect_key
current_auth = self.architect_auth
# The system prompt is swapped in place so the rest of the conversation is kept.
messages[0] = {"role": "system", "content": self.architect_system_prompt}
# Prepare handover context to inject AFTER all tool responses
handover_msg = f"HANDOVER FROM EXECUTION ENGINE\n\nReason: {args['reason']}\n\nContext: {args['context']}\n\nYou are now in control of this conversation."
pending_user_message = handover_msg
obs = "Control transferred to Architect. Handover context will be provided."
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Text(handover_msg), title="[architect]Escalation to Architect[/architect]", border_style="architect"))
if status:
try: status.start()
except: pass
elif fn == "return_to_engineer":
if status: update_status("[engineer]Transferring control back to Engineer...")
# Architect returns control to Engineer
current_brain = "engineer"
model = self.engineer_model
tools = self._get_engineer_tools()
key = self.engineer_key
current_auth = self.engineer_auth
messages[0] = {"role": "system", "content": self.engineer_system_prompt}
# Prepare handover context to inject AFTER all tool responses
handover_msg = f"HANDOVER FROM ARCHITECT\n\nSummary: {args['summary']}\n\nYou are now back in control. Continue handling the user's requests."
pending_user_message = handover_msg
obs = "Control returned to Engineer. Handover summary will be provided."
if debug:
if status:
try: status.stop()
except: pass
self.console.print(Panel(Text(handover_msg), title="[engineer]Return to Engineer[/engineer]", border_style="engineer"))
if status:
try: status.start()
except: pass
elif fn == "list_nodes": obs = self.list_nodes_tool(**args)
elif fn == "run_commands": obs = self.run_commands_tool(**args, status=status)
elif fn == "get_node_info": obs = self.get_node_info_tool(**args)
elif fn == "manage_memory_tool": obs = self.manage_memory_tool(**args)
elif fn.startswith("mcp_"):
# MCP tools run on the dedicated AI event loop; this thread blocks for up to 60 seconds.
obs = run_ai_async(self.mcp_manager.call_tool(fn, args)).result(timeout=60)
elif fn in self.external_tool_handlers: obs = self.external_tool_handlers[fn](self, **args)
else: obs = f"Error: {fn} unknown."
# The four hand-off/consultation tools are excluded from the generic observation dump.
if debug and fn not in ["delegate_to_engineer", "consult_architect", "escalate_to_architect", "return_to_engineer"]:
self._print_debug_observation(f"Observation: {fn}", obs, status=status)
# Ensure observation is a string and truncated for the LLM
obs_str = obs if isinstance(obs, str) else json.dumps(obs)
messages.append({"tool_call_id": tc.id, "role": "tool", "name": fn, "content": self._truncate(obs_str)})
# Inject pending user message AFTER all tool responses are added
if pending_user_message:
messages.append({"role": "user", "content": pending_user_message})
# Hard-limit handling: ask the model for a summary with no tools offered.
if iteration >= self.hard_limit_iterations:
self.console.print(f"[error]⛔ Agent reached hard limit ({self.hard_limit_iterations} steps). Forcing stop to prevent infinite loop.[/error]")
# Only inject user message if we're not in the middle of tool calls
last_msg = messages[-1] if messages else {}
if last_msg.get("role") != "assistant" or not last_msg.get("tool_calls"):
messages.append({"role": "user", "content": "Hard iteration limit reached. Please provide a summary of your findings so far."})
try:
safe_messages = self._sanitize_messages(messages)
response = completion(model=model, messages=safe_messages, tools=[], **current_auth)
resp_msg = response.choices[0].message
messages.append(resp_msg.model_dump(exclude_none=True))
except Exception as e:
if status:
update_status(f"[error]Error fetching summary: {e}[/error]")
printer.warning(f"Failed to fetch final summary from LLM: {e}")
except KeyboardInterrupt:
# Raised by Ctrl+C or by the loop itself when self.interrupted is set.
if status: status.update("[error]Interrupted! Closing pending tasks...")
last_msg = messages[-1]
# Every tool call of the last message gets a "cancelled" tool reply appended.
if last_msg.get("tool_calls"):
for tc in last_msg["tool_calls"]:
messages.append({"tool_call_id": tc.get("id"), "role": "tool", "name": tc.get("function", {}).get("name"), "content": "Operation cancelled by user."})
# Use a fresh list for the summary call to avoid history corruption
summary_messages = list(messages)
summary_messages.append({"role": "user", "content": "USER INTERRUPTED. Briefly summarize what you were doing and stop."})
try:
safe_messages = self._sanitize_messages(summary_messages)
# Use tools=None to force a text summary during interruption
response = completion(model=model, messages=safe_messages, tools=None, **current_auth)
resp_msg = response.choices[0].message
messages.append(resp_msg.model_dump(exclude_none=True))
# IMPORTANT: Manually trigger callback for the summary so Web UI sees it
if chunk_callback and resp_msg.content:
chunk_callback(resp_msg.content)
except Exception:
error_msg = "Operation interrupted by user. Summary unavailable."
messages.append({"role": "assistant", "content": error_msg})
if chunk_callback:
chunk_callback(error_msg)
finally:
# Auto-save session
# The full message list, including the system prompt at index 0, is what gets saved.
self.save_session(messages, model=model)
# The response is the content of the last message; the returned history omits the system prompt.
return {
"response": messages[-1].get("content"),
"chat_history": messages[1:],
"app_related": True,
"usage": usage,
"responder": current_brain, # "architect" or "engineer"
"streamed": streamed_response
}
@MethodHook
async def aask_copilot(self, terminal_buffer, user_question, node_info=None, chunk_callback=None):
# Answer a question about a live terminal session. Builds a persona-specific system
# prompt around the terminal buffer, streams the completion (optionally letting the
# model call MCP tools for up to five rounds), and parses the <guide>, <commands>
# and <risk> sections out of the final text.
#
# Parameters:
#   terminal_buffer - text embedded verbatim in the prompt as the "Terminal Context".
#   user_question   - sent as the user message.
#   node_info       - optional dict; the keys "os", "name", "persona" and "memories" are read.
#   chunk_callback  - optional callable that receives incremental <guide> text while streaming.
#
# Returns {"commands": [...], "guide": str, "risk_level": str, "error": None} on success,
# the same shape with empty values and "error" set to the exception text on failure,
# and None when the task is cancelled (asyncio.CancelledError).
import json
import re
from litellm import acompletion
import asyncio
import warnings
# NOTE(review): aiohttp is imported here but not referenced in this method body.
import aiohttp
# Suppress unawaited coroutine warnings from LiteLLM's internal streaming logic during sudden cancellation
warnings.filterwarnings("ignore", message="coroutine '.*async_streaming.*' was never awaited", category=RuntimeWarning)
node_info = node_info or {}
os_info = node_info.get("os", "unknown")
node_name = node_info.get("name", "unknown")
persona = node_info.get("persona", "engineer")
memories = node_info.get("memories", [])
# Optional per-OS reference notes: <defaultdir>/ai_references/<os name, lowercased, spaces as underscores>.md
vendor_reference = ""
if os_info and os_info != "unknown":
try:
os_filename = os_info.lower().replace(" ", "_")
ref_path = os.path.join(self.config.defaultdir, "ai_references", f"{os_filename}.md")
if os.path.exists(ref_path):
with open(ref_path, "r") as f:
vendor_reference = f.read().strip()
except Exception:
pass
# Two prompt variants: the architect persona advises without commands, any other
# persona gets the command-oriented copilot prompt. Both embed the terminal buffer.
if persona == "architect":
system_prompt = f"""Role: NETWORK ARCHITECT. You act as a senior strategic advisor during a live SSH session.
Rules:
1. MANDATORY: You MUST respond in the same language used by the user in their question.
2. Answer the user's question directly and EXCLUSIVELY based on the Terminal Context.
3. NO HALLUCINATIONS. The Terminal Context is a live buffer. If it contains only a shell prompt (like 'iol#' or 'admin@vrouter>') and no command output, it means YOU DON'T HAVE DATA. In this case, YOU MUST NOT invent any information.
4. Focus on the "why" and "how". Analyze topologies, design patterns, and validate configurations.
5. Do NOT provide commands to execute unless specifically requested. Instead, explain the consequences and best practices.
6. Keep your guide concise and authoritative.
7. You MUST output your response in the following strict format:
<guide>
Your brief tactical guide in markdown.
</guide>
<commands>
</commands>
<risk>
low
</risk>
8. Risk level is usually "low" for read-only/no commands.
Terminal Context:
{terminal_buffer}
Device OS: {os_info}
Node: {node_name}"""
else:
system_prompt = f"""Role: TERMINAL COPILOT. You assist a network engineer during a live SSH session.
Rules:
1. MANDATORY: You MUST respond in the same language used by the user in their question.
2. EXTREMELY IMPORTANT: Answer EXCLUSIVELY based on the provided Terminal Context.
3. NO HALLUCINATIONS. The Terminal Context is a live buffer. If it contains only a shell prompt (like 'iol#' or 'admin@vrouter>') and no command output, it means YOU DON'T HAVE DATA. In this case, YOU MUST NOT invent any information. Instead, explicitly state that you don't see the data and offer the correct CLI commands to retrieve it.
4. If the user asks you to analyze, parse, or extract data from the Terminal Context, DO IT directly in the <guide> section (you can use markdown tables or lists). Do NOT just give them a command to do it themselves.
5. If the user wants to execute an action, provide the required CLI commands inside a <commands> block, one command per line. If no commands are needed, leave it empty or omit the block.
6. ULTRA-CONCISE. Keep your guide to the point.
7. You MUST output your response in the following strict format:
<guide>
Your brief tactical guide in markdown. 3-4 sentences max.
</guide>
<commands>
command 1
command 2
</commands>
<risk>
low, high, or destructive
</risk>
8. Risk level: "low" for read-only/no commands, "high" for config changes, "destructive" for potentially dangerous ops.
Terminal Context:
{terminal_buffer}
Device OS: {os_info}
Node: {node_name}"""
# Optional sections appended to the prompt: vendor reference, session memories, MCP tool names.
if vendor_reference:
system_prompt += f"\n\nVendor Command Reference:\n{vendor_reference}"
if memories:
system_prompt += "\n\nSession Memory (Important Facts):\n"
for m in memories:
system_prompt += f"- {m}\n"
# Fetch MCP tools for the current OS
mcp_tools = []
try:
mcp_tools = await self.mcp_manager.get_tools_for_llm(os_filter=os_info)
except Exception:
pass
if mcp_tools:
system_prompt += f"\n\nAvailable MCP Tools: {', '.join([t['function']['name'] for t in mcp_tools])}"
system_prompt += "\nUse these tools to validate syntax or find exact commands if needed before providing the final guide."
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_question}
]
iteration = 0
max_iterations = 5 # Allow up to 5 iterations for tool usage
# Use models based on persona
current_model = self.architect_model if persona == "architect" else self.engineer_model
# NOTE(review): current_key is assigned but not read in this method; requests use current_auth.
current_key = self.architect_key if persona == "architect" else self.engineer_key
current_auth = self.architect_auth if persona == "architect" else self.engineer_auth
try:
while iteration < max_iterations:
iteration += 1
response = await acompletion(
model=current_model,
messages=messages,
tools=mcp_tools if mcp_tools else None,
stream=True,
**current_auth
)
# Per-round accumulators: full text, the part of <guide> already sent to the callback, tool calls.
full_content = ""
streamed_guide = ""
tool_calls = []
async for chunk in response:
delta = chunk.choices[0].delta
# Accumulate tool calls
if hasattr(delta, 'tool_calls') and delta.tool_calls:
for tc in delta.tool_calls:
idx = tc.index
# A new index starts a new tool call; later deltas extend the entry at that index.
if idx >= len(tool_calls):
tool_calls.append({"id": tc.id, "type": "function", "function": {"name": tc.function.name or "", "arguments": tc.function.arguments or ""}})
else:
if tc.id: tool_calls[idx]["id"] = tc.id
if tc.function.name: tool_calls[idx]["function"]["name"] = tc.function.name
if tc.function.arguments: tool_calls[idx]["function"]["arguments"] += tc.function.arguments
if hasattr(delta, 'content') and delta.content:
full_content += delta.content
if chunk_callback and not tool_calls: # Only stream if not using tools
# Only text inside <guide>...</guide> is forwarded; 7 is len("<guide>").
start_idx = full_content.find("<guide>")
if start_idx != -1:
after_start = full_content[start_idx + 7:]
end_idx = after_start.find("</guide>")
if end_idx != -1:
current_guide = after_start[:end_idx]
else:
current_guide = after_start
# Hold back a trailing partial "</guide" so a half-received closing tag is never emitted.
if current_guide.endswith("<"): current_guide = current_guide[:-1]
elif current_guide.endswith("</"): current_guide = current_guide[:-2]
elif current_guide.endswith("</g"): current_guide = current_guide[:-3]
elif current_guide.endswith("</gu"): current_guide = current_guide[:-4]
elif current_guide.endswith("</gui"): current_guide = current_guide[:-5]
elif current_guide.endswith("</guid"): current_guide = current_guide[:-6]
elif current_guide.endswith("</guide"): current_guide = current_guide[:-7]
# Emit only the suffix that has not been sent yet.
new_text = current_guide[len(streamed_guide):]
if new_text:
chunk_callback(new_text)
streamed_guide += new_text
if not tool_calls:
break
# Execute tool calls
messages.append({"role": "assistant", "content": full_content or None, "tool_calls": tool_calls})
for tc in tool_calls:
fn = tc["function"]["name"]
args = json.loads(tc["function"]["arguments"])
# Only tools whose name contains "mcp_" are executed; anything else gets an error observation.
if "mcp_" in fn:
try:
obs = await asyncio.wait_for(self.mcp_manager.call_tool(fn, args), timeout=30.0)
except Exception as e:
obs = f"Error calling MCP tool: {e}"
else:
obs = f"Error: Tool {fn} not allowed in Copilot."
messages.append({"tool_call_id": tc["id"], "role": "tool", "name": fn, "content": self._truncate(str(obs))})
# If we hit the limit and it was still using tools, force a final answer
if tool_calls and iteration >= max_iterations:
messages.append({"role": "user", "content": "Tool limit reached. Provide your final tactical guide now based on the findings."})
# NOTE(review): this forced final call uses self.engineer_model / self.engineer_auth even
# when the persona is "architect", unlike the loop above which uses current_model / current_auth.
response = await acompletion(
model=self.engineer_model,
messages=messages,
tools=None,
stream=True,
**self.engineer_auth
)
full_content = ""
streamed_guide = ""
# Same <guide> streaming logic as in the loop above, without tool-call handling.
async for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
full_content += delta.content
if chunk_callback:
start_idx = full_content.find("<guide>")
if start_idx != -1:
after_start = full_content[start_idx + 7:]
end_idx = after_start.find("</guide>")
if end_idx != -1:
current_guide = after_start[:end_idx]
else:
current_guide = after_start
if current_guide.endswith("<"): current_guide = current_guide[:-1]
elif current_guide.endswith("</"): current_guide = current_guide[:-2]
elif current_guide.endswith("</g"): current_guide = current_guide[:-3]
elif current_guide.endswith("</gu"): current_guide = current_guide[:-4]
elif current_guide.endswith("</gui"): current_guide = current_guide[:-5]
elif current_guide.endswith("</guid"): current_guide = current_guide[:-6]
elif current_guide.endswith("</guide"): current_guide = current_guide[:-7]
new_text = current_guide[len(streamed_guide):]
if new_text:
chunk_callback(new_text)
streamed_guide += new_text
# Parse the three tagged sections out of the final text; defaults apply when a tag is missing.
guide = ""
commands = []
risk_level = "low"
guide_match = re.search(r"<guide>(.*?)</guide>", full_content, re.DOTALL)
if guide_match:
guide = guide_match.group(1).strip()
cmd_match = re.search(r"<commands>(.*?)</commands>", full_content, re.DOTALL)
if cmd_match:
cmds_raw = cmd_match.group(1).strip()
if cmds_raw:
# One command per non-blank line.
commands = [c.strip() for c in cmds_raw.split('\n') if c.strip()]
risk_match = re.search(r"<risk>(.*?)</risk>", full_content, re.DOTALL)
if risk_match:
risk_level = risk_match.group(1).strip().lower()
# If the model ignored the format entirely, the whole reply is used as the guide.
if not guide and full_content and not ("<guide>" in full_content):
guide = full_content.strip()
return {
"commands": commands,
"guide": guide,
"risk_level": risk_level,
"error": None
}
except asyncio.CancelledError:
# Client cancelled the request via gRPC or local interrupt
# The cancellation is not re-raised: the stream is closed best-effort and None is returned.
if 'response' in locals():
try:
if hasattr(response, 'aclose'):
# Fire and forget the close to avoid blocking the cancel
asyncio.create_task(response.aclose())
elif hasattr(response, 'close'):
response.close()
except Exception:
pass
return None
except Exception as e:
# Any other failure is reported in the "error" field rather than raised.
return {
"commands": [],
"guide": "",
"risk_level": "low",
"error": str(e)
}
@MethodHook
def confirm(self, user_input):
    """Approve the given input unconditionally.

    NOTE(review): presumably a default confirmation policy that can be
    customised through ``MethodHook`` -- confirm against the hooks module.

    Args:
        user_input: The text awaiting confirmation; ignored here.

    Returns:
        bool: Always ``True``.
    """
    return True
# System prompt sent as the first message of every PlaybookBuilderAgent.ask()
# conversation. It describes the canonical `tasks[]` playbook schema, the
# `{variable}` templating rules, and the order in which the model is expected
# to use the list_nodes / validate_playbook / return_playbook tools.
# The text is runtime data read by the model: edit it deliberately.
PLAYBOOK_BUILDER_SYSTEM_PROMPT = """
You are a Connpy Playbook Builder Agent, a specialist in creating structured Connpy automation playbooks in YAML format.
Your primary mission is to help the user build, refine, and validate playbooks.
You MUST follow the Connpy canonical playbook format strictly:
The playbook MUST always use the `tasks[]` array structure as the root key, where each task is sequential and independent.
Connpy YAML Playbook Canonical Schema:
---
tasks:
- name: "Task Description"
action: 'run' # Can be 'run' or 'test'. Mandatory.
nodes: # List of nodes filter or regular expressions to work on. Mandatory. Can be a string or array of strings. Supports regex (e.g. 'router.*@office' to match all routers in the 'office' folder).
- 'router1@office'
- 'router.*@office' # Regex filters are fully supported to match multiple nodes dynamically.
- '@aws'
commands: # List of CLI commands to execute. Mandatory.
- 'show version'
variables: # Key-value pairs for variables replacement in commands and expected. Optional.
__global__: # Global variables fallback. Optional.
key: value
node_name@folder: # Node-specific variables. Optional.
key: value
output: stdout # Mandatory. Output configuration. Choices: 'stdout', 'null', or a folder path like '/path/to/folder'.
options: # Execution options. Optional.
prompt: 'regex_prompt' # Optional prompt to expect.
parallel: 10 # Optional number of parallel threads. Default 10.
timeout: 20 # Optional execution timeout in seconds. Default 20.
- name: "Verification Task"
action: 'test'
nodes:
- 'router1@office'
commands:
- 'ping 10.100.100.1'
expected: '!' # Expected text pattern to search in output. Mandatory ONLY for 'test' action.
Connpy Variable Templating & Usage:
- Variables defined under the `variables` key (either globally under `__global__` or for specific nodes) are used in commands or expected output by surrounding the variable name with single curly braces: `{variable_name}`.
- Example: If you define a variable `ip` with a value of `10.100.100.1`, you use it in commands as `'ping {ip}'`.
- Recommendation (Important): Variables are not limited to simple words or values. You can define entire CLI commands as variables to abstract vendor-specific syntax! This is highly recommended when executing the same logical operation across different operating systems (OS) or vendors.
- Example: You can define `show_interface_cmd` under a specific node's variables to be `'show ip interface brief'` for Cisco, and `'show interfaces terse'` for Juniper, and then write a single generic command under `commands`:
`- '{show_interface_cmd}'`
Guidelines:
1. When the user requests a playbook, you should guide them and output the YAML.
2. IMPORTANT: You have access to the `list_nodes` tool. Proactively use it to inspect the user's real inventory. This allows you to discover correct node names, folders, or device tags, and construct precise regex filters for the `nodes` field based on real assets.
3. IMPORTANT: Before presenting the playbook, you MUST call the `validate_playbook` tool with the YAML to let the backend check for syntax and schema correctness.
4. If `validate_playbook` returns errors, fix them in your YAML and validate again before responding to the user.
5. When the playbook is complete, validated, and the user approves it, you MUST call the `return_playbook` tool to return the final YAML.
6. All text responses must be in the same language the user uses in their prompt.
"""
# Tool (function-calling) definitions offered to the model by
# PlaybookBuilderAgent.ask(), in the OpenAI-style format LiteLLM accepts.
# The parameter schemas are JSON Schema, whose `type` values are lower-case
# and case-sensitive ("object", "string"); upper-case Gemini enum spellings
# are rejected by other providers, so the portable lower-case form is used.
PLAYBOOK_BUILDER_TOOLS = [
    {
        # Inventory discovery; every argument is optional.
        "type": "function",
        "function": {
            "name": "list_nodes",
            "description": "[Universal Platform] Lists available nodes in the inventory. Use this to discover device names, folders, or operating systems to build proper regex filters.",
            "parameters": {
                "type": "object",
                "properties": {
                    "filter_pattern": {
                        "type": "string",
                        "description": "Regex or pattern to filter nodes (e.g. '.*', 'border.*', '@office')."
                    }
                }
            }
        }
    },
    {
        # Schema/syntax check of a candidate playbook.
        "type": "function",
        "function": {
            "name": "validate_playbook",
            "description": "Validates the Connpy YAML playbook structure, syntax, and schema correctness with the backend.",
            "parameters": {
                "type": "object",
                "properties": {
                    "playbook_yaml": {
                        "type": "string",
                        "description": "The YAML content of the playbook to validate."
                    }
                },
                "required": ["playbook_yaml"]
            }
        }
    },
    {
        # Hands the final YAML back to the caller and ends the tool loop.
        "type": "function",
        "function": {
            "name": "return_playbook",
            "description": "Returns the final validated YAML playbook to the calling application when the user is satisfied.",
            "parameters": {
                "type": "object",
                "properties": {
                    "playbook_yaml": {
                        "type": "string",
                        "description": "The final YAML content of the playbook."
                    }
                },
                "required": ["playbook_yaml"]
            }
        }
    }
]
class PlaybookBuilderAgent:
    """Specialized AI agent for building, validating, and generating Connpy YAML playbooks.

    ``ask`` runs a tool-calling conversation in which the model may list
    inventory nodes, validate candidate YAML with ``validate_playbook`` and
    finally hand the playbook back through the ``return_playbook`` tool.
    """

    def __init__(self, config, console=None, confirm_handler=None, trust=False, **kwargs):
        """Store the configuration and resolve model/auth settings.

        Args:
            config: Connpy configuration object. Its ``ai`` settings are read
                via ``get_effective_setting("ai", {})`` when available,
                otherwise from ``config.config["ai"]``.
            console: Console used for non-streaming output; defaults to
                ``printer.console``.
            confirm_handler: Accepted but not used by this class.
            trust: Accepted but not used by this class.
            **kwargs: ``engineer_model``, ``engineer_api_key`` and
                ``engineer_auth`` override the values from the ``ai`` settings.
        """
        self.config = config
        self.console = console or printer.console
        self.interrupted = False
        # Load AI configuration
        if hasattr(self.config, "get_effective_setting"):
            aiconfig = self.config.get_effective_setting("ai", {})
        else:
            aiconfig = self.config.config.get("ai", {}) if hasattr(self.config, "config") else {}
        # Default model for technical tasks
        self.model = kwargs.get("engineer_model") or aiconfig.get("engineer_model") or "gemini/gemini-3.1-flash-lite"
        self.key = kwargs.get("engineer_api_key") or aiconfig.get("engineer_api_key")
        self.auth = kwargs.get("engineer_auth") or aiconfig.get("engineer_auth") or {}
        if self.key and "api_key" not in self.auth:
            # Copy first so the dict stored in the configuration is not modified.
            self.auth = self.auth.copy()
            self.auth["api_key"] = self.key

    def validate_playbook(self, playbook_yaml: str) -> dict:
        """Check the syntax and schema of a Connpy playbook given as YAML text.

        Validation stops at the first problem found.

        Args:
            playbook_yaml: YAML source of the playbook.

        Returns:
            dict: ``{"valid": True, "message": ...}`` on success, otherwise
            ``{"valid": False, "error": ...}`` describing the first failure.
        """
        import yaml
        try:
            # 1. Parse YAML
            # NOTE(review): this text is produced by the model, i.e. untrusted
            # input. FullLoader is kept to preserve the existing parsing
            # behaviour; consider yaml.safe_load if plain data is all that
            # playbooks ever need.
            data = yaml.load(playbook_yaml, Loader=yaml.FullLoader)
        except Exception as e:
            return {"valid": False, "error": f"YAML Syntax Error: {e}"}
        # 2. Check structure
        if not isinstance(data, dict):
            return {"valid": False, "error": "Playbook must be a YAML dictionary."}
        if "tasks" not in data:
            return {"valid": False, "error": "Playbook missing mandatory root 'tasks' key."}
        tasks = data["tasks"]
        if not isinstance(tasks, list):
            return {"valid": False, "error": "'tasks' must be a list of tasks."}
        # 3. Check individual tasks
        mandatory = ["name", "action", "nodes", "commands", "output"]
        for idx, task in enumerate(tasks):
            if not isinstance(task, dict):
                return {"valid": False, "error": f"Task index {idx} must be a dictionary."}
            name = task.get("name", f"Task {idx}")
            # Mandatory fields
            missing = [field for field in mandatory if field not in task]
            if missing:
                return {"valid": False, "error": f"Task '{name}' (index {idx}) is missing mandatory fields: {missing}"}
            # Validate nodes field type (supports string regexes or array of string regexes)
            nodes = task["nodes"]
            if not isinstance(nodes, (str, list)):
                return {"valid": False, "error": f"Task '{name}' (index {idx}) 'nodes' must be a string (regex) or a list of strings (regexes)."}
            if isinstance(nodes, list):
                for n_idx, node_item in enumerate(nodes):
                    if not isinstance(node_item, str):
                        return {"valid": False, "error": f"Task '{name}' (index {idx}) 'nodes' list contains a non-string value at index {n_idx}: {node_item}"}
            action = task["action"]
            if action not in ["run", "test"]:
                return {"valid": False, "error": f"Task '{name}' (index {idx}) has invalid action '{action}'. Choices are: 'run', 'test'."}
            if action == "test" and "expected" not in task:
                return {"valid": False, "error": f"Task '{name}' (index {idx}) has action 'test' but is missing the mandatory 'expected' key."}
            output = task["output"]
            if output is not None and output != "stdout":
                # Only strings have startswith(); a number, list or mapping
                # must be reported as invalid rather than raise AttributeError.
                if not isinstance(output, str) or not output.startswith("/"):
                    return {"valid": False, "error": f"Task '{name}' (index {idx}) output '{output}' is invalid. Must be 'stdout', 'null' or an absolute path."}
        return {"valid": True, "message": "Playbook schema and syntax is valid."}

    @staticmethod
    def _parse_tool_arguments(raw_arguments):
        """Decode the arguments of a tool call into a dict.

        Returns:
            tuple: ``(args, error)``. ``error`` is ``None`` on success;
            otherwise it is a JSON string to send back to the model as the
            tool result and ``args`` is empty.
        """
        if isinstance(raw_arguments, dict):
            return raw_arguments, None
        if not raw_arguments:
            # A call without parameters may carry an empty/absent payload.
            return {}, None
        try:
            parsed = json.loads(raw_arguments)
        except (TypeError, ValueError) as e:
            return {}, json.dumps({"error": f"Invalid tool arguments, expected a JSON object: {e}"})
        if not isinstance(parsed, dict):
            return {}, json.dumps({"error": "Invalid tool arguments, expected a JSON object."})
        return parsed, None

    def _list_nodes_observation(self, filter_pattern):
        """Build the ``list_nodes`` tool result for ``filter_pattern``.

        Up to five matches are returned with their ``os`` tag; larger result
        sets are reduced to the node names to keep the context small.
        """
        try:
            matched_names = self.config._getallnodes(filter_pattern)
            if not matched_names:
                return "No nodes found matching the filter."
            if len(matched_names) > 5:
                return json.dumps({
                    "matched_count": len(matched_names),
                    "message": "Too many nodes matched. Showing names only.",
                    "node_names": matched_names
                })
            matched_data = self.config.getitems(matched_names, extract=True)
            res = {}
            for name, data in matched_data.items():
                os_tag = "unknown"
                if isinstance(data, dict):
                    ts = data.get("tags")
                    if isinstance(ts, dict):
                        os_tag = ts.get("os", "unknown")
                res[name] = {"os": os_tag}
            return json.dumps(res)
        except Exception as e:
            # Reported to the model instead of raised, so the conversation
            # can continue.
            return f"Error listing nodes: {e}"

    def ask(self, user_input, chat_history=None, status=None, debug=False, chunk_callback=None):
        """Standard conversation step with tool loop for PlaybookBuilderAgent.

        Args:
            user_input: The user's message for this turn.
            chat_history: Previous messages (without the system prompt).
                The entries are copied, never modified.
            status: Optional object with an ``update(str)`` method used for
                progress messages.
            debug: Accepted but not used by this method.
            chunk_callback: Optional callable receiving each assistant text
                message. Without it, the final text is printed as Markdown on
                the console.

        Returns:
            dict: ``response`` (last assistant text, possibly empty),
            ``chat_history`` (all messages except the system prompt) and
            ``playbook_yaml`` (YAML passed to ``return_playbook``, or ``None``).
        """
        if chat_history is None:
            chat_history = []
        # System prompt and tool definition
        system_prompt = PLAYBOOK_BUILDER_SYSTEM_PROMPT
        tools = PLAYBOOK_BUILDER_TOOLS
        messages = [{"role": "system", "content": system_prompt}]
        for msg in chat_history:
            # Shallow-copy every entry so the normalisation below cannot
            # alter the caller's history objects.
            m = msg.copy()
            if m.get('role') == 'assistant' and m.get('tool_calls') and m.get('content') == "":
                m['content'] = None
            messages.append(m)
        messages.append({"role": "user", "content": user_input})
        # Imported once here rather than on every loop iteration.
        from connpy.ai import completion
        final_playbook_yaml = None
        max_iterations = 10
        for iteration in range(1, max_iterations + 1):
            if status:
                status.update(f"Playbook Agent is thinking... (step {iteration})")
            # Call LiteLLM completion
            try:
                response = completion(
                    model=self.model,
                    messages=messages,
                    tools=tools,
                    num_retries=3,
                    **self.auth
                )
            except Exception as e:
                return {
                    "response": f"Playbook Agent failed: {str(e)}",
                    "chat_history": messages[1:],
                    # Same keys as the success result.
                    "playbook_yaml": final_playbook_yaml
                }
            resp_msg = response.choices[0].message
            msg_dict = resp_msg.model_dump(exclude_none=True)
            if msg_dict.get("tool_calls") and msg_dict.get("content") == "":
                msg_dict["content"] = None
            messages.append(msg_dict)
            # If the model sends content, stream or yield it
            if resp_msg.content:
                if chunk_callback:
                    chunk_callback(resp_msg.content)
                elif not resp_msg.tool_calls:
                    # In direct non-streaming output, print markdown
                    self.console.print(Markdown(resp_msg.content))
            if not resp_msg.tool_calls:
                break
            for tc in resp_msg.tool_calls:
                fn = tc.function.name
                args, arg_error = self._parse_tool_arguments(tc.function.arguments)
                if arg_error is not None:
                    # Let the model see the problem and retry.
                    obs = arg_error
                elif fn == "list_nodes":
                    obs = self._list_nodes_observation(args.get("filter_pattern", ".*"))
                elif fn == "validate_playbook":
                    obs = json.dumps(self.validate_playbook(args.get("playbook_yaml", "")))
                elif fn == "return_playbook":
                    final_playbook_yaml = args.get("playbook_yaml", "")
                    obs = json.dumps({"success": True, "message": "Playbook returned successfully."})
                else:
                    obs = json.dumps({"error": f"Unknown tool '{fn}'."})
                # Every tool call gets exactly one reply, including failed or
                # unknown ones, so no tool_call_id is left unanswered.
                messages.append({
                    "tool_call_id": tc.id,
                    "role": "tool",
                    "name": fn,
                    "content": obs
                })
            # If return_playbook was called, we can terminate early
            if final_playbook_yaml is not None:
                break
        return {
            "response": resp_msg.content or "",
            "chat_history": messages[1:],
            "playbook_yaml": final_playbook_yaml
        }