Source code for simple_openid_connect.integrations.django.user_mapping

"""
Default implementations for mapping tokens to user objects.

This implementation can be overridden by extending the :class:`UserMapper` class and then setting the django settings
variable ``OPENID_USER_MAPPER`` to an import string pointing to the newly created class.
"""
import logging
from typing import Any, Tuple, Union

from django.contrib.auth.models import AbstractBaseUser, AbstractUser
from django.db import transaction

from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import (
    IdToken,
    JwtAccessToken,
    TokenIntrospectionErrorResponse,
    TokenIntrospectionSuccessResponse,
    UserinfoSuccessResponse,
)
from simple_openid_connect.exceptions import ValidationError
from simple_openid_connect.integrations.django.apps import OpenidAppConfig
from simple_openid_connect.integrations.django.models import OpenidUser

# Module-level logger, named after this module; UserMapper uses it for its
# debug / warning / error / critical messages.
logger = logging.getLogger(__name__)

# Union of every response/token class that UserMapper accepts as a source of
# user information. UserMapper.handle_federated_userinfo() reads at least the
# ``sub`` attribute from whichever of these it is given.
FederatedUserData = Union[
    IdToken, UserinfoSuccessResponse, TokenIntrospectionSuccessResponse, JwtAccessToken
]
"Type alias for the different classes which can provide information about a federated user."


