(webhook) add webhook logic and synchronization utils

adding webhooks logic to send serialized team memberships data
to a designated serie of webhooks.
This commit is contained in:
Marie PUPO JEAMMET
2024-03-22 14:47:08 +01:00
committed by Marie
parent 7ea6342a01
commit ebf58f42c9
13 changed files with 813 additions and 9 deletions

View File

@@ -53,6 +53,15 @@ class TeamAccessInline(admin.TabularInline):
readonly_fields = ("created_at", "updated_at")
class TeamWebhookInline(admin.TabularInline):
"""Inline admin class for team webhooks."""
extra = 0
autocomplete_fields = ["team"]
model = models.TeamWebhook
readonly_fields = ("created_at", "updated_at")
@admin.register(models.User)
class UserAdmin(auth_admin.UserAdmin):
"""Admin class for the User model"""
@@ -112,7 +121,7 @@ class UserAdmin(auth_admin.UserAdmin):
class TeamAdmin(admin.ModelAdmin):
"""Team admin interface declaration."""
inlines = (TeamAccessInline,)
inlines = (TeamAccessInline, TeamWebhookInline)
list_display = (
"name",
"slug",

View File

@@ -3,6 +3,7 @@ Core application enums declaration
"""
from django.conf import global_settings, settings
from django.db import models
from django.utils.translation import gettext_lazy as _
# Django sets `LANGUAGES` by default with all supported languages. We can use it for
@@ -14,3 +15,11 @@ ALL_LANGUAGES = getattr(
"ALL_LANGUAGES",
[(language, _(name)) for language, name in global_settings.LANGUAGES],
)
class WebhookStatusChoices(models.TextChoices):
"""Defines the possible statuses in which a webhook can be."""
FAILURE = "failure", _("Failure")
PENDING = "pending", _("Pending")
SUCCESS = "success", _("Success")

View File

@@ -177,6 +177,16 @@ class TeamAccessFactory(factory.django.DjangoModelFactory):
role = factory.fuzzy.FuzzyChoice([r[0] for r in models.RoleChoices.choices])
class TeamWebhookFactory(factory.django.DjangoModelFactory):
"""Create fake team webhooks for testing."""
class Meta:
model = models.TeamWebhook
team = factory.SubFactory(TeamFactory)
url = factory.Sequence(lambda n: f"https://example.com/Groups/{n!s}")
class InvitationFactory(factory.django.DjangoModelFactory):
"""A factory to create invitations for a user"""

View File

@@ -1,4 +1,4 @@
# Generated by Django 5.0.2 on 2024-03-05 17:09
# Generated by Django 5.0.3 on 2024-03-25 22:58
import django.contrib.auth.models
import django.core.validators
@@ -45,7 +45,7 @@ class Migration(migrations.Migration):
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created at')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated at')),
('email', models.EmailField(blank=True, max_length=254, null=True, unique=True, verbose_name='email address')),
('admin_email', models.EmailField(blank=True, max_length=254, null=True, unique=True, verbose_name='admin email address')),
('language', models.CharField(choices="(('en-us', 'English'), ('fr-fr', 'French'))", default='en-us', help_text='The language in which the user wants to see the interface.', max_length=10, verbose_name='language')),
('timezone', timezone_field.fields.TimeZoneField(choices_display='WITH_GMT_OFFSET', default='UTC', help_text='The timezone in which the user wants to see times.', use_pytz=False)),
('is_device', models.BooleanField(default=False, help_text='Whether the user is a device or a real user.', verbose_name='device')),
@@ -144,6 +144,23 @@ class Migration(migrations.Migration):
name='users',
field=models.ManyToManyField(related_name='teams', through='core.TeamAccess', to=settings.AUTH_USER_MODEL),
),
migrations.CreateModel(
name='TeamWebhook',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, help_text='primary key for the record as UUID', primary_key=True, serialize=False, verbose_name='id')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='date and time at which a record was created', verbose_name='created at')),
('updated_at', models.DateTimeField(auto_now=True, help_text='date and time at which a record was last updated', verbose_name='updated at')),
('url', models.URLField(verbose_name='url')),
('secret', models.CharField(blank=True, max_length=255, null=True, verbose_name='secret')),
('status', models.CharField(choices=[('failure', 'Failure'), ('pending', 'Pending'), ('success', 'Success')], default='pending', max_length=10)),
('team', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='webhooks', to='core.team')),
],
options={
'verbose_name': 'Team webhook',
'verbose_name_plural': 'Team webhooks',
'db_table': 'people_team_webhook',
},
),
migrations.AddConstraint(
model_name='contact',
constraint=models.CheckConstraint(check=models.Q(('base__isnull', False), ('owner__isnull', True), _negated=True), name='base_owner_constraint', violation_error_message='A contact overriding a base contact must be owned.'),

View File

@@ -13,7 +13,7 @@ from django.conf import settings
from django.contrib.auth import models as auth_models
from django.contrib.auth.base_user import AbstractBaseUser
from django.core import exceptions, mail, validators
from django.db import models
from django.db import models, transaction
from django.template.loader import render_to_string
from django.utils import timezone
from django.utils.functional import lazy
@@ -24,6 +24,9 @@ from django.utils.translation import override
import jsonschema
from timezone_field import TimeZoneField
from core.enums import WebhookStatusChoices
from core.utils.webhooks import scim_synchronizer
logger = getLogger(__name__)
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -462,6 +465,34 @@ class TeamAccess(BaseModel):
def __str__(self):
return f"{self.user!s} is {self.role:s} in team {self.team!s}"
def save(self, *args, **kwargs):
"""
Override save function to fire webhooks on any addition or update
to a team access.
"""
if self._state.adding:
self.team.webhooks.update(status=WebhookStatusChoices.PENDING)
with transaction.atomic():
instance = super().save(*args, **kwargs)
scim_synchronizer.add_user_to_group(self.team, self.user)
else:
instance = super().save(*args, **kwargs)
return instance
def delete(self, *args, **kwargs):
"""
Override delete method to fire webhooks on to team accesses.
Don't allow deleting a team access until it is successfully synchronized with all
its webhooks.
"""
self.team.webhooks.update(status=WebhookStatusChoices.PENDING)
with transaction.atomic():
arguments = self.team, self.user
super().delete(*args, **kwargs)
scim_synchronizer.remove_user_from_group(*arguments)
def get_abilities(self, user):
"""
Compute and return abilities for a given user taking into account
@@ -512,6 +543,34 @@ class TeamAccess(BaseModel):
}
class TeamWebhook(BaseModel):
"""Webhooks fired on changes in teams."""
team = models.ForeignKey(Team, related_name="webhooks", on_delete=models.CASCADE)
url = models.URLField(_("url"))
secret = models.CharField(_("secret"), max_length=255, null=True, blank=True)
status = models.CharField(
max_length=10,
default=WebhookStatusChoices.PENDING,
choices=WebhookStatusChoices.choices,
)
class Meta:
db_table = "people_team_webhook"
verbose_name = _("Team webhook")
verbose_name_plural = _("Team webhooks")
def __str__(self):
return f"Webhook to {self.url} for {self.team}"
def get_headers(self):
"""Build header dict from webhook object."""
headers = {"Content-Type": "application/json"}
if self.secret:
headers["Authorization"] = f"Bearer {self.secret:s}"
return headers
class Invitation(BaseModel):
"""User invitation to teams."""

