(core) create AccountServiceAuthentication backend

Backend authentication with API Key to AccountService
This commit is contained in:
Sabrina Demagny
2025-03-31 14:17:47 +02:00
parent f60bfc2676
commit 855e20d407
4 changed files with 114 additions and 1 deletions

View File

@@ -10,6 +10,7 @@ and this project adheres to
### Added
- ✨(core) create AccountServiceAuthentication backend #771
- ✨(core) create AccountService model #771
- 🧱(helm) disable createsuperuser job by setting #863
- 🔒️(passwords) add validators for production #850

View File

@@ -13,8 +13,11 @@ import requests
from mozilla_django_oidc.auth import (
OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend,
)
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from core.models import (
AccountService,
Contact,
Organization,
OrganizationAccess,
@@ -245,3 +248,45 @@ class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend):
if updated_claims:
self.UserModel.objects.filter(sub=user.sub).update(**updated_claims)
class AccountServiceAuthentication(BaseAuthentication):
"""Authentication backend for account services using Authorization header.
The Authorization header is used to authenticate the request.
The api key is stored in the AccountService model.
Header format:
Authorization: ApiKey <api_key>
"""
def authenticate(self, request):
"""Authenticate the request. Find the account service and check the api key.
Should return either:
- a tuple of (account_service, api_key) if allowed,
- None to pass on other authentication backends
- raise an AuthenticationFailed exception to stop propagation.
"""
auth_header = request.headers.get("Authorization", "")
if not auth_header:
return None
try:
auth_mode, api_key = auth_header.split(" ")
except (IndexError, ValueError) as err:
raise AuthenticationFailed(_("Invalid authorization header.")) from err
if auth_mode.lower() != "apikey" or not api_key:
raise AuthenticationFailed(_("Invalid authorization header."))
try:
account_service = AccountService.objects.get(api_key=api_key)
except AccountService.DoesNotExist as err:
logger.warning("Invalid api_key: %s", api_key)
raise AuthenticationFailed(_("Invalid api key.")) from err
return (account_service, account_service.api_key)
def authenticate_header(self, request):
"""
Return a string to be used as the value of the `WWW-Authenticate`
header in a `401 Unauthenticated` response, or `None` if the
authentication scheme should return `403 Permission Denied` responses.
"""
return "apikey"

View File

@@ -1122,3 +1122,8 @@ class AccountService(BaseModel):
def __str__(self):
return self.name
@property
def is_authenticated(self):
"""Indicate if the account service is authenticated."""
return True

View File

@@ -2,11 +2,16 @@
from django.contrib.auth import get_user_model
from django.core.exceptions import SuspiciousOperation
from django.test import RequestFactory, override_settings
import pytest
from rest_framework.exceptions import AuthenticationFailed
from core import factories, models
from core.authentication.backends import OIDCAuthenticationBackend
from core.authentication.backends import (
AccountServiceAuthentication,
OIDCAuthenticationBackend,
)
pytestmark = pytest.mark.django_db
User = get_user_model()
@@ -453,3 +458,60 @@ def test_authentication_getter_existing_user_with_registration_id(
assert user.organization is not None
assert user.organization.registration_id_list == ["12345678901234"]
@override_settings(ACCOUNT_SERVICE_SCOPES=["la-suite-list-organizations-siret"])
def test_account_service_authenticate_valid_api_key():
"""Test the authenticate method with a valid API key."""
request = RequestFactory().get("/")
account_service = factories.AccountServiceFactory(
name="test_service",
api_key="test_api_key_123",
scopes=["la-suite-list-organizations-siret"],
)
request.headers = {"Authorization": f"ApiKey {account_service.api_key}"}
result = AccountServiceAuthentication().authenticate(request)
assert result is not None
assert result[0] == account_service
assert result[1] == account_service.api_key
def test_account_service_authenticate_missing_api_key():
"""Test the authenticate method with a missing API key."""
request = RequestFactory().get("/")
request.headers = {}
result = AccountServiceAuthentication().authenticate(request)
assert result is None
def test_account_service_authenticate_invalid_api_key():
"""Test the authenticate method with an invalid API key."""
request = RequestFactory().get("/")
request.headers = {"Authorization": "ApiKey invalid_key"}
with pytest.raises(AuthenticationFailed):
AccountServiceAuthentication().authenticate(request)
@override_settings(ACCOUNT_SERVICE_SCOPES=["la-suite-list-organizations-siret"])
def test_account_service_authenticate_invalid_header():
"""Test the authenticate method with an invalid header."""
request = RequestFactory().get("/")
account_service = factories.AccountServiceFactory(
name="test_service",
api_key="test_api_key_123",
scopes=["la-suite-list-organizations-siret"],
)
request.headers = {"Authorization": f"Bearer {account_service.api_key}"}
with pytest.raises(AuthenticationFailed):
AccountServiceAuthentication().authenticate(request)
request.headers = {"Authorization": account_service.api_key}
with pytest.raises(AuthenticationFailed):
AccountServiceAuthentication().authenticate(request)