Module connpy.cli.run_handler

Classes

class RunHandler (app)
Expand source code
class RunHandler:
    """CLI handler for ad-hoc node runs and YAML playbook execution.

    Node lookup and command execution are delegated to
    ``app.services.nodes`` and ``app.services.execution``; this class
    validates the input and renders the results through ``printer``.
    """

    def __init__(self, app):
        """Store *app* and create the lock used around header printing."""
        self.app = app
        # Held while a per-node callback prints the one-time header rule.
        self.print_lock = threading.Lock()

    def dispatch(self, args):
        """Call the handler selected by ``args.action`` and return its result.

        When ``args.data`` holds more than one item, ``args.action`` is
        overwritten with ``"noderun"`` before the handler is looked up.
        """
        if len(args.data) > 1:
            args.action = "noderun"
        handlers = {
            "noderun": self.node_run,
            "generate": self.yaml_generate,
            "run": self.yaml_run,
        }
        handler = handlers.get(args.action)
        return handler(args)

    def node_run(self, args):
        """Run (or test) a single command on every node matching a filter.

        ``args.data[0]`` is the node filter; the remaining items are joined
        with spaces into one command. A truthy ``args.test_expected``
        selects test mode, otherwise a normal run is performed.

        Exits with status 2 when no node matches and with status 1 when the
        execution raises ``ConnpyError``.
        """
        nodes_filter = args.data[0]

        # Any failure while listing nodes is handled like an empty match.
        try:
            matched_nodes = self.app.services.nodes.list_nodes(nodes_filter)
        except Exception:
            matched_nodes = []

        if not matched_nodes:
            printer.error(f"No nodes found matching filter: {nodes_filter}")
            sys.exit(2)

        commands = [" ".join(args.data[1:])]
        header_printed = False

        def _print_header_once():
            # Print the "OUTPUT" rule only before the first node panel.
            nonlocal header_printed
            with self.print_lock:
                if not header_printed:
                    printer.console.print(Rule("OUTPUT", style="header"))
                    header_printed = True

        def _show_test(unique, node_output, node_status, node_result):
            _print_header_once()
            printer.test_panel(unique, node_output, node_status, node_result)

        def _show_run(unique, node_output, node_status):
            _print_header_once()
            printer.node_panel(unique, node_output, node_status)

        try:
            expected = getattr(args, "test_expected", None)
            execution = self.app.services.execution
            if expected:
                # Mode: Test
                results = execution.test_commands(
                    nodes_filter=matched_nodes,
                    commands=commands,
                    expected=expected,
                    on_node_complete=_show_test,
                )
                printer.test_summary(results)
            else:
                # Mode: Normal Run
                results = execution.run_commands(
                    nodes_filter=matched_nodes,
                    commands=commands,
                    on_node_complete=_show_run,
                )
                printer.run_summary(results)
        except ConnpyError as e:
            printer.error(str(e))
            sys.exit(1)

    def yaml_generate(self, args):
        """Write ``get_instructions("generate")`` to ``args.data[0]`` and exit.

        An existing path is never overwritten: the method reports an error
        and exits with status 14. On success it exits with status 0.
        """
        target = args.data[0]
        if os.path.exists(target):
            printer.error(f"File '{target}' already exists.")
            sys.exit(14)

        with open(target, "w") as file:
            file.write(get_instructions("generate"))
        printer.success(f"File {target} generated successfully")
        sys.exit()

    def yaml_run(self, args):
        """Load the YAML playbook at ``args.data[0]`` and run each of its tasks.

        Every entry of the top-level ``tasks`` list is passed to
        ``cli_run``. A missing or null ``tasks`` key means there is nothing
        to run. Any exception (unreadable file, invalid YAML, a playbook
        that is not a mapping, a failing task) is reported and the process
        exits with status 10.
        """
        path = args.data[0]
        try:
            with open(path, "r") as f:
                # NOTE(review): FullLoader resolves more tags than
                # yaml.safe_load; consider safe_load if playbooks can come
                # from untrusted sources.
                playbook = yaml.load(f, Loader=yaml.FullLoader)

            # An empty file loads as None and a scalar/list has no "tasks";
            # fail with a readable message instead of an AttributeError text.
            if not isinstance(playbook, dict):
                raise ValueError(
                    "playbook is empty or is not a mapping with a 'tasks' list"
                )

            # "tasks:" with no value loads as None; treat it like a missing key.
            for task in playbook.get("tasks") or []:
                self.cli_run(task)

        except Exception as e:
            printer.error(f"Failed to run playbook {path}: {e}")
            sys.exit(10)

    def cli_run(self, script):
        """Execute one playbook task described by the mapping *script*.

        Mandatory keys: ``action`` (``"run"`` or ``"test"``), ``nodes`` (a
        filter string or a list of filter strings), ``commands`` and
        ``output``. Optional keys: ``name`` (defaults to ``"Task"``),
        ``variables``, ``options`` (``parallel``, ``timeout``, ``prompt``)
        and, for tests, ``expected``.

        ``output`` set to ``"stdout"`` prints a panel per node from the
        completion callback; any other non-null value is forwarded to the
        execution service as ``folder``; a null value makes a ``run`` task
        print all node panels after the execution returns. The aggregate
        summary is always printed.

        Exits with status 11 when a mandatory key is missing or no node
        matches. An unsupported action and a ``ConnpyError`` raised by the
        execution are reported without exiting.
        """
        name = script.get("name", "Task")
        try:
            action = script["action"]
            nodelist = script["nodes"]
            commands = script["commands"]
            variables = script.get("variables")
            output_cfg = script["output"]
        except KeyError as e:
            printer.error(f"[{name}] '{e.args[0]}' is mandatory in script")
            sys.exit(11)

        # "options:" with no value loads as None; fall back to an empty dict
        # so the .get() lookups below cannot fail on it.
        options = script.get("options") or {}

        # Report a mistyped action instead of silently skipping the task.
        if action not in ("run", "test"):
            printer.error(
                f"[{name}] Unsupported action '{action}' (expected 'run' or 'test')"
            )
            return

        stdout = output_cfg == "stdout"
        folder = None if output_cfg in (None, "stdout") else output_cfg
        prompt = options.get("prompt")

        # Resolve and filter nodes through context-aware list_nodes.
        # Any failure while listing is handled like an empty match.
        try:
            if isinstance(nodelist, str):
                resolved_nodes = self.app.services.nodes.list_nodes(nodelist)
            elif isinstance(nodelist, list):
                resolved_nodes = []
                for item in nodelist:
                    for match in self.app.services.nodes.list_nodes(item):
                        # Keep first-seen order and drop duplicates.
                        if match not in resolved_nodes:
                            resolved_nodes.append(match)
            else:
                resolved_nodes = []
        except Exception:
            resolved_nodes = []

        if not resolved_nodes:
            printer.error(f"[{name}] No nodes found matching filter: {nodelist}")
            sys.exit(11)

        header_printed = False

        def _print_header_once():
            # Print the task-name rule only before the first panel.
            nonlocal header_printed
            with self.print_lock:
                if not header_printed:
                    printer.console.print(Rule(name.upper(), style="header"))
                    header_printed = True

        def _on_run_complete(unique, node_output, node_status):
            # Stream results as they arrive only when output is "stdout".
            if stdout:
                _print_header_once()
                printer.node_panel(unique, node_output, node_status)

        def _on_test_complete(unique, node_output, node_status, node_result):
            # Show test_panel per node ONLY if stdout is True.
            if stdout:
                _print_header_once()
                printer.test_panel(unique, node_output, node_status, node_result)

        try:
            execution = self.app.services.execution
            if action == "run":
                results = execution.run_commands(
                    nodes_filter=resolved_nodes,
                    commands=commands,
                    variables=variables,
                    parallel=options.get("parallel", 10),
                    timeout=options.get("timeout", 20),
                    folder=folder,
                    prompt=prompt,
                    on_node_complete=_on_run_complete,
                )
                # Nothing was streamed or written to a folder: print every
                # node panel now that the run has finished.
                if not stdout and not folder:
                    _print_header_once()
                    for unique, data in results.items():
                        output = data["output"] if isinstance(data, dict) else data
                        # Status is passed as 0 for every node on this path.
                        printer.node_panel(unique, output, 0)

                # ALWAYS show the aggregate execution summary at the end
                printer.run_summary(results)
            else:
                results = execution.test_commands(
                    nodes_filter=resolved_nodes,
                    commands=commands,
                    expected=script.get("expected", []),
                    variables=variables,
                    parallel=options.get("parallel", 10),
                    timeout=options.get("timeout", 20),
                    folder=folder,
                    prompt=prompt,
                    on_node_complete=_on_test_complete,
                )
                # ALWAYS show the aggregate summary at the end
                printer.test_summary(results)

        except ConnpyError as e:
            printer.error(str(e))

