(backend) introduce an authorization server client

In OAuth 2.0, the Authorization Server is equivalent to the OIDC provider.

The Authorization Server exposes endpoints for token introspection and JWKS.
I’ve created a client to easily interact with the Authorization Server,
while doing the token introspection in our resource server.

Token introspection will be explained in upcoming commits.

The current OIDC library used in the project doesn’t offer token introspection,
leading to redundancy in the code handling some OIDC/OAuth2 flows.

This overlap makes the code bulky. My goal is to quickly deliver a working
PoC for the resource server, with plans to refactor in the longer run.

Please feel free to provide feedback on the class design.
This commit is contained in:
lebaudantoine
2024-07-28 23:28:21 +02:00
committed by aleb_the_flash
parent 21371dbd1b
commit 9c05167d80
3 changed files with 277 additions and 0 deletions

View File

@@ -0,0 +1,87 @@
"""Resource Server Clients classes"""
import requests
from joserfc.jwk import KeySet
class AuthorizationServerClient:
"""Client for interacting with an OAuth 2.0 authorization server.
An authorization server issues access tokens to client applications after authenticating
and obtaining authorization from the resource owner. It also provides endpoints for token
introspection and JSON Web Key Sets (JWKS) to validate and decode tokens.
This client facilitates communication with the authorization server, including:
- Fetching token introspection responses.
- Fetching JSON Web Key Sets (JWKS) for token validation.
- Setting appropriate headers for secure communication as recommended by RFC drafts.
"""
# ruff: noqa: PLR0913
# pylint: disable=too-many-arguments
def __init__(
self,
url,
url_jwks,
url_introspection,
verify_ssl,
timeout,
proxy,
):
self.url = url
self._url_introspection = url_introspection
self._url_jwks = url_jwks
self._verify_ssl = verify_ssl
self._timeout = timeout
self._proxy = proxy
@property
def _introspection_headers(self):
"""Get HTTP header for the introspection request.
Notify the authorization server that we expect a signed and encrypted response
by setting the appropriate 'Accept' header.
This follows the recommendation from the draft RFC:
https://datatracker.ietf.org/doc/html/draft-ietf-oauth-jwt-introspection-response-12.
"""
return {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/token-introspection+jwt",
}
def get_introspection(self, client_id, client_secret, token):
"""Retrieve introspection response about a token."""
response = requests.post(
self._url_introspection,
data={
"client_id": client_id,
"client_secret": client_secret,
"token": token,
},
headers=self._introspection_headers,
verify=self._verify_ssl,
timeout=self._timeout,
proxies=self._proxy,
)
response.raise_for_status()
return response.text
def get_jwks(self):
"""Retrieve Authorization Server JWKS."""
response = requests.get(
self._url_jwks,
verify=self._verify_ssl,
timeout=self._timeout,
proxies=self._proxy,
)
response.raise_for_status()
return response.json()
def import_public_keys(self):
"""Retrieve and import Authorization Server JWKS."""
jwks = self.get_jwks()
public_keys = KeySet.import_key_set(jwks)
return public_keys