View File

@@ -2,9 +2,12 @@
Test for team accesses API endpoints in People's core app : create
"""
import json
import random
import re
import pytest
import responses
from rest_framework.test import APIClient
from core import factories, models
@@ -174,3 +177,60 @@ def test_api_team_accesses_create_authenticated_owner():
"role": role,
"user": str(other_user.id),
}
def test_api_team_accesses_create_webhook():
"""
When the team has a webhook, creating a team access should fire a call.
"""
user, other_user = factories.UserFactory.create_batch(2)
team = factories.TeamFactory(users=[(user, "owner")])
webhook = factories.TeamWebhookFactory(team=team)
role = random.choice([role[0] for role in models.RoleChoices.choices])
client = APIClient()
client.force_login(user)
with responses.RequestsMock() as rsps:
# Ensure successful response by scim provider using "responses":
rsp = rsps.add(
rsps.PATCH,
re.compile(r".*/Groups/.*"),
body="{}",
status=200,
content_type="application/json",
)
response = client.post(
f"/api/v1.0/teams/{team.id!s}/accesses/",
{
"user": str(other_user.id),
"role": role,
},
format="json",
)
assert response.status_code == 201
assert rsp.call_count == 1
assert rsps.calls[0].request.url == webhook.url
# Payload sent to scim provider
payload = json.loads(rsps.calls[0].request.body)
assert payload == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "add",
"path": "members",
"value": [
{
"value": str(other_user.id),
"email": None,
"type": "User",
}
],
}
],
}

View File

@@ -2,9 +2,12 @@
Test for team accesses API endpoints in People's core app : delete
"""
import json
import random
import re
import pytest
import responses
from rest_framework.test import APIClient
from core import factories, models
@@ -148,18 +151,77 @@ def test_api_team_accesses_delete_owners_last_owner():
"""
It should not be possible to delete the last owner access from a team
"""
identity = factories.IdentityFactory()
user = identity.user
user = factories.UserFactory()
team = factories.TeamFactory()
access = factories.TeamAccessFactory(team=team, user=user, role="owner")
assert models.TeamAccess.objects.count() == 1
client = APIClient()
client.force_login(user)
response = client.delete(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
)
assert response.status_code == 403
assert models.TeamAccess.objects.count() == 1
def test_api_team_accesses_delete_webhook():
"""
When the team has a webhook, deleting a team access should fire a call.
"""
user = factories.UserFactory()
team = factories.TeamFactory(users=[(user, "administrator")])
webhook = factories.TeamWebhookFactory(team=team)
access = factories.TeamAccessFactory(
team=team, role=random.choice(["member", "administrator"])
)
assert models.TeamAccess.objects.count() == 2
assert models.TeamAccess.objects.filter(user=access.user).exists()
client = APIClient()
client.force_login(user)
with responses.RequestsMock() as rsps:
# Ensure successful response by scim provider using "responses":
rsp = rsps.add(
rsps.PATCH,
re.compile(r".*/Groups/.*"),
body="{}",
status=200,
content_type="application/json",
)
response = client.delete(
f"/api/v1.0/teams/{team.id!s}/accesses/{access.id!s}/",
)
assert response.status_code == 204
assert rsp.call_count == 1
assert rsps.calls[0].request.url == webhook.url
# Payload sent to scim provider
payload = json.loads(rsps.calls[0].request.body)
assert payload == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "remove",
"path": "members",
"value": [
{
"value": str(access.user.id),
"email": None,
"type": "User",
}
],
}
],
}
assert models.TeamAccess.objects.count() == 1
assert models.TeamAccess.objects.filter(user=access.user).exists() is False

