Module connpy.cli.user_handler

Classes

class UserHandler (app)
Expand source code
class UserHandler:
    """CLI handler for the ``connpy user`` management commands.

    Translates parsed command-line arguments into calls on
    ``app.services.users`` and reports the outcome through ``printer``.
    Failure paths print an error and terminate the process via ``sys.exit``.
    """

    def __init__(self, app):
        """Keep a reference to the application object that exposes ``services``."""
        self.app = app

    def dispatch(self, args):
        """Resolve the requested action from ``args`` and run its handler.

        The action is derived from the first option that is set among
        ``add``, ``delete``, ``list``, ``show`` and ``regen_password``; if
        none is set, a pre-existing ``args.action`` is used. ``args`` is
        mutated: ``action`` and (where applicable) ``username`` are stored
        on it for the handlers to read.

        Returns whatever the selected handler returns. Exits with status 1
        in remote mode or when the action is unknown.
        """
        if self.app.services.mode == "remote":
            printer.error("User management commands are only available in local/server-side mode.")
            sys.exit(1)

        # Parse actions from argparse mutually exclusive options.
        # Each entry: (attribute on args, action name, carries a username).
        option_actions = (
            ("add", "add", True),
            ("delete", "del", True),
            ("list", "list", False),
            ("show", "show", True),
            ("regen_password", "regen_password", True),
        )
        for option, action_name, takes_username in option_actions:
            value = getattr(args, option, None)
            if not value:
                continue
            args.action = action_name
            if takes_username:
                # Option values are expected as one-element lists, but accept
                # a bare string too: indexing a string would silently keep
                # only its first character.
                args.username = value[0] if isinstance(value, (list, tuple)) else value
            break

        action = getattr(args, "action", None)

        handlers = {
            "add": self.add_user,
            "del": self.delete_user,
            "list": self.list_users,
            "show": self.show_user,
            "regen_password": self.regen_password,
        }
        # Only strings can name an action; this also avoids hashing
        # arbitrary (possibly unhashable) objects.
        handler = handlers.get(action) if isinstance(action, str) else None
        if handler is None:
            printer.error(f"Unknown action: {action}")
            sys.exit(1)
        return handler(args)

    def _prompt_password(self, prompt, confirm_prompt):
        """Read a password twice from the terminal and return it.

        Exits with status 1 if the password is empty or the two entries
        differ, and with status 130 if the prompt is interrupted
        (Ctrl-C or end of input).
        """
        try:
            password = getpass.getpass(prompt)
            if not password:
                printer.error("Password cannot be empty.")
                sys.exit(1)
            confirm = getpass.getpass(confirm_prompt)
            if password != confirm:
                printer.error("Passwords do not match.")
                sys.exit(1)
        except (KeyboardInterrupt, EOFError):
            printer.warning("\nOperation cancelled.")
            sys.exit(130)
        return password

    def add_user(self, args):
        """Create a new user after interactively prompting for its password.

        Reads ``args.username`` (required) and the optional ``args.path``,
        which is forwarded to ``create_user`` as ``config_path``. Exits with
        status 1 on any validation or creation failure.
        """
        username = getattr(args, "username", None)
        if not username:
            printer.error("Username is required. Usage: connpy user --add <username>")
            sys.exit(1)

        # The path option may arrive as a list; unwrap its first element.
        custom_path = getattr(args, "path", None)
        if custom_path and isinstance(custom_path, list):
            custom_path = custom_path[0]

        password = self._prompt_password("Enter password for new user: ", "Confirm password: ")

        try:
            self.app.services.users.create_user(username, password, config_path=custom_path)
            printer.success(f"User '{username}' created successfully.")
        except (ConnpyError, ValueError) as e:
            # Domain/validation errors are shown as-is.
            printer.error(str(e))
            sys.exit(1)
        except Exception as e:
            printer.error(f"Failed to create user: {e}")
            sys.exit(1)

    def delete_user(self, args):
        """Delete the user named by ``args.username``.

        Exits with status 1 if the username is missing or deletion fails.
        """
        username = getattr(args, "username", None)
        if not username:
            printer.error("Username is required. Usage: connpy user --del <username>")
            sys.exit(1)

        try:
            self.app.services.users.delete_user(username)
            printer.success(f"User '{username}' deleted successfully.")
        except (ConnpyError, ValueError) as e:
            # Domain/validation errors are shown as-is.
            printer.error(str(e))
            sys.exit(1)
        except Exception as e:
            printer.error(f"Failed to delete user: {e}")
            sys.exit(1)

    def list_users(self, args):
        """Print every registered user as YAML.

        Prints a warning and returns when there are no users. Exits with
        status 1 if listing or formatting fails. ``args`` is not used.
        """
        try:
            users = self.app.services.users.list_users()
            if not users:
                printer.warning("No users registered.")
                return

            formatted_users = []
            for u in users:
                # Never echo a password hash, should the service include one
                # in its records (same policy as show_user).
                formatted_u = {k: v for k, v in u.items() if k != "password_hash"}
                # Format custom config path, falling back to computed default path instead of null/None
                if not formatted_u.get("config_path"):
                    formatted_u["config_path"] = os.path.join(self.app.services.users.users_dir, formatted_u["username"])
                formatted_users.append(formatted_u)

            yaml_str = yaml.dump(formatted_users, sort_keys=False, default_flow_style=False)
            printer.data("Registered Users", yaml_str)
        except Exception as e:
            printer.error(f"Failed to list users: {e}")
            sys.exit(1)

    def show_user(self, args):
        """Print the details of the user named by ``args.username`` as YAML.

        The ``password_hash`` field is omitted from the output. Exits with
        status 1 if the username is missing, the user does not exist, or
        the lookup fails.
        """
        username = getattr(args, "username", None)
        if not username:
            printer.error("Username is required. Usage: connpy user --show <username>")
            sys.exit(1)

        try:
            user = self.app.services.users.get_user(username)
            if not user:
                printer.error(f"User '{username}' not found.")
                sys.exit(1)

            # Hide the password hash from the CLI output for safety
            safe_user = {k: v for k, v in user.items() if k != "password_hash"}
            # Show the computed default location instead of an empty path.
            if not safe_user.get("config_path"):
                safe_user["config_path"] = os.path.join(self.app.services.users.users_dir, username)

            yaml_str = yaml.dump(safe_user, sort_keys=False, default_flow_style=False)
            printer.data(f"User: {username}", yaml_str)
        except ValueError as e:
            printer.error(str(e))
            sys.exit(1)
        except Exception as e:
            printer.error(f"Failed to retrieve user details: {e}")
            sys.exit(1)

    def regen_password(self, args):
        """Set a new password for the user named by ``args.username``.

        Verifies that the user exists, prompts for the new password twice,
        then calls ``admin_change_password``. Exits with status 1 on any
        failure.
        """
        username = getattr(args, "username", None)
        if not username:
            printer.error("Username is required. Usage: connpy user --regen-password <username>")
            sys.exit(1)

        # Confirm the user exists before asking for a password.
        try:
            user = self.app.services.users.get_user(username)
            if not user:
                printer.error(f"User '{username}' not found.")
                sys.exit(1)
        except ValueError as e:
            printer.error(str(e))
            sys.exit(1)
        except Exception as e:
            printer.error(f"Failed to retrieve user details: {e}")
            sys.exit(1)

        new_password = self._prompt_password("Enter new password: ", "Confirm new password: ")

        try:
            self.app.services.users.admin_change_password(username, new_password)
            printer.success(f"Password for user '{username}' regenerated successfully.")
        except ValueError as e:
            printer.error(str(e))
            sys.exit(1)
        except Exception as e:
            printer.error(f"Failed to regenerate password: {e}")
            sys.exit(1)