Methods

def cli_run(self, script)
Expand source code
def cli_run(self, script):
    """Execute one playbook task described by the mapping *script*.

    Mandatory keys: ``action`` (``"run"`` or ``"test"``), ``nodes`` (a
    filter string or a list of filter strings), ``commands`` and
    ``output``. Optional keys: ``name`` (defaults to ``"Task"``),
    ``variables``, ``options`` (``parallel``, ``timeout``, ``prompt``)
    and, for tests, ``expected``.

    ``output`` set to ``"stdout"`` prints a panel per node from the
    completion callback; any other non-null value is forwarded to the
    execution service as ``folder``; a null value makes a ``run`` task
    print all node panels after the execution returns. The aggregate
    summary is always printed.

    Exits with status 11 when a mandatory key is missing or no node
    matches. An unsupported action and a ``ConnpyError`` raised by the
    execution are reported without exiting.
    """
    name = script.get("name", "Task")
    try:
        action = script["action"]
        nodelist = script["nodes"]
        commands = script["commands"]
        variables = script.get("variables")
        output_cfg = script["output"]
    except KeyError as e:
        printer.error(f"[{name}] '{e.args[0]}' is mandatory in script")
        sys.exit(11)

    # "options:" with no value loads as None; fall back to an empty dict
    # so the .get() lookups below cannot fail on it.
    options = script.get("options") or {}

    # Report a mistyped action instead of silently skipping the task.
    if action not in ("run", "test"):
        printer.error(
            f"[{name}] Unsupported action '{action}' (expected 'run' or 'test')"
        )
        return

    stdout = output_cfg == "stdout"
    folder = None if output_cfg in (None, "stdout") else output_cfg
    prompt = options.get("prompt")

    # Resolve and filter nodes through context-aware list_nodes.
    # Any failure while listing is handled like an empty match.
    try:
        if isinstance(nodelist, str):
            resolved_nodes = self.app.services.nodes.list_nodes(nodelist)
        elif isinstance(nodelist, list):
            resolved_nodes = []
            for item in nodelist:
                for match in self.app.services.nodes.list_nodes(item):
                    # Keep first-seen order and drop duplicates.
                    if match not in resolved_nodes:
                        resolved_nodes.append(match)
        else:
            resolved_nodes = []
    except Exception:
        resolved_nodes = []

    if not resolved_nodes:
        printer.error(f"[{name}] No nodes found matching filter: {nodelist}")
        sys.exit(11)

    header_printed = False

    def _print_header_once():
        # Print the task-name rule only before the first panel.
        nonlocal header_printed
        with self.print_lock:
            if not header_printed:
                printer.console.print(Rule(name.upper(), style="header"))
                header_printed = True

    def _on_run_complete(unique, node_output, node_status):
        # Stream results as they arrive only when output is "stdout".
        if stdout:
            _print_header_once()
            printer.node_panel(unique, node_output, node_status)

    def _on_test_complete(unique, node_output, node_status, node_result):
        # Show test_panel per node ONLY if stdout is True.
        if stdout:
            _print_header_once()
            printer.test_panel(unique, node_output, node_status, node_result)

    try:
        execution = self.app.services.execution
        if action == "run":
            results = execution.run_commands(
                nodes_filter=resolved_nodes,
                commands=commands,
                variables=variables,
                parallel=options.get("parallel", 10),
                timeout=options.get("timeout", 20),
                folder=folder,
                prompt=prompt,
                on_node_complete=_on_run_complete,
            )
            # Nothing was streamed or written to a folder: print every
            # node panel now that the run has finished.
            if not stdout and not folder:
                _print_header_once()
                for unique, data in results.items():
                    output = data["output"] if isinstance(data, dict) else data
                    # Status is passed as 0 for every node on this path.
                    printer.node_panel(unique, output, 0)

            # ALWAYS show the aggregate execution summary at the end
            printer.run_summary(results)
        else:
            results = execution.test_commands(
                nodes_filter=resolved_nodes,
                commands=commands,
                expected=script.get("expected", []),
                variables=variables,
                parallel=options.get("parallel", 10),
                timeout=options.get("timeout", 20),
                folder=folder,
                prompt=prompt,
                on_node_complete=_on_test_complete,
            )
            # ALWAYS show the aggregate summary at the end
            printer.test_summary(results)

    except ConnpyError as e:
        printer.error(str(e))
