Module connpy.cli.sso_handler

Classes

class SSOHandler (app)
Expand source code
class SSOHandler:
    """Handle the SSO provider management commands of the CLI.

    Providers are read from the ``"providers"`` mapping inside the ``"sso"``
    section of ``app.config.config``, keyed by provider name.  Commands that
    change the mapping persist the whole ``sso`` section through
    ``app.services.config_svc.update_setting``.
    """

    # Placeholder printed instead of a literal secret value.
    _SECRET_MASK = "********"

    def __init__(self, app):
        """Store the application object the commands operate on.

        Args:
            app: Object exposing ``config.config`` (the loaded configuration
                mapping) and ``services`` (``mode`` and ``config_svc``).
        """
        self.app = app

    @staticmethod
    def _split_csv(text):
        """Return the stripped, non-empty items of a comma separated string."""
        return [item.strip() for item in text.split(",") if item.strip()]

    @staticmethod
    def _as_str_list(value, fallback):
        """Normalise a stored list setting into a list of strings.

        A hand-edited configuration may hold a bare string (``"RS256"``) or a
        null where a list is expected.  Joining a bare string directly would
        split it into single characters, and joining ``None`` would raise.

        Args:
            value: The stored value (list, tuple, scalar or ``None``).
            fallback: Items to use when ``value`` is ``None``.

        Returns:
            A new list of strings.
        """
        if value is None:
            return list(fallback)
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value]
        return SSOHandler._split_csv(str(value))

    @classmethod
    def _mask_secret(cls, data):
        """Return a shallow copy of ``data`` that is safe to display.

        A truthy ``"secret"`` is replaced by a fixed mask unless it is a
        string starting with ``$`` (an environment variable reference, which
        is shown as-is).  Non-string secrets are masked as well instead of
        failing on ``startswith``.
        """
        shown = dict(data)
        secret = shown.get("secret")
        is_env_reference = isinstance(secret, str) and secret.startswith("$")
        if secret and not is_env_reference:
            shown["secret"] = cls._SECRET_MASK
        return shown

    def dispatch(self, args):
        """Route parsed command-line arguments to the matching command.

        The option flags ``add``, ``delete``, ``list`` and ``show`` are
        checked in that order; the first one that is set is translated into
        ``args.action`` (and ``args.provider`` for the flags that carry a
        provider name).  If no flag is set, a pre-existing ``args.action`` is
        used.

        Args:
            args: Parsed arguments namespace; it is modified in place.

        Returns:
            Whatever the selected command method returns.

        Raises:
            SystemExit: With status 1 when running in ``"remote"`` mode or
                when the action is not recognised.
        """
        if self.app.services.mode == "remote":
            printer.error("SSO management commands are only available in local/server-side mode.")
            sys.exit(1)

        # (option flag, action name) in priority order.
        flag_actions = (("add", "add"), ("delete", "del"), ("list", "list"), ("show", "show"))
        for flag, action_name in flag_actions:
            value = getattr(args, flag, None)
            if not value:
                continue
            args.action = action_name
            if flag != "list":
                # These options carry the provider name as their first item.
                args.provider = value[0]
            break

        action = getattr(args, "action", None)
        routes = (
            ("add", self.add_provider),
            ("del", self.delete_provider),
            ("list", self.list_providers),
            ("show", self.show_provider),
        )
        for action_name, command in routes:
            if action == action_name:
                return command(args)

        printer.error(f"Unknown action: {action}")
        sys.exit(1)

    def add_provider(self, args):
        """Interactively create or edit the provider named ``args.provider``.

        The current values of an existing provider are offered as prompt
        defaults.  At least one of JWKS URL or secret is required, the
        username claim must not be empty, and an empty algorithm list falls
        back to ``["RS256"]``.  Optional settings left empty are omitted from
        the saved entry.

        Args:
            args: Namespace providing ``provider``.

        Raises:
            SystemExit: With status 130 when the prompt returns no answers,
                with status 1 on validation or save failure.
        """
        provider = args.provider
        sso = self.app.config.config.get("sso", {})
        providers = sso.setdefault("providers", {})

        # ``or {}`` also covers a provider entry that was stored as null.
        existing = providers.get(provider) or {}
        if existing:
            printer.warning(f"SSO Provider '{provider}' already exists. Overwriting/Editing it.")

        # Stored lists are normalised first so a bare string is not joined
        # character by character.
        algorithms_default = ",".join(self._as_str_list(existing.get("algorithms"), ["RS256"]))
        domains_default = ",".join(self._as_str_list(existing.get("allowed_domains"), []))

        # Interactive questionnaire
        # NOTE(review): the secret is typed and pre-filled in clear text here
        # while show_provider masks it -- consider a masked prompt.
        questions = [
            inquirer.Text("jwks_url", message="JWKS URL (optional, press Enter to skip)", default=existing.get("jwks_url", "")),
            inquirer.Text("secret", message="Client Secret / Shared Secret (optional, press Enter to skip)", default=existing.get("secret", "")),
            inquirer.Text("username_claim", message="Username Claim", default=existing.get("username_claim", "sub")),
            inquirer.Text("algorithms", message="Algorithms (comma separated)", default=algorithms_default),
            inquirer.Text("allowed_domains", message="Allowed/Trusted Email Domains (comma separated, optional)", default=domains_default),
        ]

        answers = inquirer.prompt(questions)
        if not answers:
            printer.warning("Operation cancelled.")
            sys.exit(130)

        jwks_url = answers["jwks_url"].strip()
        secret = answers["secret"].strip()
        username_claim = answers["username_claim"].strip()
        algorithms_str = answers["algorithms"].strip()
        allowed_domains_str = answers.get("allowed_domains", "").strip()

        if not jwks_url and not secret:
            printer.error("You must configure either a JWKS URL or a Secret.")
            sys.exit(1)

        if not username_claim:
            printer.error("Username claim cannot be empty.")
            sys.exit(1)

        algorithms = self._split_csv(algorithms_str) or ["RS256"]
        allowed_domains = self._split_csv(allowed_domains_str)

        provider_data = {
            "username_claim": username_claim,
            "algorithms": algorithms,
        }
        # Optional settings are stored only when a value was given.
        if jwks_url:
            provider_data["jwks_url"] = jwks_url
        if secret:
            provider_data["secret"] = secret
        if allowed_domains:
            provider_data["allowed_domains"] = allowed_domains

        providers[provider] = provider_data

        # Save config
        try:
            self.app.services.config_svc.update_setting("sso", sso)
            printer.success(f"SSO Provider '{provider}' saved successfully.")
        except Exception as e:
            printer.error(f"Failed to save SSO configuration: {e}")
            sys.exit(1)

    def delete_provider(self, args):
        """Delete the provider named ``args.provider`` after confirmation.

        Args:
            args: Namespace providing ``provider``.

        Raises:
            SystemExit: With status 1 when the provider does not exist or the
                configuration cannot be saved.
        """
        provider = args.provider
        sso = self.app.config.config.get("sso", {})
        providers = sso.get("providers", {})

        if provider not in providers:
            printer.error(f"SSO Provider '{provider}' not found.")
            sys.exit(1)

        # Confirm delete; the prompt defaults to "no".
        questions = [inquirer.Confirm("confirm", message=f"Are you sure you want to delete SSO Provider '{provider}'?", default=False)]
        answers = inquirer.prompt(questions)
        if not answers or not answers["confirm"]:
            printer.info("Delete cancelled.")
            return

        del providers[provider]

        # Save config
        try:
            self.app.services.config_svc.update_setting("sso", sso)
            printer.success(f"SSO Provider '{provider}' deleted successfully.")
        except Exception as e:
            printer.error(f"Failed to save SSO configuration: {e}")
            sys.exit(1)

    def list_providers(self, args):
        """Print the names of all configured providers as a YAML list.

        Args:
            args: Parsed arguments namespace (not used by this command).
        """
        sso = self.app.config.config.get("sso", {})
        providers = sso.get("providers", {})
        if not providers:
            printer.warning("No SSO providers configured.")
            return

        # Print list in YAML format
        yaml_str = yaml.dump(list(providers), sort_keys=False, default_flow_style=False)
        printer.data("Configured SSO Providers", yaml_str)

    def show_provider(self, args):
        """Print the settings of ``args.provider`` as YAML, masking the secret.

        Args:
            args: Namespace providing ``provider``.

        Raises:
            SystemExit: With status 1 when the provider does not exist.
        """
        provider = args.provider
        sso = self.app.config.config.get("sso", {})
        providers = sso.get("providers", {})

        if provider not in providers:
            printer.error(f"SSO Provider '{provider}' not found.")
            sys.exit(1)

        # Masking works on a copy, so the loaded configuration is untouched.
        display_data = self._mask_secret(providers[provider])

        yaml_str = yaml.dump(display_data, sort_keys=False, default_flow_style=False)
        printer.data(f"SSO Provider: {provider}", yaml_str)