View File

@@ -2,12 +2,16 @@
Unit tests for the TeamAccess model
"""
import json
import re
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ValidationError
import pytest
import responses
from core import factories
from core import factories, models
pytestmark = pytest.mark.django_db
@@ -39,6 +43,94 @@ def test_models_team_accesses_unique():
factories.TeamAccessFactory(user=access.user, team=access.team)
def test_models_team_accesses_create_webhook():
"""
When the team has a webhook, creating a team access should fire a call.
"""
user = factories.UserFactory()
team = factories.TeamFactory()
webhook = factories.TeamWebhookFactory(team=team)
with responses.RequestsMock() as rsps:
# Ensure successful response by scim provider using "responses":
rsp = rsps.add(
rsps.PATCH,
re.compile(r".*/Groups/.*"),
body="{}",
status=200,
content_type="application/json",
)
models.TeamAccess.objects.create(user=user, team=team)
assert rsp.call_count == 1
assert rsps.calls[0].request.url == webhook.url
# Payload sent to scim provider
payload = json.loads(rsps.calls[0].request.body)
assert payload == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "add",
"path": "members",
"value": [
{
"value": str(user.id),
"email": None,
"type": "User",
}
],
}
],
}
def test_models_team_accesses_delete_webhook():
"""
When the team has a webhook, deleting a team access should fire a call.
"""
team = factories.TeamFactory()
webhook = factories.TeamWebhookFactory(team=team)
access = factories.TeamAccessFactory(team=team)
with responses.RequestsMock() as rsps:
# Ensure successful response by scim provider using "responses":
rsp = rsps.add(
rsps.PATCH,
re.compile(r".*/Groups/.*"),
body="{}",
status=200,
content_type="application/json",
)
access.delete()
assert rsp.call_count == 1
assert rsps.calls[0].request.url == webhook.url
# Payload sent to scim provider
payload = json.loads(rsps.calls[0].request.body)
assert payload == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "remove",
"path": "members",
"value": [
{
"value": str(access.user.id),
"email": None,
"type": "User",
}
],
}
],
}
assert models.TeamAccess.objects.exists() is False
# get_abilities

View File

@@ -0,0 +1,341 @@
"""Test Team synchronization webhooks."""
import json
import random
import re
from logging import Logger
from unittest import mock
import pytest
import responses
from core import factories
from core.utils.webhooks import scim_synchronizer
pytestmark = pytest.mark.django_db
def test_utils_webhooks_add_user_to_group_no_webhooks():
"""If no webhook is declared on the team, the function should not make any request."""
access = factories.TeamAccessFactory()
with responses.RequestsMock():
scim_synchronizer.add_user_to_group(access.team, access.user)
assert len(responses.calls) == 0
@mock.patch.object(Logger, "info")
def test_utils_webhooks_add_user_to_group_success(mock_info):
"""The user passed to the function should get added."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
webhooks = factories.TeamWebhookFactory.create_batch(2, team=access.team)
with responses.RequestsMock() as rsps:
# Ensure successful response by scim provider using "responses":
rsps.add(
rsps.PATCH,
re.compile(r".*/Groups/.*"),
body="{}",
status=200,
content_type="application/json",
)
scim_synchronizer.add_user_to_group(access.team, access.user)
for i, webhook in enumerate(webhooks):
assert rsps.calls[i].request.url == webhook.url
# Check headers
headers = rsps.calls[i].request.headers
assert "Authorization" not in headers
assert headers["Content-Type"] == "application/json"
# Payload sent to scim provider
for call in rsps.calls:
payload = json.loads(call.request.body)
assert payload == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "add",
"path": "members",
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"type": "User",
}
],
}
],
}
# Logger
assert mock_info.call_count == 2
for i, webhook in enumerate(webhooks):
assert mock_info.call_args_list[i][0] == (
"%s synchronization succeeded with %s",
"add_user_to_group",
webhook.url,
)
# Status
for webhook in webhooks:
webhook.refresh_from_db()
assert webhook.status == "success"
@mock.patch.object(Logger, "info")
def test_utils_webhooks_remove_user_from_group_success(mock_info):
"""The user passed to the function should get removed."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
webhooks = factories.TeamWebhookFactory.create_batch(2, team=access.team)
with responses.RequestsMock() as rsps:
# Ensure successful response by scim provider using "responses":
rsps.add(
rsps.PATCH,
re.compile(r".*/Groups/.*"),
body="{}",
status=200,
content_type="application/json",
)
scim_synchronizer.remove_user_from_group(access.team, access.user)
for i, webhook in enumerate(webhooks):
assert rsps.calls[i].request.url == webhook.url
# Payload sent to scim provider
for call in rsps.calls:
payload = json.loads(call.request.body)
assert payload == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "remove",
"path": "members",
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"type": "User",
}
],
}
],
}
# Logger
assert mock_info.call_count == 2
for i, webhook in enumerate(webhooks):
assert mock_info.call_args_list[i][0] == (
"%s synchronization succeeded with %s",
"remove_user_from_group",
webhook.url,
)
# Status
for webhook in webhooks:
webhook.refresh_from_db()
assert webhook.status == "success"
@mock.patch.object(Logger, "error")
@mock.patch.object(Logger, "info")
def test_utils_webhooks_add_user_to_group_failure(mock_info, mock_error):
"""The logger should be called on webhook call failure."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
webhooks = factories.TeamWebhookFactory.create_batch(2, team=access.team)
with responses.RequestsMock() as rsps:
# Simulate webhook failure using "responses":
rsps.add(
rsps.PATCH,
re.compile(r".*/Groups/.*"),
body="{}",
status=random.choice([404, 301, 302]),
content_type="application/json",
)
scim_synchronizer.add_user_to_group(access.team, access.user)
for i, webhook in enumerate(webhooks):
assert rsps.calls[i].request.url == webhook.url
# Payload sent to scim provider
for call in rsps.calls:
payload = json.loads(call.request.body)
assert payload == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "add",
"path": "members",
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"type": "User",
}
],
}
],
}
# Logger
assert not mock_info.called
assert mock_error.call_count == 2
for i, webhook in enumerate(webhooks):
assert mock_error.call_args_list[i][0] == (
"%s synchronization failed with %s",
"add_user_to_group",
webhook.url,
)
# Status
for webhook in webhooks:
webhook.refresh_from_db()
assert webhook.status == "failure"
@mock.patch.object(Logger, "error")
@mock.patch.object(Logger, "info")
def test_utils_webhooks_add_user_to_group_retries(mock_info, mock_error):
"""webhooks synchronization supports retries."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
webhook = factories.TeamWebhookFactory(team=access.team)
url = re.compile(r".*/Groups/.*")
with responses.RequestsMock() as rsps:
# Make webhook fail 3 times before succeeding using "responses"
all_rsps = [
rsps.add(rsps.PATCH, url, status=500, content_type="application/json"),
rsps.add(rsps.PATCH, url, status=500, content_type="application/json"),
rsps.add(rsps.PATCH, url, status=500, content_type="application/json"),
rsps.add(rsps.PATCH, url, status=200, content_type="application/json"),
]
scim_synchronizer.add_user_to_group(access.team, access.user)
for i in range(4):
assert all_rsps[i].call_count == 1
assert rsps.calls[i].request.url == webhook.url
payload = json.loads(rsps.calls[i].request.body)
assert payload == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "add",
"path": "members",
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"type": "User",
}
],
}
],
}
# Logger
assert not mock_error.called
assert mock_info.call_count == 1
assert mock_info.call_args_list[0][0] == (
"%s synchronization succeeded with %s",
"add_user_to_group",
webhook.url,
)
# Status
webhook.refresh_from_db()
assert webhook.status == "success"
@mock.patch.object(Logger, "error")
@mock.patch.object(Logger, "info")
def test_utils_synchronize_course_runs_max_retries_exceeded(mock_info, mock_error):
"""Webhooks synchronization has exceeded max retries and should get logged."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
webhook = factories.TeamWebhookFactory(team=access.team)
with responses.RequestsMock() as rsps:
# Simulate webhook temporary failure using "responses":
rsp = rsps.add(
rsps.PATCH,
re.compile(r".*/Groups/.*"),
body="{}",
status=random.choice([500, 502]),
content_type="application/json",
)
scim_synchronizer.add_user_to_group(access.team, access.user)
assert rsp.call_count == 5
assert rsps.calls[0].request.url == webhook.url
payload = json.loads(rsps.calls[0].request.body)
assert payload == {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "add",
"path": "members",
"value": [
{
"value": str(access.user.id),
"email": identity.email,
"type": "User",
}
],
}
],
}
# Logger
assert not mock_info.called
assert mock_error.call_count == 1
assert mock_error.call_args_list[0][0] == (
"%s synchronization failed due to max retries exceeded with url %s",
"add_user_to_group",
webhook.url,
)
# Status
webhook.refresh_from_db()
assert webhook.status == "failure"
def test_utils_webhooks_add_user_to_group_authorization():
"""Secret token should be passed in authorization header when set."""
identity = factories.IdentityFactory()
access = factories.TeamAccessFactory(user=identity.user)
webhook = factories.TeamWebhookFactory(team=access.team, secret="123")
with responses.RequestsMock() as rsps:
# Ensure successful response by scim provider using "responses":
rsps.add(
rsps.PATCH,
re.compile(r".*/Groups/.*"),
body="{}",
status=200,
content_type="application/json",
)
scim_synchronizer.add_user_to_group(access.team, access.user)
assert rsps.calls[0].request.url == webhook.url
# Check headers
headers = rsps.calls[0].request.headers
assert headers["Authorization"] == "Bearer 123"
assert headers["Content-Type"] == "application/json"
# Status
webhook.refresh_from_db()
assert webhook.status == "success"

