Source code for simple_openid_connect.integrations.django.user_mapping

"""
Default implementations for mapping tokens to user objects.

This implementation can be overridden by extending the :class:`UserMapper` class and then setting the django settings
variable ``OPENID_USER_MAPPER`` to an import string pointing to the newly created class.
"""

import logging
from hashlib import sha256
from typing import Any, Tuple, Union, Optional

from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractBaseUser, AbstractUser
from django.core.cache import cache
from django.db import transaction
from django.utils import timezone

from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import (
    IdToken,
    JwtAccessToken,
    TokenIntrospectionErrorResponse,
    TokenIntrospectionSuccessResponse,
    UserinfoSuccessResponse,
)
from simple_openid_connect.exceptions import ValidationError
from simple_openid_connect.integrations.django.apps import OpenidAppConfig
from simple_openid_connect.integrations.django.models import OpenidUser

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Union of the data classes that the user-mapping code in this module accepts
# as a source of information about a federated user.
FederatedUserData = Union[
    IdToken, UserinfoSuccessResponse, TokenIntrospectionSuccessResponse, JwtAccessToken
]
"Type alias for the different classes which can provide information about a federated user."

# Deliberately untyped alias for "an instance of the user model".
# NOTE(review): presumably ``Any`` because the concrete model is only known at
# runtime via ``get_user_model()`` -- confirm before narrowing the type.
UserModel = Any