def dispatch(self, args)
Expand source code
def dispatch(self, args):
    """Call the handler selected by ``args.action`` and return its result.

    When ``args.data`` holds more than one item, ``args.action`` is
    overwritten with ``"noderun"`` before the handler is looked up.
    """
    if len(args.data) > 1:
        args.action = "noderun"
    handlers = {
        "noderun": self.node_run,
        "generate": self.yaml_generate,
        "run": self.yaml_run,
    }
    handler = handlers.get(args.action)
    return handler(args)
def node_run(self, args)
Expand source code
def node_run(self, args):
    """Run (or test) a single command on every node matching a filter.

    ``args.data[0]`` is the node filter; the remaining items are joined
    with spaces into one command. A truthy ``args.test_expected``
    selects test mode, otherwise a normal run is performed.

    Exits with status 2 when no node matches and with status 1 when the
    execution raises ``ConnpyError``.
    """
    nodes_filter = args.data[0]

    # Any failure while listing nodes is handled like an empty match.
    try:
        matched_nodes = self.app.services.nodes.list_nodes(nodes_filter)
    except Exception:
        matched_nodes = []

    if not matched_nodes:
        printer.error(f"No nodes found matching filter: {nodes_filter}")
        sys.exit(2)

    commands = [" ".join(args.data[1:])]
    header_printed = False

    def _print_header_once():
        # Print the "OUTPUT" rule only before the first node panel.
        nonlocal header_printed
        with self.print_lock:
            if not header_printed:
                printer.console.print(Rule("OUTPUT", style="header"))
                header_printed = True

    def _show_test(unique, node_output, node_status, node_result):
        _print_header_once()
        printer.test_panel(unique, node_output, node_status, node_result)

    def _show_run(unique, node_output, node_status):
        _print_header_once()
        printer.node_panel(unique, node_output, node_status)

    try:
        expected = getattr(args, "test_expected", None)
        execution = self.app.services.execution
        if expected:
            # Mode: Test
            results = execution.test_commands(
                nodes_filter=matched_nodes,
                commands=commands,
                expected=expected,
                on_node_complete=_show_test,
            )
            printer.test_summary(results)
        else:
            # Mode: Normal Run
            results = execution.run_commands(
                nodes_filter=matched_nodes,
                commands=commands,
                on_node_complete=_show_run,
            )
            printer.run_summary(results)
    except ConnpyError as e:
        printer.error(str(e))
        sys.exit(1)