Methods

def add_provider(self, args)
Expand source code
def add_provider(self, args):
    """Interactively create or edit the SSO provider named ``args.provider``.

    The current values of an existing provider are offered as prompt
    defaults.  At least one of JWKS URL or secret is required, the username
    claim must not be empty, and an empty algorithm list falls back to
    ``["RS256"]``.  Optional settings left empty are omitted from the saved
    entry, which is persisted via ``config_svc.update_setting("sso", ...)``.

    Args:
        args: Namespace providing ``provider``.

    Raises:
        SystemExit: With status 130 when the prompt returns no answers, with
            status 1 on validation or save failure.
    """
    def split_csv(text):
        """Return the stripped, non-empty items of a comma separated string."""
        return [item.strip() for item in text.split(",") if item.strip()]

    def stored_list(entry, key, fallback):
        """Read a list setting of ``entry`` as a list of strings.

        A hand-edited configuration may hold a bare string or a null where a
        list is expected; joining a bare string directly would split it into
        single characters and joining ``None`` would raise.
        """
        value = entry.get(key)
        if value is None:
            return list(fallback)
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value]
        return split_csv(str(value))

    provider = args.provider
    sso = self.app.config.config.get("sso", {})
    providers = sso.setdefault("providers", {})

    # ``or {}`` also covers a provider entry that was stored as null.
    existing = providers.get(provider) or {}
    if existing:
        printer.warning(f"SSO Provider '{provider}' already exists. Overwriting/Editing it.")

    algorithms_default = ",".join(stored_list(existing, "algorithms", ["RS256"]))
    domains_default = ",".join(stored_list(existing, "allowed_domains", []))

    # Interactive questionnaire
    # NOTE(review): the secret is typed and pre-filled in clear text here --
    # consider a masked prompt.
    questions = [
        inquirer.Text("jwks_url", message="JWKS URL (optional, press Enter to skip)", default=existing.get("jwks_url", "")),
        inquirer.Text("secret", message="Client Secret / Shared Secret (optional, press Enter to skip)", default=existing.get("secret", "")),
        inquirer.Text("username_claim", message="Username Claim", default=existing.get("username_claim", "sub")),
        inquirer.Text("algorithms", message="Algorithms (comma separated)", default=algorithms_default),
        inquirer.Text("allowed_domains", message="Allowed/Trusted Email Domains (comma separated, optional)", default=domains_default),
    ]

    answers = inquirer.prompt(questions)
    if not answers:
        printer.warning("Operation cancelled.")
        sys.exit(130)

    jwks_url = answers["jwks_url"].strip()
    secret = answers["secret"].strip()
    username_claim = answers["username_claim"].strip()
    algorithms_str = answers["algorithms"].strip()
    allowed_domains_str = answers.get("allowed_domains", "").strip()

    if not jwks_url and not secret:
        printer.error("You must configure either a JWKS URL or a Secret.")
        sys.exit(1)

    if not username_claim:
        printer.error("Username claim cannot be empty.")
        sys.exit(1)

    algorithms = split_csv(algorithms_str) or ["RS256"]
    allowed_domains = split_csv(allowed_domains_str)

    provider_data = {
        "username_claim": username_claim,
        "algorithms": algorithms,
    }
    # Optional settings are stored only when a value was given.
    if jwks_url:
        provider_data["jwks_url"] = jwks_url
    if secret:
        provider_data["secret"] = secret
    if allowed_domains:
        provider_data["allowed_domains"] = allowed_domains

    providers[provider] = provider_data

    # Save config
    try:
        self.app.services.config_svc.update_setting("sso", sso)
        printer.success(f"SSO Provider '{provider}' saved successfully.")
    except Exception as e:
        printer.error(f"Failed to save SSO configuration: {e}")
        sys.exit(1)