View File

View File

@@ -0,0 +1,70 @@
"""A minimalist SCIM client to synchronize with remote service providers."""
import logging
import requests
from urllib3.util import Retry
logger = logging.getLogger(__name__)
adapter = requests.adapters.HTTPAdapter(
max_retries=Retry(
total=4,
backoff_factor=0.1,
status_forcelist=[500, 502],
allowed_methods=["PATCH"],
)
)
session = requests.Session()
session.mount("http://", adapter)
session.mount("https://", adapter)
class SCIMClient:
"""A minimalist SCIM client for our needs."""
def add_user_to_group(self, webhook, user):
"""Add a user to a group from its ID or email."""
payload = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "add",
"path": "members",
"value": [
{"value": str(user.id), "email": user.email, "type": "User"}
],
}
],
}
return session.patch(
webhook.url,
json=payload,
headers=webhook.get_headers(),
verify=False,
timeout=3,
)
def remove_user_from_group(self, webhook, user):
"""Remove a user from a group by its ID or email."""
payload = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"],
"Operations": [
{
"op": "remove",
"path": "members",
"value": [
{"value": str(user.id), "email": user.email, "type": "User"}
],
}
],
}
return session.patch(
webhook.url,
json=payload,
headers=webhook.get_headers(),
verify=False,
timeout=3,
)