class UserMapper:
    """
    A base class which is responsible for mapping federated users into the local system.

    Individual steps (attribute mapping, token validation, caching) are split
    into separate methods so that subclasses can override only the part they
    need to customise.
    """

    def handle_federated_userinfo(self, user_data: FederatedUserData) -> UserModel:
        """
        Entry point for dynamically creating or updating user data based on information obtained through OpenID.

        The function automatically creates a new user model instance if the user is unknown or updates the locally
        stored user information based on the federated data.

        :param user_data: Information about the user.

        :returns: An instance of the applications user model.

        :raises ValidationError: If ``user_data`` does not carry a ``sub`` claim.
        """
        # validate that user data contains at least a user id
        if user_data.sub is None:
            raise ValidationError(
                "could not map user to token because the issuer did not return a 'sub' claim in its token introspection response"
            )

        # create/lookup and update the user in a single transaction so that a
        # failure while mapping attributes does not leave a half-initialised user behind
        with transaction.atomic():
            openid_user = OpenidUser.objects.get_or_create_for_sub(user_data.sub)
            user = openid_user.user
            self.automap_user_attrs(user, user_data)
            user.save()

        return user

    def handle_federated_access_token(
        self,
        access_token: str,
        oidc_client: OpenidClient,
        required_scopes: Union[str, None] = None,
    ) -> Tuple[UserModel, FederatedUserData]:
        """
        Entry point for dynamically creating or updating user data based on an access token which was provided by a user.

        This method inspects the token and then calls into :meth:`UserMapper.handle_federated_userinfo()` once more
        information about the user is available.
        The token is first treated as a JWT and validated locally; if that fails for any reason, the providers
        token introspection endpoint is queried instead.

        :param access_token: The raw access token that was passed to this application which should identify the user.
        :param oidc_client: An OpenID client which is used to access the OpenID providers signing keys or to introspect the token if necessary.
        :param required_scopes: Scopes to which the access token is required to have access.
            If ``None`` is passed, the default scopes from django settings ``OPENID_SCOPE`` are used.
            Pass an empty string if no scopes are required.

        :returns: An instance of the applications user model as well as additional data about the user.

        :raises ValidationError: If the passed token cannot be validated or is decidedly invalid.
        """
        if required_scopes is None:
            required_scopes = OpenidAppConfig.get_instance().safe_settings.OPENID_SCOPE

        # return cached data if it exists
        cached_data = self.get_cached_data(access_token, required_scopes)
        if cached_data is not None:
            return cached_data

        user_data: Union[JwtAccessToken, TokenIntrospectionSuccessResponse]
        try:
            # try to parse the raw token as JWT
            user_data = self._validate_jwt_access_token(
                access_token, oidc_client, required_scopes
            )
        except Exception:
            # Deliberately broad: opaque (non-JWT) tokens fail parsing in
            # provider-specific ways and the issuer gets the final say anyway.
            # The original error is attached to the debug record so that it is not lost.
            logger.debug(
                "could not parse access token as JWT, falling back to calling the providers token introspection endpoint",
                exc_info=True,
            )
            user_data = self._introspect_access_token(
                access_token, oidc_client, required_scopes
            )

        # call down into the actual user mapping handler
        user = self.handle_federated_userinfo(user_data)

        # populate cache and return result
        self.populate_cache(access_token, required_scopes, user, user_data)
        return user, user_data

    @staticmethod
    def _scopes_satisfied(granted_scopes: str, required_scopes: str) -> bool:
        """Return whether every whitespace separated scope in ``required_scopes`` is contained in ``granted_scopes``."""
        return set(required_scopes.split()) <= set(granted_scopes.split())

    def _validate_jwt_access_token(
        self,
        access_token: str,
        oidc_client: OpenidClient,
        required_scopes: str,
    ) -> JwtAccessToken:
        """Parse ``access_token`` as JWT and validate its structure, issuer, audience and scopes locally."""
        # parse and validate the general token structure
        token = JwtAccessToken.parse_jwt(
            access_token,
            oidc_client.provider_keys,
        )
        token.validate_extern(
            oidc_client.provider_config.issuer,
            oidc_client.client_auth.client_id,
        )

        # validate token scope for required access (skipped if no scopes are required)
        if required_scopes.split():
            if token.scope is None:
                raise ValidationError("token does not contain required scopes claim")
            if not self._scopes_satisfied(token.scope, required_scopes):
                raise ValidationError(
                    f"token has access to scopes '{token.scope}' but '{required_scopes}' are required"
                )

        return token

    def _introspect_access_token(
        self,
        access_token: str,
        oidc_client: OpenidClient,
        required_scopes: str,
    ) -> TokenIntrospectionSuccessResponse:
        """Validate ``access_token`` by asking the issuer to introspect it and check the result against ``required_scopes``."""
        introspect_response = oidc_client.introspect_token(access_token)
        if isinstance(introspect_response, TokenIntrospectionErrorResponse):
            logger.critical(
                "could not introspect token for validity: %s", introspect_response
            )
            raise ValidationError(
                f"could not introspect token at the issuer: {introspect_response}"
            )

        # fail if the token is expired
        if not introspect_response.active:
            raise ValidationError("token is expired")

        # validate token scope for required access.
        # As in the JWT path, the check is skipped when no scopes are required so
        # that passing an empty string behaves as documented.
        if required_scopes.split():
            if introspect_response.scope is None:
                logger.error(
                    "could not determine access token access because the issuer did not return the tokens scope during token instrospection"
                )
                raise ValidationError(
                    "could not determine token scope because the issuer did not return the tokens scope during token introspection"
                )
            if not self._scopes_satisfied(introspect_response.scope, required_scopes):
                raise ValidationError(
                    f"token has access to scopes '{introspect_response.scope}' but '{required_scopes}' are required"
                )

        return introspect_response

    def automap_user_attrs(
        self,
        user: "AbstractBaseUser",
        user_data: FederatedUserData,
    ) -> None:
        """
        Inspect the given user instance model, discover its attributes based on some heuristics and set their values
        from the passed user information.

        .. note::

           ``user.save()`` is not automatically called by this method to allow extending it via class inheritance
           without causing multiple database operations.

        :param user: The user instance on which attributes should be set
        :param user_data: Information about the user which was made available through OpenID.
        """
        if isinstance(user, AbstractUser):
            # username: prefer the most human friendly claim and fall back to the subject id
            # NOTE(review): ``hasattr`` is also true for attributes which are present but ``None``;
            # confirm whether the data classes can carry such empty claims.
            if hasattr(user_data, "preferred_username"):
                setattr(user, user.USERNAME_FIELD, user_data.preferred_username)
            elif hasattr(user_data, "username"):
                setattr(user, user.USERNAME_FIELD, user_data.username)
            elif hasattr(user_data, "sub"):
                setattr(user, user.USERNAME_FIELD, user_data.sub)
            else:
                logger.warning(
                    "Could not determine a username from federated user data. Creating more than one user will probably fail because the users username attribute is mapped to be empty and django enforces a unique-constraint on usernames."
                )

            # email
            if hasattr(user_data, "email"):
                setattr(user, user.EMAIL_FIELD, user_data.email)

            # given name
            if hasattr(user_data, "given_name") and hasattr(user, "first_name"):
                user.first_name = user_data.given_name

            # family name
            if hasattr(user_data, "family_name") and hasattr(user, "last_name"):
                user.last_name = user_data.family_name

    @staticmethod
    def _cache_key(access_token: str, required_scopes: str) -> str:
        """Derive the cache key under which the validation result for a token/scope combination is stored."""
        # only a digest of the token is used so that the raw token never ends up in the cache backend as a key
        input_hash = sha256(usedforsecurity=True)
        input_hash.update(access_token.encode("UTF-8"))
        input_hash.update(required_scopes.encode("UTF-8"))
        return f"simple-openid-connect/user-of-token-{input_hash.hexdigest()}"

    def get_cached_data(
        self, access_token: str, required_scopes: str
    ) -> Optional[Tuple[UserModel, FederatedUserData]]:
        """
        Look up a previous validation result for the given token and scopes.

        :param access_token: The raw access token that identifies the user.
        :param required_scopes: The scopes against which the token was validated.

        :returns: The user and federated data as stored by :meth:`populate_cache` or ``None`` if nothing usable
            is cached (including the case that the cached user no longer exists).
        """
        cached_data = cache.get(self._cache_key(access_token, required_scopes))
        if cached_data is None:
            return None

        user_id, federated_user_data = cached_data
        user_model = get_user_model()
        try:
            # ``populate_cache`` stores ``user.pk`` so the lookup must use ``pk`` as well
            # to keep working for user models with a custom primary key
            user = user_model.objects.get(pk=user_id)
        except user_model.DoesNotExist:
            # the user was deleted after the entry was cached; treat it as a cache miss
            # so that the caller validates the token again
            return None
        return user, federated_user_data

    def populate_cache(
        self,
        access_token: str,
        required_scopes: str,
        user: UserModel,
        federated_data: FederatedUserData,
    ) -> None:
        """
        Remember the validation result for the given token and scopes.

        The entry expires together with the token if ``federated_data`` carries an ``exp`` value.

        :param access_token: The raw access token that identifies the user.
        :param required_scopes: The scopes against which the token was validated.
        :param user: The user to which the token was mapped; only its primary key is stored.
        :param federated_data: The federated user data which was derived from the token.
        """
        # NOTE(review): without an ``exp`` value the timeout stays ``None``, which django
        # treats as "never expire" -- confirm that this is acceptable for opaque tokens.
        cache_timeout = None
        # the parentheses are required: without them the walrus binds the result of
        # the ``is not None`` comparison (a bool) instead of the expiry timestamp
        if (exp := getattr(federated_data, "exp", None)) is not None:
            now = int(timezone.now().timestamp())
            # a timeout of 0 makes django not cache already expired tokens at all
            cache_timeout = max(0, exp - now)

        cache.set(
            self._cache_key(access_token, required_scopes),
            (user.pk, federated_data),
            timeout=cache_timeout,
        )