Methods

def add_user(self, args)
Expand source code
def add_user(self, args):
    """Create a new user after interactively prompting for its password.

    Reads ``args.username`` (required) and the optional ``args.path``, which
    is forwarded to ``create_user`` as ``config_path``. Exits with status 1
    on any validation or creation failure and with status 130 when the
    password prompt is interrupted.
    """
    name = getattr(args, "username", None)
    if not name:
        printer.error("Username is required. Usage: connpy user --add <username>")
        sys.exit(1)

    # The path option may arrive as a list; unwrap its first element.
    config_path = getattr(args, "path", None)
    if config_path and isinstance(config_path, list):
        config_path = config_path[0]

    # Ask for the password twice so a typo cannot lock the new user out.
    try:
        secret = getpass.getpass("Enter password for new user: ")
        if not secret:
            printer.error("Password cannot be empty.")
            sys.exit(1)
        repeated = getpass.getpass("Confirm password: ")
        if secret != repeated:
            printer.error("Passwords do not match.")
            sys.exit(1)
    except (KeyboardInterrupt, EOFError):
        printer.warning("\nOperation cancelled.")
        sys.exit(130)

    user_service = self.app.services.users
    try:
        user_service.create_user(name, secret, config_path=config_path)
        printer.success(f"User '{name}' created successfully.")
    except (ConnpyError, ValueError) as err:
        # Domain/validation errors are shown as-is.
        printer.error(str(err))
        sys.exit(1)
    except Exception as err:
        printer.error(f"Failed to create user: {err}")
        sys.exit(1)