def delete_provider(self, args)
Expand source code
def delete_provider(self, args):
    """Remove the SSO provider named ``args.provider`` after confirmation.

    The provider is looked up in the ``"providers"`` mapping of the ``sso``
    config section.  When the user confirms, the entry is deleted and the
    section is persisted via ``config_svc.update_setting("sso", ...)``.

    Args:
        args: Namespace providing ``provider``.

    Raises:
        SystemExit: With status 1 when the provider does not exist or the
            configuration cannot be saved.
    """
    name = args.provider
    sso_section = self.app.config.config.get("sso", {})
    registered = sso_section.get("providers", {})

    if name not in registered:
        printer.error(f"SSO Provider '{name}' not found.")
        sys.exit(1)

    # Ask before deleting; the prompt defaults to "no".
    confirmation = inquirer.Confirm(
        "confirm",
        message=f"Are you sure you want to delete SSO Provider '{name}'?",
        default=False,
    )
    reply = inquirer.prompt([confirmation])
    if not (reply and reply["confirm"]):
        printer.info("Delete cancelled.")
        return

    del registered[name]

    # Persist the updated section.
    try:
        self.app.services.config_svc.update_setting("sso", sso_section)
        printer.success(f"SSO Provider '{name}' deleted successfully.")
    except Exception as err:
        printer.error(f"Failed to save SSO configuration: {err}")
        sys.exit(1)