def yaml_generate(self, args)
Expand source code
def yaml_generate(self, args):
    """Write ``get_instructions("generate")`` to ``args.data[0]`` and exit.

    An existing path is never overwritten: the method reports an error
    and exits with status 14. On success it exits with status 0.
    """
    target = args.data[0]
    if os.path.exists(target):
        printer.error(f"File '{target}' already exists.")
        sys.exit(14)

    with open(target, "w") as file:
        file.write(get_instructions("generate"))
    printer.success(f"File {target} generated successfully")
    sys.exit()
def yaml_run(self, args)
Expand source code
def yaml_run(self, args):
    """Load the YAML playbook at ``args.data[0]`` and run each of its tasks.

    Every entry of the top-level ``tasks`` list is passed to
    ``cli_run``. A missing or null ``tasks`` key means there is nothing
    to run. Any exception (unreadable file, invalid YAML, a playbook
    that is not a mapping, a failing task) is reported and the process
    exits with status 10.
    """
    path = args.data[0]
    try:
        with open(path, "r") as f:
            # NOTE(review): FullLoader resolves more tags than
            # yaml.safe_load; consider safe_load if playbooks can come
            # from untrusted sources.
            playbook = yaml.load(f, Loader=yaml.FullLoader)

        # An empty file loads as None and a scalar/list has no "tasks";
        # fail with a readable message instead of an AttributeError text.
        if not isinstance(playbook, dict):
            raise ValueError(
                "playbook is empty or is not a mapping with a 'tasks' list"
            )

        # "tasks:" with no value loads as None; treat it like a missing key.
        for task in playbook.get("tasks") or []:
            self.cli_run(task)

    except Exception as e:
        printer.error(f"Failed to run playbook {path}: {e}")
        sys.exit(10)