def delete_user(self, args)
Expand source code
def delete_user(self, args):
    """Delete the user named by ``args.username``.

    Exits with status 1 if the username is missing or the deletion fails.
    """
    name = getattr(args, "username", None)
    if not name:
        printer.error("Username is required. Usage: connpy user --del <username>")
        sys.exit(1)

    try:
        self.app.services.users.delete_user(name)
        printer.success(f"User '{name}' deleted successfully.")
    except (ConnpyError, ValueError) as err:
        # Domain/validation errors are shown as-is.
        printer.error(str(err))
        sys.exit(1)
    except Exception as err:
        printer.error(f"Failed to delete user: {err}")
        sys.exit(1)
def dispatch(self, args)
Expand source code
def dispatch(self, args):
    """Resolve the requested action from ``args`` and run its handler.

    The action is derived from the first option that is set among ``add``,
    ``delete``, ``list``, ``show`` and ``regen_password``; if none is set, a
    pre-existing ``args.action`` is used. ``args`` is mutated: ``action`` and
    (where applicable) ``username`` are stored on it for the handlers to read.

    Returns whatever the selected handler returns. Exits with status 1 in
    remote mode or when the action is unknown.
    """
    if self.app.services.mode == "remote":
        printer.error("User management commands are only available in local/server-side mode.")
        sys.exit(1)

    # Parse actions from argparse mutually exclusive options.
    # Each entry: (attribute on args, action name, carries a username).
    option_actions = (
        ("add", "add", True),
        ("delete", "del", True),
        ("list", "list", False),
        ("show", "show", True),
        ("regen_password", "regen_password", True),
    )
    for option, action_name, takes_username in option_actions:
        value = getattr(args, option, None)
        if not value:
            continue
        args.action = action_name
        if takes_username:
            # Option values are expected as one-element lists, but accept a
            # bare string too: indexing a string would silently keep only
            # its first character.
            args.username = value[0] if isinstance(value, (list, tuple)) else value
        break

    action = getattr(args, "action", None)

    handlers = {
        "add": self.add_user,
        "del": self.delete_user,
        "list": self.list_users,
        "show": self.show_user,
        "regen_password": self.regen_password,
    }
    # Only strings can name an action; this also avoids hashing arbitrary
    # (possibly unhashable) objects.
    handler = handlers.get(action) if isinstance(action, str) else None
    if handler is None:
        printer.error(f"Unknown action: {action}")
        sys.exit(1)
    return handler(args)