class UserMapper:
    """
    A base class which is responsible for mapping federated users into the local system.

    The individual methods are intended to be overridable through class
    inheritance in order to customize how federated data is applied locally.
    """

    def handle_federated_userinfo(self, user_data: FederatedUserData) -> Any:
        """
        Entry point for dynamically creating or updating user data based on information obtained through OpenID.

        The function automatically creates a new user model instance if the user is unknown or updates the locally
        stored user information based on the federated data.

        :param user_data: Information about the user.

        :returns: An instance of the application's user model.

        :raises ValidationError: If ``user_data`` does not carry a ``sub`` value.
        """
        # validate that user data contains at least a user id
        if user_data.sub is None:
            raise ValidationError(
                "could not map user to token because the issuer did not return a 'sub' claim in its token introspection response"
            )

        # Look up / create the user and apply the attribute update inside a
        # single database transaction.
        with transaction.atomic():
            openid_user = OpenidUser.objects.get_or_create_for_sub(user_data.sub)
            user = openid_user.user
            self.automap_user_attrs(user, user_data)
            user.save()

        return user

    def handle_federated_access_token(
        self,
        access_token: str,
        oidc_client: OpenidClient,
        required_scopes: Union[str, None] = None,
    ) -> Tuple[Any, FederatedUserData]:
        """
        Entry point for dynamically creating or updating user data based on an access token which was provided by a user.

        This method inspects the token and then calls into :meth:`UserMapper.handle_federated_userinfo()` once more
        information about the user is available. The token is first treated as a signed JWT; if that fails for any
        reason (including an insufficient scope claim), the provider's token introspection endpoint is consulted
        instead.

        :param access_token: The raw access token that was passed to this application which should identify the user.
        :param oidc_client: An OpenID client which is used to access the OpenID providers signing keys or to introspect
            the token if necessary.
        :param required_scopes: Space separated scopes to which the access token is required to have access.
            If ``None`` is passed, the default scopes from django settings ``OPENID_SCOPE`` are used.
            Pass an empty (or whitespace-only) string if no scopes are required.

        :returns: An instance of the application's user model as well as additional data about the user.

        :raises ValidationError: If the passed token cannot be validated or is decidedly invalid.
        """
        if required_scopes is None:
            required_scopes = OpenidAppConfig.get_instance().safe_settings.OPENID_SCOPE

        # ``split()`` without an argument ignores leading/trailing/repeated
        # whitespace, so an empty string yields an empty set ("nothing required")
        # instead of the bogus ``{""}`` that ``split(" ")`` would produce.
        required = set(required_scopes.split())

        user_data: FederatedUserData

        # try to parse the raw token as JWT
        try:
            # parse and validate the general token structure
            token = JwtAccessToken.parse_jwt(
                access_token,
                oidc_client.provider_keys,
            )
            token.validate_extern(oidc_client.provider_config.issuer)

            # validate token scope for required access
            if required:
                if token.scope is None:
                    raise ValidationError(
                        "token does not contain required scopes claim"
                    )
                if not required.issubset(token.scope.split()):
                    raise ValidationError(
                        f"token has access to scopes '{token.scope}' but '{required_scopes}' are required"
                    )

            # the token is determined to be valid, so we can use it as user_data
            user_data = token

        # fall back to introspecting the token at the issuer
        except Exception:
            # Record why the JWT path was abandoned; the exception itself is
            # intentionally not propagated because introspection decides.
            logger.debug(
                "could not parse access token as JWT, falling back to calling the providers token introspection endpoint",
                exc_info=True,
            )
            introspect_response = oidc_client.introspect_token(access_token)
            if isinstance(introspect_response, TokenIntrospectionErrorResponse):
                logger.critical(
                    "could not introspect token for validity: %s", introspect_response
                )
                raise ValidationError(
                    f"could not introspect token at the issuer: {introspect_response}"
                )

            # fail if the token is expired
            if not introspect_response.active:
                raise ValidationError("token is expired")

            # validate token scope for required access; skipped entirely when
            # the caller requires no scopes, mirroring the JWT branch above
            if required:
                if introspect_response.scope is None:
                    logger.error(
                        "could not determine access token access because the issuer did not return the tokens scope during token instrospection"
                    )
                    raise ValidationError(
                        "could not determine token scope because the issuer did not return the tokens scope during token introspection"
                    )
                if not required.issubset(introspect_response.scope.split()):
                    raise ValidationError(
                        f"token has access to scopes '{introspect_response.scope}' but '{required_scopes}' are required"
                    )

            # the token is determined to be valid, so we can use it as user_data
            user_data = introspect_response

        return self.handle_federated_userinfo(user_data), user_data

    def automap_user_attrs(
        self,
        user: "AbstractBaseUser",
        user_data: FederatedUserData,
    ) -> None:
        """
        Inspect the given user instance model, discover its attributes based on some heuristics and set their values
        from the passed user information.

        Only instances of :class:`django.contrib.auth.models.AbstractUser` are handled; any other user model is left
        untouched. Claims which are absent from ``user_data`` or whose value is ``None`` are skipped, so an existing
        local value is never overwritten with ``None``.

        .. note::

            ``user.save()`` is not automatically called by this method to allow extending it via class inheritance
            without causing multiple database operations.

        :param user: The user instance on which attributes should be set
        :param user_data: Information about the user which was made available through OpenID.
        """
        if not isinstance(user, AbstractUser):
            return

        # username: take the first claim that is present *and* not None.
        # A plain ``hasattr`` check is not sufficient because claims may exist
        # as attributes holding ``None`` (``sub`` is checked for ``None`` in
        # handle_federated_userinfo for the same reason).
        for claim in ("preferred_username", "username", "sub"):
            username = getattr(user_data, claim, None)
            if username is not None:
                setattr(user, user.USERNAME_FIELD, username)
                break
        else:
            logger.warning(
                "Could not determine a username from federated user data. Creating more than one user will probably fail because the users username attribute is mapped to be empty and django enforces a unique-constraint on usernames."
            )

        # email
        email = getattr(user_data, "email", None)
        if email is not None:
            setattr(user, user.EMAIL_FIELD, email)

        # given name
        given_name = getattr(user_data, "given_name", None)
        if given_name is not None and hasattr(user, "first_name"):
            user.first_name = given_name

        # family name
        family_name = getattr(user_data, "family_name", None)
        if family_name is not None and hasattr(user, "last_name"):
            user.last_name = family_name