diff --git a/connpy/core.py b/connpy/core.py index 5590b53..84123f3 100755 --- a/connpy/core.py +++ b/connpy/core.py @@ -440,9 +440,16 @@ class node: if clean_data: # Track command boundaries when user hits Enter if hasattr(self, 'mylog') and (b'\r' in clean_data or b'\n' in clean_data): - self.cmd_byte_positions.append((self.mylog.tell(), None)) + # Introduce a tiny 20ms delay to allow late-arriving tab-completion bytes + # to be written to mylog before finalizing the boundary marker. + async def delayed_marker(): + await asyncio.sleep(0.02) + if hasattr(self, 'mylog'): + self.cmd_byte_positions.append((self.mylog.tell(), None)) + asyncio.create_task(delayed_marker()) - try: os.write(child_fd, clean_data) + try: + os.write(child_fd, clean_data) except OSError: break self.lastinput = time() diff --git a/connpy/services/ai_service.py b/connpy/services/ai_service.py index 19a5889..100115f 100644 --- a/connpy/services/ai_service.py +++ b/connpy/services/ai_service.py @@ -6,6 +6,37 @@ from connpy.utils import log_cleaner class AIService(BaseService): """Business logic for interacting with AI agents and LLM configurations.""" + def _clean_cisco_scrolling(self, text: str) -> str: + """Resolves horizontal scrolling artifacts (backspaces, \r, ANSI) by merging overlapping segments.""" + def merge_overlapping(s1, s2): + s2_clean = s2.lstrip(' $') + max_overlap = min(len(s1), len(s2_clean)) + for i in range(max_overlap, 0, -1): + if s1[-i:] == s2_clean[:i]: + return s1 + s2_clean[i:] + return s1 + s2_clean + + scroll_re = re.compile(r'(\x08{5,}\s*\$?|\$\r|\x1b\[\d+[GD]\s*\$?)') + parts = scroll_re.split(text) + merged = "" + + for part in parts: + if scroll_re.match(part): + continue + + cleaned = log_cleaner(part) + if not merged: + merged = cleaned + else: + merged_lines = merged.split('\n') + cleaned_lines = cleaned.split('\n') + + merged_lines[-1] = merge_overlapping(merged_lines[-1], cleaned_lines[0]) + merged_lines.extend(cleaned_lines[1:]) + merged = "\n".join(merged_lines) + + return merged + def build_context_blocks(self, raw_bytes: bytes, cmd_byte_positions: list, node_info: dict, last_line: str = "") -> list: """Identifies command blocks in the terminal history.""" blocks = [] @@ -28,27 +59,64 @@ class AIService(BaseService): if known_cmd: prev_chunk = raw_bytes[prev_pos:pos] - prev_cleaned = log_cleaner(prev_chunk.decode(errors='replace')) + prev_cleaned = self._clean_cisco_scrolling(prev_chunk.decode(errors='replace')) prev_lines = [l for l in prev_cleaned.split('\n') if l.strip()] prompt_text = prev_lines[-1].strip() if prev_lines else "" preview = f"{prompt_text}{known_cmd}" if prompt_text else known_cmd - parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]}) + + if len(preview) > 80: + preview = preview[:77] + "..." + parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview}) else: chunk = raw_bytes[prev_pos:pos] - cleaned = log_cleaner(chunk.decode(errors='replace')) - lines = [l for l in cleaned.split('\n') if l.strip()] - preview = lines[-1].strip() if lines else "" - if preview: - match = prompt_re.search(preview) - if match: - cmd_text = preview[match.end():].strip() - if cmd_text: - parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": preview[:80]}) - else: - parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""}) - else: - parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""}) + cleaned = self._clean_cisco_scrolling(chunk.decode(errors='replace')) + lines = [l for l in cleaned.split('\n') if l.strip()] + + found_in_pass1 = False + if lines: + # Search backwards through the last few lines for the prompt + for idx in range(len(lines) - 1, max(-1, len(lines) - 10), -1): + match = prompt_re.search(lines[idx]) + if match: + ptxt = match.group(0).strip() + cmd_first_line = lines[idx][match.end():].strip() + cmd_rest = [l.strip() for l in lines[idx+1:]] + cmd_text = " ".join([cmd_first_line] + cmd_rest).strip() + + if cmd_text: + pv = f"{ptxt} {cmd_text}".strip() + if len(pv) > 80: + pv = pv[:77] + "..." + parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv}) + else: + parsed_positions.append({"pos": pos, "type": "EMPTY_PROMPT", "preview": ""}) + found_in_pass1 = True + break + + if not found_in_pass1: + # Fallback: The prompt might have been isolated in the previous chunk + # due to asynchronous network delays splitting the output exactly at the newline. + if prev_pos > 0: + # Fetch the very last chunk that we just processed + prev_prev_pos = cmd_byte_positions[i-2][0] if i >= 2 else 0 + prev_chunk_text = self._clean_cisco_scrolling(raw_bytes[prev_prev_pos:prev_pos].decode(errors='replace')) + prev_lines_text = [l for l in prev_chunk_text.split('\n') if l.strip()] + + if prev_lines_text: + prev_match = prompt_re.search(prev_lines_text[-1]) + if prev_match: + ptxt = prev_match.group(0).strip() + cmd_text = " ".join([l.strip() for l in lines]).strip() + if cmd_text: + pv = f"{ptxt} {cmd_text}".strip() + if len(pv) > 80: + pv = pv[:77] + "..." + parsed_positions.append({"pos": pos, "type": "VALID_CMD", "preview": pv}) + found_in_pass1 = True + + if not found_in_pass1: + parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""}) else: parsed_positions.append({"pos": pos, "type": "SCROLLING", "preview": ""}) diff --git a/connpy/tests/test_ai_copilot.py b/connpy/tests/test_ai_copilot.py index 9c2c874..66fecae 100644 --- a/connpy/tests/test_ai_copilot.py +++ b/connpy/tests/test_ai_copilot.py @@ -158,3 +158,38 @@ def test_ingress_task_interception(): assert called_copilot asyncio.run(run_test()) + +def test_build_context_blocks_horizontal_scrolling(): + from connpy.services.ai_service import AIService + svc = AIService(None) + + node_info = {"prompt": "RP/0/RP0/CPU0:xrd#"} + part1 = 'RP/0/RP0/CPU0:xrd#s show interfaces * | inc "rate|is up|escr|test1|test2|test3|test4|test5|teest8|test7|t$' + part2 = '|escr|test1|test2|test3|test4|test5|teest8|test7|te s998"show interfaces * | inc "rate|is up|escr|test1|test2|test3|test4|test5|teest8|test7|$' + + # Test with \r (classic IOS) + raw_bytes = (part1 + '\r' + part2).encode() + cmd_byte_positions = [(0, None), (len(raw_bytes), None)] + + blocks = svc.build_context_blocks(raw_bytes, cmd_byte_positions, node_info) + assert len(blocks) >= 1 + start, end, preview = blocks[0] + assert "RP/0/RP0/CPU0:xrd# s show interfaces * | inc" in preview + +def test_build_context_blocks_horizontal_scrolling_ansi(): + """Test with CSI cursor repositioning (\\x1B[1G) instead of raw \\r, as used by Cisco IOS XR.""" + from connpy.services.ai_service import AIService + svc = AIService(None) + + node_info = {"prompt": "RP/0/RP0/CPU0:xrd#"} + part1 = 'RP/0/RP0/CPU0:xrd#s show interfaces * | inc "rate|is up|escr|test1|test2|test3|test4|test5|teest8|test7|t' + part2 = '$|escr|test1|test2|test3|test4|test5|teest8|test7|te s998"show interfaces * | inc "rate|is up|escr|test1|test2|test3|test4|test5|teest8|test7|$' + + # Test with \x1B[1G (CSI Cursor Horizontal Absolute - IOS XR) + raw_bytes = (part1 + '\x1b[1G' + part2).encode() + cmd_byte_positions = [(0, None), (len(raw_bytes), None)] + + blocks = svc.build_context_blocks(raw_bytes, cmd_byte_positions, node_info) + assert len(blocks) >= 1 + start, end, preview = blocks[0] + assert "RP/0/RP0/CPU0:xrd# s show interfaces * | inc" in preview