From 6e6ddde0db1bf77393fc5e655cad849b721327a4 Mon Sep 17 00:00:00 2001 From: Justin Massey Date: Sat, 12 Apr 2025 17:30:27 -0400 Subject: [PATCH 1/2] adding jwt_leeway param to allow configuring the leeway param on PyJWT jwt.decode method --- workos/_base_client.py | 8 ++++++++ workos/async_client.py | 2 ++ workos/client.py | 2 ++ workos/session.py | 11 +++++++++++ workos/user_management.py | 2 ++ 5 files changed, 25 insertions(+) diff --git a/workos/_base_client.py b/workos/_base_client.py index 41f31a66..31049e1d 100644 --- a/workos/_base_client.py +++ b/workos/_base_client.py @@ -25,6 +25,7 @@ class BaseClient(ClientConfiguration): _base_url: str _client_id: str _request_timeout: int + _jwt_leeway: float def __init__( self, @@ -33,6 +34,7 @@ def __init__( client_id: Optional[str], base_url: Optional[str] = None, request_timeout: Optional[int] = None, + jwt_leeway: float = 0, ) -> None: api_key = api_key or os.getenv("WORKOS_API_KEY") if api_key is None: @@ -63,6 +65,8 @@ def __init__( if request_timeout else int(os.getenv("WORKOS_REQUEST_TIMEOUT", DEFAULT_REQUEST_TIMEOUT)) ) + + self._jwt_leeway = jwt_leeway @property @abstractmethod @@ -122,3 +126,7 @@ def client_id(self) -> str: @property def request_timeout(self) -> int: return self._request_timeout + + @property + def jwt_leeway(self) -> float: + return self._jwt_leeway diff --git a/workos/async_client.py b/workos/async_client.py index 61e4563e..54e4d629 100644 --- a/workos/async_client.py +++ b/workos/async_client.py @@ -28,12 +28,14 @@ def __init__( client_id: Optional[str] = None, base_url: Optional[str] = None, request_timeout: Optional[int] = None, + jwt_leeway: float = 0, ): super().__init__( api_key=api_key, client_id=client_id, base_url=base_url, request_timeout=request_timeout, + jwt_leeway=jwt_leeway, ) self._http_client = AsyncHTTPClient( api_key=self._api_key, diff --git a/workos/client.py b/workos/client.py index b61d3c9e..c45b9104 100644 --- a/workos/client.py +++ b/workos/client.py @@ -28,12 +28,14 @@ def __init__( client_id: Optional[str] = None, base_url: Optional[str] = None, request_timeout: Optional[int] = None, + jwt_leeway: float = 0, ): super().__init__( api_key=api_key, client_id=client_id, base_url=base_url, request_timeout=request_timeout, + jwt_leeway=jwt_leeway, ) self._http_client = SyncHTTPClient( api_key=self._api_key, diff --git a/workos/session.py b/workos/session.py index 3a105081..bf27078f 100644 --- a/workos/session.py +++ b/workos/session.py @@ -28,6 +28,7 @@ class SessionModule(Protocol): cookie_password: str jwks: PyJWKClient jwk_algorithms: List[str] + jwt_leeway: float def __init__( self, @@ -36,6 +37,7 @@ def __init__( client_id: str, session_data: str, cookie_password: str, + jwt_leeway: float = 0, ) -> None: # If the cookie password is not provided, throw an error if cookie_password is None or cookie_password == "": @@ -45,6 +47,7 @@ def __init__( self.client_id = client_id self.session_data = session_data self.cookie_password = cookie_password + self.jwt_leeway = jwt_leeway self.jwks = PyJWKClient(self.user_management.get_jwks_url()) @@ -89,6 +92,7 @@ def authenticate( signing_key.key, algorithms=self.jwk_algorithms, options={"verify_aud": False}, + leeway=self.jwt_leeway, ) return AuthenticateWithSessionCookieSuccessResponse( @@ -136,6 +140,7 @@ def _is_valid_jwt(self, token: str) -> bool: signing_key.key, algorithms=self.jwk_algorithms, options={"verify_aud": False}, + leeway=self.jwt_leeway, ) return True except jwt.exceptions.InvalidTokenError: @@ -167,6 +172,7 @@ def __init__( client_id: str, session_data: str, cookie_password: str, + jwt_leeway: float = 0, ) -> None: # If the cookie password is not provided, throw an error if cookie_password is None or cookie_password == "": @@ -176,6 +182,7 @@ def __init__( self.client_id = client_id self.session_data = session_data self.cookie_password = cookie_password + self.jwt_leeway = jwt_leeway self.jwks = PyJWKClient(self.user_management.get_jwks_url()) @@ -228,6 +235,7 @@ def refresh( signing_key.key, algorithms=self.jwk_algorithms, options={"verify_aud": False}, + leeway=self.jwt_leeway, ) return RefreshWithSessionCookieSuccessResponse( @@ -257,6 +265,7 @@ def __init__( client_id: str, session_data: str, cookie_password: str, + jwt_leeway: float = 0, ) -> None: # If the cookie password is not provided, throw an error if cookie_password is None or cookie_password == "": @@ -266,6 +275,7 @@ def __init__( self.client_id = client_id self.session_data = session_data self.cookie_password = cookie_password + self.jwt_leeway = jwt_leeway self.jwks = PyJWKClient(self.user_management.get_jwks_url()) @@ -318,6 +328,7 @@ async def refresh( signing_key.key, algorithms=self.jwk_algorithms, options={"verify_aud": False}, + leeway=self.jwt_leeway, ) return RefreshWithSessionCookieSuccessResponse( diff --git a/workos/user_management.py b/workos/user_management.py index 93ab5c8c..1f9143a8 100644 --- a/workos/user_management.py +++ b/workos/user_management.py @@ -866,6 +866,7 @@ def load_sealed_session( client_id=self._http_client.client_id, session_data=sealed_session, cookie_password=cookie_password, + jwt_leeway=self._client_configuration.jwt_leeway, ) def get_user(self, user_id: str) -> User: @@ -1491,6 +1492,7 @@ async def load_sealed_session( client_id=self._http_client.client_id, session_data=sealed_session, cookie_password=cookie_password, + jwt_leeway=self._client_configuration.jwt_leeway, ) async def get_user(self, user_id: str) -> User: From b18d855178f5e8fa63fce8b6e8a3dbe3bfec0392 Mon Sep 17 00:00:00 2001 From: Justin Massey Date: Sat, 12 Apr 2025 17:44:38 -0400 Subject: [PATCH 2/2] add tests for new jwt_leeway --- tests/test_session.py | 163 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/tests/test_session.py b/tests/test_session.py index b2fb654e..acd5c0dc 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -2,6 +2,7 @@ from unittest.mock import Mock, patch import jwt from datetime import datetime, timezone +import time from tests.conftest import with_jwks_mock from workos.session import AsyncSession, Session @@ -394,6 +395,168 @@ def test_refresh_success_with_aud_claim( assert isinstance(response, RefreshWithSessionCookieSuccessResponse) + @with_jwks_mock + def test_authenticate_with_slightly_expired_jwt_fails_without_leeway( + self, session_constants, mock_user_management + ): + # Create a token that's expired by 5 seconds + current_time = int(time.time()) + + # Create token claims with exp 5 seconds in the past + token_claims = { + **session_constants["TEST_TOKEN_CLAIMS"], + "exp": current_time - 5, # Expired by 5 seconds + "iat": current_time - 60, # Issued 60 seconds ago + } + + slightly_expired_token = jwt.encode( + token_claims, + session_constants["PRIVATE_KEY"], + algorithm="RS256", + ) + + # Prepare sealed session data with the slightly expired token + session_data = Session.seal_data( + {"access_token": slightly_expired_token, "user": session_constants["TEST_USER"]}, + session_constants["COOKIE_PASSWORD"], + ) + + # With default leeway=0, authentication should fail + session = Session( + user_management=mock_user_management, + client_id=session_constants["CLIENT_ID"], + session_data=session_data, + cookie_password=session_constants["COOKIE_PASSWORD"], + jwt_leeway=0, + ) + + response = session.authenticate() + assert response.authenticated is False + assert response.reason == AuthenticateWithSessionCookieFailureReason.INVALID_JWT + + @with_jwks_mock + def test_authenticate_with_slightly_expired_jwt_succeeds_with_leeway( + self, session_constants, mock_user_management + ): + # Create a token that's expired by 5 seconds + current_time = int(time.time()) + + # Create token claims with exp 5 seconds in the past + token_claims = { + **session_constants["TEST_TOKEN_CLAIMS"], + "exp": current_time - 5, # Expired by 5 seconds + "iat": current_time - 60, # Issued 60 seconds ago + } + + slightly_expired_token = jwt.encode( + token_claims, + session_constants["PRIVATE_KEY"], + algorithm="RS256", + ) + + # Prepare sealed session data with the slightly expired token + session_data = Session.seal_data( + {"access_token": slightly_expired_token, "user": session_constants["TEST_USER"]}, + session_constants["COOKIE_PASSWORD"], + ) + + # With leeway=10, authentication should succeed + session = Session( + user_management=mock_user_management, + client_id=session_constants["CLIENT_ID"], + session_data=session_data, + cookie_password=session_constants["COOKIE_PASSWORD"], + jwt_leeway=10, # 10 seconds leeway + ) + + response = session.authenticate() + assert response.authenticated is True + assert response.session_id == session_constants["TEST_TOKEN_CLAIMS"]["sid"] + + @with_jwks_mock + def test_authenticate_with_significantly_expired_jwt_fails_without_leeway( + self, session_constants, mock_user_management + ): + # Create a token that's expired by 60 seconds + current_time = int(time.time()) + + # Create token claims with exp 60 seconds in the past + token_claims = { + **session_constants["TEST_TOKEN_CLAIMS"], + "exp": current_time - 60, # Expired by 60 seconds + "iat": current_time - 120, # Issued 120 seconds ago + } + + significantly_expired_token = jwt.encode( + token_claims, + session_constants["PRIVATE_KEY"], + algorithm="RS256", + ) + + # Prepare sealed session data with the significantly expired token + session_data = Session.seal_data( + { + "access_token": significantly_expired_token, + "user": session_constants["TEST_USER"] + }, + session_constants["COOKIE_PASSWORD"], + ) + + # With default leeway=0, authentication should fail + session = Session( + user_management=mock_user_management, + client_id=session_constants["CLIENT_ID"], + session_data=session_data, + cookie_password=session_constants["COOKIE_PASSWORD"], + jwt_leeway=0, + ) + + response = session.authenticate() + assert response.authenticated is False + assert response.reason == AuthenticateWithSessionCookieFailureReason.INVALID_JWT + + @with_jwks_mock + def test_authenticate_with_significantly_expired_jwt_fails_with_insufficient_leeway( + self, session_constants, mock_user_management + ): + # Create a token that's expired by 60 seconds + current_time = int(time.time()) + + # Create token claims with exp 60 seconds in the past + token_claims = { + **session_constants["TEST_TOKEN_CLAIMS"], + "exp": current_time - 60, # Expired by 60 seconds + "iat": current_time - 120, # Issued 120 seconds ago + } + + significantly_expired_token = jwt.encode( + token_claims, + session_constants["PRIVATE_KEY"], + algorithm="RS256", + ) + + # Prepare sealed session data with the significantly expired token + session_data = Session.seal_data( + { + "access_token": significantly_expired_token, + "user": session_constants["TEST_USER"] + }, + session_constants["COOKIE_PASSWORD"], + ) + + # With leeway=10, authentication should still fail (not enough leeway) + session = Session( + user_management=mock_user_management, + client_id=session_constants["CLIENT_ID"], + session_data=session_data, + cookie_password=session_constants["COOKIE_PASSWORD"], + jwt_leeway=10, # 10 seconds leeway is not enough for 60 seconds expiration + ) + + response = session.authenticate() + assert response.authenticated is False + assert response.reason == AuthenticateWithSessionCookieFailureReason.INVALID_JWT + class TestAsyncSession(SessionFixtures): @with_jwks_mock