(backend) search users

We need to search users by their email.
For that we will use the trigram similarity algorithm
provided by PostgreSQL. To use it we have to
activate the pg_trgm extension in postgres db.
To query the email we will use the query param
`q`.
We have another query param `document_id`, it is
necessary to exclude the users that have already
access to the document.
This commit is contained in:
Anthony LC
2024-05-29 14:48:12 +02:00
committed by Anthony LC
parent 2b456c231f
commit b4b308bda9
4 changed files with 105 additions and 13 deletions

View File

@@ -14,6 +14,7 @@ and this project adheres to
- Update document (#68)
- Remove document (#68)
- (docker) dockerize dev frontend (#63)
- (backend) list users with email filtering (#79)
## Changed

View File

@@ -111,8 +111,7 @@ class Pagination(pagination.PageNumberPagination):
class UserViewSet(
mixins.UpdateModelMixin,
viewsets.GenericViewSet,
mixins.UpdateModelMixin, viewsets.GenericViewSet, mixins.ListModelMixin
):
"""User ViewSet"""
@@ -120,6 +119,26 @@ class UserViewSet(
queryset = models.User.objects.all()
serializer_class = serializers.UserSerializer
def get_queryset(self):
"""
Limit listed users by querying the email field with a trigram similarity
search if a query is provided.
Limit listed users by excluding users already in the document if a document_id
is provided.
"""
queryset = self.queryset
if self.action == "list":
# Exclude all users already in the given document
if document_id := self.request.GET.get("document_id", ""):
queryset = queryset.exclude(documentaccess__document_id=document_id)
# Filter users by email similarity
if query := self.request.GET.get("q", ""):
queryset = queryset.filter(email__trigram_word_similar=query)
return queryset
@decorators.action(
detail=False,
methods=["get"],

View File

@@ -0,0 +1,14 @@
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('core', '0001_initial'),
]
operations = [
migrations.RunSQL(
"CREATE EXTENSION IF NOT EXISTS pg_trgm;",
reverse_sql="DROP EXTENSION IF EXISTS pg_trgm;",
),
]

View File

@@ -15,13 +15,15 @@ def test_api_users_list_anonymous():
factories.UserFactory()
client = APIClient()
response = client.get("/api/v1.0/users/")
assert response.status_code == 404
assert "Not Found" in response.content.decode("utf-8")
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
def test_api_users_list_authenticated():
"""
Authenticated users should not be able to list users.
Authenticated users should be able to list users.
"""
user = factories.UserFactory()
@@ -32,8 +34,62 @@ def test_api_users_list_authenticated():
response = client.get(
"/api/v1.0/users/",
)
assert response.status_code == 404
assert "Not Found" in response.content.decode("utf-8")
assert response.status_code == 200
content = response.json()
assert len(content["results"]) == 3
def test_api_users_list_query_email():
"""
Authenticated users should be able to list users
and filter by email.
"""
user = factories.UserFactory()
client = APIClient()
client.force_login(user)
dave = factories.UserFactory(email="david.bowman@work.com")
nicole = factories.UserFactory(email="nicole_foole@work.com")
frank = factories.UserFactory(email="frank_poole@work.com")
factories.UserFactory(email="heywood_floyd@work.com")
response = client.get(
"/api/v1.0/users/?q=david.bowman@work.com",
)
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids == [str(dave.id)]
response = client.get("/api/v1.0/users/?q=oole")
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids == [str(nicole.id), str(frank.id)]
def test_api_users_list_query_email_exclude_doc_user():
"""
Authenticated users should be able to list users
and filter by email and exclude users who have access to a document.
"""
user = factories.UserFactory()
document = factories.DocumentFactory()
client = APIClient()
client.force_login(user)
nicole = factories.UserFactory(email="nicole_foole@work.com")
frank = factories.UserFactory(email="frank_poole@work.com")
factories.UserFactory(email="heywood_floyd@work.com")
factories.UserDocumentAccessFactory(document=document, user=frank)
response = client.get("/api/v1.0/users/?q=oole&document_id=" + str(document.id))
assert response.status_code == 200
user_ids = [user["id"] for user in response.json()["results"]]
assert user_ids == [str(nicole.id)]
def test_api_users_retrieve_me_anonymous():
@@ -126,8 +182,10 @@ def test_api_users_create_anonymous():
"password": "mypassword",
},
)
assert response.status_code == 404
assert "Not Found" in response.content.decode("utf-8")
assert response.status_code == 401
assert response.json() == {
"detail": "Authentication credentials were not provided."
}
assert models.User.objects.exists() is False
@@ -146,8 +204,8 @@ def test_api_users_create_authenticated():
},
format="json",
)
assert response.status_code == 404
assert "Not Found" in response.content.decode("utf-8")
assert response.status_code == 405
assert response.json() == {"detail": 'Method "POST" not allowed.'}
assert models.User.objects.exclude(id=user.id).exists() is False
@@ -322,7 +380,7 @@ def test_api_users_delete_list_anonymous():
client = APIClient()
response = client.delete("/api/v1.0/users/")
assert response.status_code == 404
assert response.status_code == 401
assert models.User.objects.count() == 2
@@ -338,7 +396,7 @@ def test_api_users_delete_list_authenticated():
"/api/v1.0/users/",
)
assert response.status_code == 404
assert response.status_code == 405
assert models.User.objects.count() == 3