"""
Default implementations for mapping tokens to user objects.
This implementation can be overridden by extending the :class:`UserMapper` class and then setting the django settings
variable ``OPENID_USER_MAPPER`` to an import string pointing to the newly created class.
"""
import logging
from hashlib import sha256
from typing import Any, Tuple, Union, Optional
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractBaseUser, AbstractUser
from django.core.cache import cache
from django.db import transaction
from django.utils import timezone
from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import (
IdToken,
JwtAccessToken,
TokenIntrospectionErrorResponse,
TokenIntrospectionSuccessResponse,
UserinfoSuccessResponse,
)
from simple_openid_connect.exceptions import ValidationError
from simple_openid_connect.integrations.django.apps import OpenidAppConfig
from simple_openid_connect.integrations.django.models import OpenidUser
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Union of every token / response class this module accepts as a source of federated user information.
FederatedUserData = Union[
    IdToken, UserinfoSuccessResponse, TokenIntrospectionSuccessResponse, JwtAccessToken
]
"Type alias for the different classes which can provide information about a federated user."
# Annotation alias for "an instance of the application's user model"; typed as ``Any`` in this module
# (presumably because the concrete user model is configurable per project -- TODO confirm).
UserModel = Any
[docs]
class UserMapper:
    """
    A base class which is responsible for mapping federated users into the local system.

    The public methods form the extension surface for subclasses; methods prefixed with an underscore are
    internal helpers.
    """

    def handle_federated_userinfo(self, user_data: "FederatedUserData") -> "UserModel":
        """
        Entry point for dynamically creating or updating user data based on information obtained through OpenID.

        The function automatically creates a new user model instance if the user is unknown or updates the locally
        stored user information based on the federated data.

        :param user_data: Information about the user.

        :returns: An instance of the applications user model.

        :raises ValidationError: If ``user_data`` does not carry a ``sub`` claim.
        """
        # validate that user data contains at least a user id
        if user_data.sub is None:
            raise ValidationError(
                "could not map user to token because the issuer did not return a 'sub' claim in its token introspection response"
            )

        # resolve the local user and refresh its attributes inside a single transaction
        with transaction.atomic():
            openid_user = OpenidUser.objects.get_or_create_for_sub(user_data.sub)
            user = openid_user.user
            self.automap_user_attrs(user, user_data)
            user.save()

        return user

    def handle_federated_access_token(
        self,
        access_token: str,
        oidc_client: "OpenidClient",
        required_scopes: Union[str, None] = None,
    ) -> "Tuple[UserModel, FederatedUserData]":
        """
        Entry point for dynamically creating or updating user data based on an access token which was provided by a user.

        This method inspects the token and then calls into :meth:`UserMapper.handle_federated_userinfo()` once more information
        about the user is available.

        :param access_token: The raw access token that was passed to this application which should identify the user.
        :param oidc_client: An OpenID client which is used to access the OpenID providers signing keys or to introspect the token if necessary.
        :param required_scopes: Scopes to which the access token is required to have access.
            If ``None`` is passed, the default scopes from django settings ``OPENID_SCOPE`` are used.
            Pass an empty string if no scopes are required.

        :returns: An instance of the applications user model as well as additional data about the user.

        :raises ValidationError: If the passed token cannot be validated or is decidedly invalid.
        """
        if required_scopes is None:
            required_scopes = OpenidAppConfig.get_instance().safe_settings.OPENID_SCOPE

        # return cached data if it exists
        if (
            cached_data := self.get_cached_data(access_token, required_scopes)
        ) is not None:
            return cached_data

        try:
            # first try to interpret the raw token as a self-contained JWT
            user_data = self._user_data_from_jwt(
                access_token, oidc_client, required_scopes
            )
        except Exception:
            # Any failure on the JWT path (unparsable token, failed validation, scope mismatch) falls back to
            # asking the issuer; the traceback is attached so the actual reason is visible at debug level.
            logger.debug(
                "could not parse access token as JWT, falling back to calling the providers token introspection endpoint",
                exc_info=True,
            )
            user_data = self._user_data_from_introspection(
                access_token, oidc_client, required_scopes
            )

        # call down into the actual user mapping handler
        user = self.handle_federated_userinfo(user_data)

        # populate cache and return result
        self.populate_cache(access_token, required_scopes, user, user_data)
        return user, user_data

    def _user_data_from_jwt(
        self,
        access_token: str,
        oidc_client: "OpenidClient",
        required_scopes: str,
    ) -> "JwtAccessToken":
        """
        Parse and validate ``access_token`` as a JWT access token and check its scopes.

        :raises ValidationError: If scopes are required but the token has no scope claim or lacks a required scope.
            Parsing and validation may raise other exceptions, which the caller treats as "not a usable JWT".
        """
        # parse and validate the general token structure
        token = JwtAccessToken.parse_jwt(
            access_token,
            oidc_client.provider_keys,
        )
        token.validate_extern(
            oidc_client.provider_config.issuer,
            oidc_client.client_auth.client_id,
        )

        # validate token scope for required access (skipped entirely when no scopes are required)
        if required_scopes.split():
            if token.scope is None:
                raise ValidationError("token does not contain required scopes claim")
            if not self._scopes_satisfied(token.scope, required_scopes):
                raise ValidationError(
                    f"token has access to scopes '{token.scope}' but '{required_scopes}' are required"
                )

        return token

    def _user_data_from_introspection(
        self,
        access_token: str,
        oidc_client: "OpenidClient",
        required_scopes: str,
    ) -> "TokenIntrospectionSuccessResponse":
        """
        Validate ``access_token`` by calling the issuer's token introspection endpoint.

        :raises ValidationError: If introspection fails, the token is not active, or required scopes are missing.
        """
        introspect_response = oidc_client.introspect_token(access_token)
        if isinstance(introspect_response, TokenIntrospectionErrorResponse):
            logger.critical(
                "could not introspect token for validity: %s", introspect_response
            )
            raise ValidationError(
                f"could not introspect token at the issuer: {introspect_response}"
            )

        # fail if the token is expired
        if not introspect_response.active:
            raise ValidationError("token is expired")

        # Validate token scope for required access. As on the JWT path, an empty ``required_scopes`` means
        # "no scopes required", so neither a missing nor a non-matching scope may reject the token then.
        if required_scopes.split():
            if introspect_response.scope is None:
                logger.error(
                    "could not determine access token access because the issuer did not return the tokens scope during token instrospection"
                )
                raise ValidationError(
                    "could not determine token scope because the issuer did not return the tokens scope during token introspection"
                )
            if not self._scopes_satisfied(introspect_response.scope, required_scopes):
                raise ValidationError(
                    f"token has access to scopes '{introspect_response.scope}' but '{required_scopes}' are required"
                )

        return introspect_response

    @staticmethod
    def _scopes_satisfied(granted: str, required: str) -> bool:
        """Return whether every whitespace-separated scope in ``required`` also occurs in ``granted``."""
        return set(required.split()) <= set(granted.split())

    def automap_user_attrs(
        self,
        user: "AbstractBaseUser",
        user_data: "FederatedUserData",
    ) -> None:
        """
        Inspect the given user instance model, discover its attributes based on some heuristics and set their values
        from the passed user information.

        Claims which are absent or ``None`` on ``user_data`` are skipped so that existing local values are never
        overwritten with ``None``. Nothing is mapped unless ``user`` is an ``AbstractUser`` instance.

        .. note::

            ``user.save()`` is not automatically called by this method to allow extending it via class inheritance
            without causing multiple database operations.

        :param user: The user instance on which attributes should be set
        :param user_data: Information about the user which was made available through OpenID.
        """
        if not isinstance(user, AbstractUser):
            return

        # username: the first claim that carries a usable (non-empty) value wins
        for claim in ("preferred_username", "username", "sub"):
            username = getattr(user_data, claim, None)
            if username:
                setattr(user, user.USERNAME_FIELD, username)
                break
        else:
            logger.warning(
                "Could not determine a username from federated user data. Creating more than one user will probably fail because the users username attribute is mapped to be empty and django enforces a unique-constraint on usernames."
            )

        # email
        email = getattr(user_data, "email", None)
        if email is not None:
            setattr(user, user.EMAIL_FIELD, email)

        # given name
        given_name = getattr(user_data, "given_name", None)
        if given_name is not None and hasattr(user, "first_name"):
            user.first_name = given_name

        # family name
        family_name = getattr(user_data, "family_name", None)
        if family_name is not None and hasattr(user, "last_name"):
            user.last_name = family_name

    @staticmethod
    def _cache_key(access_token: str, required_scopes: str) -> str:
        """Derive the cache key under which the result for this token / scope combination is stored."""
        input_hash = sha256(usedforsecurity=True)
        input_hash.update(access_token.encode("UTF-8"))
        input_hash.update(required_scopes.encode("UTF-8"))
        return f"simple-openid-connect/user-of-token-{input_hash.hexdigest()}"

    @staticmethod
    def _cache_timeout(federated_data: "FederatedUserData", now: int) -> Optional[int]:
        """
        Compute for how many seconds ``federated_data`` may be cached.

        :param federated_data: Data which may carry an ``exp`` attribute (compared against ``now`` as a number).
        :param now: The current time as a unix timestamp.

        :returns: The non-negative number of seconds until ``exp``, or ``None`` if there is no ``exp``.
        """
        # The lookup is deliberately separated from the ``None`` test: ``x := f() is not None`` would bind the
        # boolean result of the comparison instead of the expiry value.
        exp = getattr(federated_data, "exp", None)
        if exp is None:
            return None
        return max(0, exp - now)

    def get_cached_data(
        self, access_token: str, required_scopes: str
    ) -> "Optional[Tuple[UserModel, FederatedUserData]]":
        """
        Look up a previously cached mapping result for the given token and scopes.

        :param access_token: The raw access token that identifies the user.
        :param required_scopes: The scopes which were required when the result was cached.

        :returns: The cached ``(user, federated_user_data)`` pair, or ``None`` if nothing usable is cached.
        """
        cached_data = cache.get(self._cache_key(access_token, required_scopes))
        if cached_data is None:
            return None

        user_pk, federated_user_data = cached_data
        user_model = get_user_model()
        try:
            # the cache stores ``user.pk``, so look the user up by primary key rather than by a field named ``id``
            user = user_model.objects.get(pk=user_pk)
        except user_model.DoesNotExist:
            # the user was removed after the entry was cached; treat this as a cache miss
            return None
        return user, federated_user_data

    def populate_cache(
        self,
        access_token: str,
        required_scopes: str,
        user: "UserModel",
        federated_data: "FederatedUserData",
    ) -> None:
        """
        Store the mapping result for the given token and scopes in the django cache.

        :param access_token: The raw access token that identifies the user.
        :param required_scopes: The scopes which were required during validation.
        :param user: The local user; only its primary key is stored.
        :param federated_data: The federated data to cache alongside the user.
        """
        cache_timeout = self._cache_timeout(
            federated_data, int(timezone.now().timestamp())
        )
        # NOTE(review): when the data has no ``exp``, ``timeout=None`` is passed through unchanged, which django
        # documents as "never expire" -- confirm that this is intended for tokens without an expiry.
        cache.set(
            self._cache_key(access_token, required_scopes),
            (user.pk, federated_data),
            timeout=cache_timeout,
        )