Source code for simple_openid_connect.scripts.show_openid_tokens

#!/usr/bin/env python3
import argparse
import json
import logging
import urllib.parse
from collections import defaultdict
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer
from secrets import token_urlsafe
from typing import Any, Dict, Optional

from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import (
    TokenErrorResponse,
    TokenSuccessResponse,
)

# initialize a session object (which is very primitive but works)
session = defaultdict(lambda: "")
client = None  # type: Optional[OpenidClient]


[docs] class RequestHandler(BaseHTTPRequestHandler): def do_GET(self) -> None: url = urllib.parse.urlparse(self.path) if url.path == "/": self._handle_initial() elif url.path.startswith("/callback"): self._handle_callback() else: self.return_redirect("/") def _handle_initial(self) -> None: global session global client assert client is not None # setup authorization code flow session["state"] = token_urlsafe(16) session["nonce"] = token_urlsafe(16) login_url = client.authorization_code_flow.start_authentication( state=session["state"], nonce=session["nonce"] ) # send response self.return_redirect(login_url) def _handle_callback(self) -> None: global session global client assert client is not None # parse callback response = client.authorization_code_flow.handle_authentication_result( self.path, state=session["state"] ) match response: case TokenSuccessResponse(): access_token = response.access_token id_token = client.decode_id_token( response.id_token, nonce=session["nonce"] ) userinfo = client.fetch_userinfo(access_token) self.return_json_response( { "token_response": response.model_dump(), "id_token": id_token.model_dump(), "userinfo": userinfo.model_dump(), } ) case TokenErrorResponse(): self.return_json_response(response.model_dump()) def return_redirect(self, to: str, code: int = HTTPStatus.FOUND) -> None: self.send_response(code) self.send_header("location", to) self.end_headers() def return_json_response(self, content: Dict[Any, Any]) -> None: self.send_response(HTTPStatus.OK) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(json.dumps(content).encode("UTF-8"))
def main() -> None: argp = argparse.ArgumentParser( description="Script to retrieve access and refresh tokens from an OpenID-Connect identity provider and display them to the user. Useful for debugging purposes when requiring low-level access to these tokens." ) argp.add_argument( "--issuer", required=True, help="OpenID_Connect issuer url", ) argp.add_argument( "--client-id", required=True, help="OpenID-Connect client id", ) argp.add_argument( "--client-secret", required=True, help="OpenID-Connect client secret" ) argp.add_argument( "--scope", help="OpenID-Connect scopes to request. Default to just 'openid'. Can be given multiple times, once for each additional scope", action="append", default=["openid"], ) argp.add_argument( "-p", "--port", type=int, default=8080, help="Port to bind this server to. Defaults to 8080", ) argp.add_argument( "--host", default="127.0.0.1", help="IP address to bind this server to. Defaults to 127.0.0.1", ) args = argp.parse_args() logging.basicConfig(level=logging.INFO) # initialize openid client global client own_base_url = f"http://{args.host}:{args.port}" client = OpenidClient.from_issuer_url( args.issuer, authentication_redirect_uri=own_base_url + "/callback", client_id=args.client_id, client_secret=args.client_secret, scope=" ".join(args.scope), ) # serve a basic http server so that authorization code flow can be used with HTTPServer((args.host, args.port), RequestHandler) as server: print(f"Open http://{server.server_name}:{server.server_port}") try: server.serve_forever() except KeyboardInterrupt: pass if __name__ == "__main__": main()