feat(cli): agregar comando connpy login --status
This commit is contained in:
@@ -19,6 +19,9 @@ class LoginHandler:
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
def login(self, args):
|
def login(self, args):
|
||||||
|
if getattr(args, "status", False):
|
||||||
|
return self.show_status()
|
||||||
|
|
||||||
if self.app.services.mode != "remote":
|
if self.app.services.mode != "remote":
|
||||||
printer.warning("Note: Your current configuration is set to local mode. Logging in will save credentials, but they will only apply when service-mode is set to 'remote'.")
|
printer.warning("Note: Your current configuration is set to local mode. Logging in will save credentials, but they will only apply when service-mode is set to 'remote'.")
|
||||||
|
|
||||||
@@ -90,3 +93,51 @@ class LoginHandler:
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
else:
|
else:
|
||||||
printer.info("No active session found (already logged out).")
|
printer.info("No active session found (already logged out).")
|
||||||
|
|
||||||
|
def show_status(self):
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
token_path = os.path.join(self.app.config.defaultdir, ".token")
|
||||||
|
if not os.path.exists(token_path):
|
||||||
|
printer.warning("No active session found. You can log in using 'connpy login'.")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(token_path, "r") as f:
|
||||||
|
token = f.read().strip()
|
||||||
|
|
||||||
|
parts = token.split(".")
|
||||||
|
if len(parts) != 3:
|
||||||
|
printer.error("Invalid local session token format.")
|
||||||
|
return
|
||||||
|
|
||||||
|
payload_b64 = parts[1]
|
||||||
|
payload_b64 += "=" * ((4 - len(payload_b64) % 4) % 4)
|
||||||
|
payload_bytes = base64.urlsafe_b64decode(payload_b64)
|
||||||
|
payload = json.loads(payload_bytes.decode("utf-8"))
|
||||||
|
|
||||||
|
username = payload.get("sub")
|
||||||
|
exp = payload.get("exp")
|
||||||
|
|
||||||
|
if not exp:
|
||||||
|
printer.success(f"Active session as '{username}' (Indefinite expiration).")
|
||||||
|
return
|
||||||
|
|
||||||
|
now = datetime.datetime.now(datetime.timezone.utc).timestamp()
|
||||||
|
if now > exp:
|
||||||
|
printer.error("Session has expired. Please log in again using 'connpy login'.")
|
||||||
|
return
|
||||||
|
|
||||||
|
remaining = exp - now
|
||||||
|
hours = int(remaining // 3600)
|
||||||
|
minutes = int((remaining % 3600) // 60)
|
||||||
|
|
||||||
|
printer.success(f"Logged in as '{username}'")
|
||||||
|
printer.info(f"Time remaining: {hours}h {minutes}m")
|
||||||
|
|
||||||
|
exp_dt = datetime.datetime.fromtimestamp(exp, datetime.timezone.utc)
|
||||||
|
printer.info(f"Expires at: {exp_dt.strftime('%Y-%m-%d %H:%M:%S UTC')}")
|
||||||
|
except Exception as e:
|
||||||
|
printer.error(f"Failed to check local session status: {e}")
|
||||||
|
|||||||
@@ -379,6 +379,7 @@ class connapp:
|
|||||||
loginparser = subparsers.add_parser("login", help="Login to remote connpy server", description="Login to remote connpy server", formatter_class=RichHelpFormatter)
|
loginparser = subparsers.add_parser("login", help="Login to remote connpy server", description="Login to remote connpy server", formatter_class=RichHelpFormatter)
|
||||||
loginparser.error = self._custom_error
|
loginparser.error = self._custom_error
|
||||||
loginparser.add_argument("username", nargs='?', default=None, help="Username to authenticate")
|
loginparser.add_argument("username", nargs='?', default=None, help="Username to authenticate")
|
||||||
|
loginparser.add_argument("-s", "--status", action="store_true", help="Check current login status")
|
||||||
loginparser.set_defaults(func=self._login.dispatch, action="login")
|
loginparser.set_defaults(func=self._login.dispatch, action="login")
|
||||||
|
|
||||||
#LOGOUTPARSER
|
#LOGOUTPARSER
|
||||||
|
|||||||
@@ -86,8 +86,15 @@ class TestCLIMultiUserParsing:
|
|||||||
|
|
||||||
args = parser.parse_args(["login", "someuser"])
|
args = parser.parse_args(["login", "someuser"])
|
||||||
assert args.username == "someuser"
|
assert args.username == "someuser"
|
||||||
|
assert args.status is False
|
||||||
assert args.func == app_instance._login.dispatch
|
assert args.func == app_instance._login.dispatch
|
||||||
|
|
||||||
|
args = parser.parse_args(["login", "--status"])
|
||||||
|
assert args.status is True
|
||||||
|
|
||||||
|
args = parser.parse_args(["login", "-s"])
|
||||||
|
assert args.status is True
|
||||||
|
|
||||||
args = parser.parse_args(["logout"])
|
args = parser.parse_args(["logout"])
|
||||||
assert args.func == app_instance._login.dispatch
|
assert args.func == app_instance._login.dispatch
|
||||||
|
|
||||||
@@ -184,3 +191,49 @@ class TestAuthClientInterceptor:
|
|||||||
|
|
||||||
# Verify metadata remains empty
|
# Verify metadata remains empty
|
||||||
assert len(intercepted_details.metadata) == 0
|
assert len(intercepted_details.metadata) == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestLoginHandlerStatus:
|
||||||
|
def test_status_no_token(self, app_instance):
|
||||||
|
handler = LoginHandler(app_instance)
|
||||||
|
|
||||||
|
with patch("os.path.exists", return_value=False):
|
||||||
|
with patch("connpy.printer.warning") as mock_warning:
|
||||||
|
handler.show_status()
|
||||||
|
mock_warning.assert_called_once_with("No active session found. You can log in using 'connpy login'.")
|
||||||
|
|
||||||
|
def test_status_invalid_token(self, app_instance):
|
||||||
|
handler = LoginHandler(app_instance)
|
||||||
|
|
||||||
|
with patch("os.path.exists", return_value=True):
|
||||||
|
with patch("builtins.open", mock_open(read_data="invalid-token")):
|
||||||
|
with patch("connpy.printer.error") as mock_error:
|
||||||
|
handler.show_status()
|
||||||
|
mock_error.assert_called_once_with("Invalid local session token format.")
|
||||||
|
|
||||||
|
def test_status_valid_token(self, app_instance):
|
||||||
|
handler = LoginHandler(app_instance)
|
||||||
|
|
||||||
|
# Mock token payload: {"sub": "testuser", "exp": 1780007003}
|
||||||
|
# Part 1 (header): eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
|
||||||
|
# Part 2 (payload): eyJzdWIiOiJ0ZXN0dXNlciIsImV4cCI6MTc4MDAwNzAwM30
|
||||||
|
# Part 3 (sig): signature
|
||||||
|
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0dXNlciIsImV4cCI6MTc4MDAwNzAwM30.signature"
|
||||||
|
|
||||||
|
with patch("os.path.exists", return_value=True):
|
||||||
|
with patch("builtins.open", mock_open(read_data=token)):
|
||||||
|
with patch("connpy.printer.success") as mock_success:
|
||||||
|
with patch("connpy.printer.info") as mock_info:
|
||||||
|
# Patch time so exp is in the future
|
||||||
|
with patch("datetime.datetime") as mock_dt:
|
||||||
|
mock_dt.now.return_value.timestamp.return_value = 1780000000
|
||||||
|
# Mock fromtimestamp for expiration display
|
||||||
|
mock_dt.fromtimestamp.return_value.strftime.return_value = "2026-05-28 19:23:23 UTC"
|
||||||
|
|
||||||
|
handler.show_status()
|
||||||
|
mock_success.assert_called_once_with("Logged in as 'testuser'")
|
||||||
|
|
||||||
|
|
||||||
|
def mock_open(*args, **kwargs):
|
||||||
|
from unittest.mock import mock_open as unittest_mock_open
|
||||||
|
return unittest_mock_open(*args, **kwargs)
|
||||||
|
|||||||
Reference in New Issue
Block a user