View File

@@ -0,0 +1,75 @@
"""Fire webhooks with synchronous retries"""
import logging
import requests
from core.enums import WebhookStatusChoices
from .scim import SCIMClient
logger = logging.getLogger(__name__)
class WebhookSCIMClient:
"""Wraps the SCIM client to record call results on webhooks."""
def __getattr__(self, name):
"""Handle calls from webhooks to synchronize a team access with a distant application."""
def wrapper(team, user):
"""
Wrap SCIMClient calls to handle retries, error handling and storing result in the
calling Webhook instance.
"""
for webhook in team.webhooks.all():
if not webhook.url:
continue
client = SCIMClient()
status = WebhookStatusChoices.FAILURE
try:
response = getattr(client, name)(webhook, user)
except requests.exceptions.RetryError as exc:
logger.error(
"%s synchronization failed due to max retries exceeded with url %s",
name,
webhook.url,
exc_info=exc,
)
except requests.exceptions.RequestException as exc:
logger.error(
"%s synchronization failed with %s.",
name,
webhook.url,
exc_info=exc,
)
else:
extra = {
"response": response.content,
}
# pylint: disable=no-member
if response.status_code == requests.codes.ok:
logger.info(
"%s synchronization succeeded with %s",
name,
webhook.url,
extra=extra,
)
status = WebhookStatusChoices.SUCCESS
else:
logger.error(
"%s synchronization failed with %s",
name,
webhook.url,
extra=extra,
)
webhook._meta.model.objects.filter(id=webhook.id).update(status=status) # noqa
return wrapper
scim_synchronizer = WebhookSCIMClient()

View File

@@ -35,7 +35,7 @@ def test_commands_create_demo():
def test_commands_createsuperuser():
"""
The createsuperuser management command should create an use
The createsuperuser management command should create a user
with superuser permissions.
"""