def list_users(self, args)
Expand source code
def list_users(self, args):
    """Print every registered user as YAML.

    Prints a warning and returns when there are no users. Exits with status 1
    if listing or formatting fails. ``args`` is not used.
    """
    try:
        users = self.app.services.users.list_users()
        if not users:
            printer.warning("No users registered.")
            return

        formatted_users = []
        for u in users:
            # Never echo a password hash, should the service include one in
            # its records (same policy as show_user).
            formatted_u = {k: v for k, v in u.items() if k != "password_hash"}
            # Format custom config path, falling back to computed default path instead of null/None
            if not formatted_u.get("config_path"):
                formatted_u["config_path"] = os.path.join(self.app.services.users.users_dir, formatted_u["username"])
            formatted_users.append(formatted_u)

        yaml_str = yaml.dump(formatted_users, sort_keys=False, default_flow_style=False)
        printer.data("Registered Users", yaml_str)
    except Exception as e:
        printer.error(f"Failed to list users: {e}")
        sys.exit(1)
def regen_password(self, args)
Expand source code
def regen_password(self, args):
    """Set a new password for the user named by ``args.username``.

    Verifies that the user exists, prompts for the new password twice, then
    calls ``admin_change_password``. Exits with status 1 on any failure and
    with status 130 when the password prompt is interrupted.
    """
    name = getattr(args, "username", None)
    if not name:
        printer.error("Username is required. Usage: connpy user --regen-password <username>")
        sys.exit(1)

    # Confirm the user exists before asking for a password.
    try:
        existing = self.app.services.users.get_user(name)
        if not existing:
            printer.error(f"User '{name}' not found.")
            sys.exit(1)
    except ValueError as err:
        printer.error(str(err))
        sys.exit(1)
    except Exception as err:
        printer.error(f"Failed to retrieve user details: {err}")
        sys.exit(1)

    # Ask twice so a typo cannot lock the user out.
    try:
        secret = getpass.getpass("Enter new password: ")
        if not secret:
            printer.error("Password cannot be empty.")
            sys.exit(1)
        repeated = getpass.getpass("Confirm new password: ")
        if secret != repeated:
            printer.error("Passwords do not match.")
            sys.exit(1)
    except (KeyboardInterrupt, EOFError):
        printer.warning("\nOperation cancelled.")
        sys.exit(130)

    try:
        self.app.services.users.admin_change_password(name, secret)
        printer.success(f"Password for user '{name}' regenerated successfully.")
    except ValueError as err:
        printer.error(str(err))
        sys.exit(1)
    except Exception as err:
        printer.error(f"Failed to regenerate password: {err}")
        sys.exit(1)
def show_user(self, args)
Expand source code
def show_user(self, args):
    """Print the details of the user named by ``args.username`` as YAML.

    The ``password_hash`` field is omitted from the output. Exits with
    status 1 if the username is missing, the user does not exist, or the
    lookup fails.
    """
    name = getattr(args, "username", None)
    if not name:
        printer.error("Username is required. Usage: connpy user --show <username>")
        sys.exit(1)

    try:
        record = self.app.services.users.get_user(name)
        if not record:
            printer.error(f"User '{name}' not found.")
            sys.exit(1)

        # Copy every field except the password hash, which must not be shown.
        visible = {}
        for key, value in record.items():
            if key != "password_hash":
                visible[key] = value

        # Show the computed default location instead of an empty path.
        if not visible.get("config_path"):
            visible["config_path"] = os.path.join(self.app.services.users.users_dir, name)

        rendered = yaml.dump(visible, sort_keys=False, default_flow_style=False)
        printer.data(f"User: {name}", rendered)
    except ValueError as err:
        printer.error(str(err))
        sys.exit(1)
    except Exception as err:
        printer.error(f"Failed to retrieve user details: {err}")
        sys.exit(1)