def dispatch(self, args)
Expand source code
def dispatch(self, args):
    """Route parsed command-line arguments to the matching SSO command.

    The option flags ``add``, ``delete``, ``list`` and ``show`` are checked
    in that order; the first one that is set is translated into
    ``args.action`` (and ``args.provider`` for the flags that carry a
    provider name).  If no flag is set, a pre-existing ``args.action`` is
    used.

    Args:
        args: Parsed arguments namespace; it is modified in place.

    Returns:
        Whatever the selected command method returns.

    Raises:
        SystemExit: With status 1 when running in ``"remote"`` mode or when
            the action is not recognised.
    """
    if self.app.services.mode == "remote":
        printer.error("SSO management commands are only available in local/server-side mode.")
        sys.exit(1)

    # (option flag, action name) in priority order.
    flag_actions = (("add", "add"), ("delete", "del"), ("list", "list"), ("show", "show"))
    for flag, action_name in flag_actions:
        value = getattr(args, flag, None)
        if not value:
            continue
        args.action = action_name
        if flag != "list":
            # These options carry the provider name as their first item.
            args.provider = value[0]
        break

    action = getattr(args, "action", None)
    routes = (
        ("add", self.add_provider),
        ("del", self.delete_provider),
        ("list", self.list_providers),
        ("show", self.show_provider),
    )
    for action_name, command in routes:
        if action == action_name:
            return command(args)

    printer.error(f"Unknown action: {action}")
    sys.exit(1)
def list_providers(self, args)
Expand source code
def list_providers(self, args):
    """Print the names of all configured SSO providers as a YAML list.

    Emits a warning instead when the ``"providers"`` mapping of the ``sso``
    config section is missing or empty.

    Args:
        args: Parsed arguments namespace (not used by this command).
    """
    configured = self.app.config.config.get("sso", {}).get("providers", {})
    if not configured:
        printer.warning("No SSO providers configured.")
        return

    # Only the provider names are listed, in their stored order.
    names = list(configured)
    rendered = yaml.dump(names, sort_keys=False, default_flow_style=False)
    printer.data("Configured SSO Providers", rendered)
def show_provider(self, args)
Expand source code
def show_provider(self, args):
    """Print the settings of ``args.provider`` as YAML, masking the secret.

    A truthy ``"secret"`` is replaced by ``********`` unless it is a string
    starting with ``$`` (an environment variable reference, which is shown
    as-is).  Non-string secrets are masked as well instead of failing on
    ``startswith``.

    Args:
        args: Namespace providing ``provider``.

    Raises:
        SystemExit: With status 1 when the provider does not exist.
    """
    provider = args.provider
    sso = self.app.config.config.get("sso", {})
    providers = sso.get("providers", {})

    if provider not in providers:
        printer.error(f"SSO Provider '{provider}' not found.")
        sys.exit(1)

    # Work on a shallow copy so masking never touches the loaded config.
    display_data = providers[provider].copy()
    secret = display_data.get("secret")

    # Only a string can be a "$..." reference; anything else that is set is
    # treated as a literal secret and hidden.
    is_env_reference = isinstance(secret, str) and secret.startswith("$")
    if secret and not is_env_reference:
        display_data["secret"] = "********"

    yaml_str = yaml.dump(display_data, sort_keys=False, default_flow_style=False)
    printer.data(f"SSO Provider: {provider}", yaml_str)