diff --git a/CHANGELOG.md b/CHANGELOG.md index f19af445..c83596c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ and this project adheres to - ✅(backend) reduce flakiness on backend test #1769 +### Security + +- 🔒️(backend) validate more strictly url used by cors-proxy endpoint #1768 + ## [4.3.0] - 2026-01-05 ### Added diff --git a/src/backend/core/api/viewsets.py b/src/backend/core/api/viewsets.py index e0501447..dd6d4f08 100644 --- a/src/backend/core/api/viewsets.py +++ b/src/backend/core/api/viewsets.py @@ -3,8 +3,10 @@ # pylint: disable=too-many-lines import base64 +import ipaddress import json import logging +import socket import uuid from collections import defaultdict from urllib.parse import unquote, urlencode, urlparse @@ -1655,6 +1657,102 @@ class DocumentViewSet( return drf.response.Response(response, status=drf.status.HTTP_200_OK) + def _reject_invalid_ips(self, ips): + """ + Check if an IP address is safe from SSRF attacks. + + Raises: + drf.exceptions.ValidationError: If the IP is unsafe + """ + for ip in ips: + # Block loopback addresses (check before private, + # as 127.0.0.1 might be considered private) + if ip.is_loopback: + raise drf.exceptions.ValidationError( + "Access to loopback addresses is not allowed" + ) + + # Block link-local addresses (169.254.0.0/16) - check before private + if ip.is_link_local: + raise drf.exceptions.ValidationError( + "Access to link-local addresses is not allowed" + ) + + # Block private IP ranges + if ip.is_private: + raise drf.exceptions.ValidationError( + "Access to private IP addresses is not allowed" + ) + + # Block multicast addresses + if ip.is_multicast: + raise drf.exceptions.ValidationError( + "Access to multicast addresses is not allowed" + ) + + # Block reserved addresses (including 0.0.0.0) + if ip.is_reserved: + raise drf.exceptions.ValidationError( + "Access to reserved IP addresses is not allowed" + ) + + def _validate_url_against_ssrf(self, url): + """ + Validate that a URL is safe from SSRF (Server-Side Request Forgery) attacks. + + Blocks: + - localhost and its variations + - Private IP ranges (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16) + - Link-local addresses (169.254.0.0/16) + - Loopback addresses + + Raises: + drf.exceptions.ValidationError: If the URL is unsafe + """ + parsed = urlparse(url) + hostname = parsed.hostname + + if not hostname: + raise drf.exceptions.ValidationError("Invalid hostname") + + + # Resolve hostname to IP address(es) + # Check all resolved IPs to prevent DNS rebinding attacks + try: + # Try to parse as IP address first (if hostname is already an IP) + try: + ip = ipaddress.ip_address(hostname) + resolved_ips = [ip] + except ValueError: + # Resolve hostname to IP addresses (supports both IPv4 and IPv6) + resolved_ips = [] + try: + # Get all address info (IPv4 and IPv6) + addr_info = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC) + for family, _, _, _, sockaddr in addr_info: + if family == socket.AF_INET: + # IPv4 + ip = ipaddress.ip_address(sockaddr[0]) + resolved_ips.append(ip) + elif family == socket.AF_INET6: + # IPv6 + ip = ipaddress.ip_address(sockaddr[0]) + resolved_ips.append(ip) + except (socket.gaierror, OSError) as e: + raise drf.exceptions.ValidationError( + f"Failed to resolve hostname: {str(e)}" + ) from e + + if not resolved_ips: + raise drf.exceptions.ValidationError( + "No IP addresses found for hostname" + ) from None + except ValueError as e: + raise drf.exceptions.ValidationError(f"Invalid IP address: {str(e)}") from e + + # Check all resolved IPs to ensure none are private/internal + self._reject_invalid_ips(resolved_ips) + @drf.decorators.action( detail=True, methods=["get"], @@ -1688,6 +1786,16 @@ class DocumentViewSet( status=drf.status.HTTP_400_BAD_REQUEST, ) + # Validate URL against SSRF attacks + try: + self._validate_url_against_ssrf(url) + except drf.exceptions.ValidationError as e: + logger.error("Potential SSRF attack detected: %s", e) + return drf.response.Response( + {"detail": "Invalid URL used."}, + status=drf.status.HTTP_400_BAD_REQUEST, + ) + try: response = requests.get( url, diff --git a/src/backend/core/tests/documents/test_api_documents_cors_proxy.py b/src/backend/core/tests/documents/test_api_documents_cors_proxy.py index a4abb981..eb212b44 100644 --- a/src/backend/core/tests/documents/test_api_documents_cors_proxy.py +++ b/src/backend/core/tests/documents/test_api_documents_cors_proxy.py @@ -1,5 +1,8 @@ """Test on the CORS proxy API for documents.""" +import socket +import unittest.mock + import pytest import responses from requests.exceptions import RequestException @@ -10,11 +13,17 @@ from core import factories pytestmark = pytest.mark.django_db +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") @responses.activate -def test_api_docs_cors_proxy_valid_url(): +def test_api_docs_cors_proxy_valid_url(mock_getaddrinfo): """Test the CORS proxy API for documents with a valid URL.""" document = factories.DocumentFactory(link_reach="public") + # Mock DNS resolution to return a public IP address + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0)) + ] + client = APIClient() url_to_fetch = "https://external-url.com/assets/logo-gouv.png" responses.get(url_to_fetch, body=b"", status=200, content_type="image/png") @@ -56,11 +65,17 @@ def test_api_docs_cors_proxy_without_url_query_string(): assert response.json() == {"detail": "Missing 'url' query parameter"} +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") @responses.activate -def test_api_docs_cors_proxy_anonymous_document_not_public(): +def test_api_docs_cors_proxy_anonymous_document_not_public(mock_getaddrinfo): """Test the CORS proxy API for documents with an anonymous user and a non-public document.""" document = factories.DocumentFactory(link_reach="authenticated") + # Mock DNS resolution to return a public IP address + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0)) + ] + client = APIClient() url_to_fetch = "https://external-url.com/assets/logo-gouv.png" responses.get(url_to_fetch, body=b"", status=200, content_type="image/png") @@ -73,14 +88,22 @@ def test_api_docs_cors_proxy_anonymous_document_not_public(): } +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") @responses.activate -def test_api_docs_cors_proxy_authenticated_user_accessing_protected_doc(): +def test_api_docs_cors_proxy_authenticated_user_accessing_protected_doc( + mock_getaddrinfo, +): """ Test the CORS proxy API for documents with an authenticated user accessing a protected document. """ document = factories.DocumentFactory(link_reach="authenticated") + # Mock DNS resolution to return a public IP address + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0)) + ] + user = factories.UserFactory() client = APIClient() @@ -115,14 +138,22 @@ def test_api_docs_cors_proxy_authenticated_user_accessing_protected_doc(): assert response.streaming_content +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") @responses.activate -def test_api_docs_cors_proxy_authenticated_not_accessing_restricted_doc(): +def test_api_docs_cors_proxy_authenticated_not_accessing_restricted_doc( + mock_getaddrinfo, +): """ Test the CORS proxy API for documents with an authenticated user not accessing a restricted document. """ document = factories.DocumentFactory(link_reach="restricted") + # Mock DNS resolution to return a public IP address + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0)) + ] + user = factories.UserFactory() client = APIClient() @@ -138,11 +169,17 @@ def test_api_docs_cors_proxy_authenticated_not_accessing_restricted_doc(): } +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") @responses.activate -def test_api_docs_cors_proxy_unsupported_media_type(): +def test_api_docs_cors_proxy_unsupported_media_type(mock_getaddrinfo): """Test the CORS proxy API for documents with an unsupported media type.""" document = factories.DocumentFactory(link_reach="public") + # Mock DNS resolution to return a public IP address + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0)) + ] + client = APIClient() url_to_fetch = "https://external-url.com/assets/index.html" responses.get(url_to_fetch, body=b"", status=200, content_type="text/html") @@ -173,11 +210,17 @@ def test_api_docs_cors_proxy_invalid_url(url_to_fetch): assert response.json() == ["Enter a valid URL."] +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") @responses.activate -def test_api_docs_cors_proxy_request_failed(): +def test_api_docs_cors_proxy_request_failed(mock_getaddrinfo): """Test the CORS proxy API for documents with a request failed.""" document = factories.DocumentFactory(link_reach="public") + # Mock DNS resolution to return a public IP address + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0)) + ] + client = APIClient() url_to_fetch = "https://external-url.com/assets/index.html" responses.get(url_to_fetch, body=RequestException("Connection refused")) @@ -188,3 +231,163 @@ def test_api_docs_cors_proxy_request_failed(): assert response.json() == { "error": "Failed to fetch resource from https://external-url.com/assets/index.html" } + + +@pytest.mark.parametrize( + "url_to_fetch", + [ + "http://localhost/image.png", + "https://localhost/image.png", + "http://127.0.0.1/image.png", + "https://127.0.0.1/image.png", + "http://0.0.0.0/image.png", + "https://0.0.0.0/image.png", + "http://[::1]/image.png", + "https://[::1]/image.png", + "http://[0:0:0:0:0:0:0:1]/image.png", + "https://[0:0:0:0:0:0:0:1]/image.png", + ], +) +def test_api_docs_cors_proxy_blocks_localhost(url_to_fetch): + """Test that the CORS proxy API blocks localhost variations.""" + document = factories.DocumentFactory(link_reach="public") + + client = APIClient() + response = client.get( + f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}" + ) + assert response.status_code == 400 + assert response.json()["detail"] == "Invalid URL used." + + +@pytest.mark.parametrize( + "url_to_fetch", + [ + "http://10.0.0.1/image.png", + "https://10.0.0.1/image.png", + "http://172.16.0.1/image.png", + "https://172.16.0.1/image.png", + "http://192.168.1.1/image.png", + "https://192.168.1.1/image.png", + "http://10.255.255.255/image.png", + "https://10.255.255.255/image.png", + "http://172.31.255.255/image.png", + "https://172.31.255.255/image.png", + "http://192.168.255.255/image.png", + "https://192.168.255.255/image.png", + ], +) +def test_api_docs_cors_proxy_blocks_private_ips(url_to_fetch): + """Test that the CORS proxy API blocks private IP addresses.""" + document = factories.DocumentFactory(link_reach="public") + + client = APIClient() + response = client.get( + f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}" + ) + assert response.status_code == 400 + assert response.json()["detail"] == "Invalid URL used." + + +@pytest.mark.parametrize( + "url_to_fetch", + [ + "http://169.254.1.1/image.png", + "https://169.254.1.1/image.png", + "http://169.254.255.255/image.png", + "https://169.254.255.255/image.png", + ], +) +def test_api_docs_cors_proxy_blocks_link_local(url_to_fetch): + """Test that the CORS proxy API blocks link-local addresses.""" + document = factories.DocumentFactory(link_reach="public") + + client = APIClient() + response = client.get( + f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}" + ) + assert response.status_code == 400 + assert response.json()["detail"] == "Invalid URL used." + + +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") +@responses.activate +def test_api_docs_cors_proxy_blocks_dns_rebinding_to_private_ip(mock_getaddrinfo): + """Test that the CORS proxy API blocks DNS rebinding attacks to private IPs.""" + document = factories.DocumentFactory(link_reach="public") + + # Mock DNS resolution to return a private IP address + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("192.168.1.1", 0)) + ] + + client = APIClient() + url_to_fetch = "https://malicious-domain.com/image.png" + response = client.get( + f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}" + ) + assert response.status_code == 400 + assert response.json()["detail"] == "Invalid URL used." + mock_getaddrinfo.assert_called_once() + + +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") +@responses.activate +def test_api_docs_cors_proxy_blocks_dns_rebinding_to_localhost(mock_getaddrinfo): + """Test that the CORS proxy API blocks DNS rebinding attacks to localhost.""" + document = factories.DocumentFactory(link_reach="public") + + # Mock DNS resolution to return localhost + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", 0)) + ] + + client = APIClient() + url_to_fetch = "https://malicious-domain.com/image.png" + response = client.get( + f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}" + ) + assert response.status_code == 400 + assert response.json()["detail"] == "Invalid URL used." + mock_getaddrinfo.assert_called_once() + + +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") +def test_api_docs_cors_proxy_handles_dns_resolution_failure(mock_getaddrinfo): + """Test that the CORS proxy API handles DNS resolution failures gracefully.""" + document = factories.DocumentFactory(link_reach="public") + + # Mock DNS resolution to fail + mock_getaddrinfo.side_effect = socket.gaierror("Name or service not known") + + client = APIClient() + url_to_fetch = "https://nonexistent-domain-12345.com/image.png" + response = client.get( + f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}" + ) + assert response.status_code == 400 + assert response.json()["detail"] == "Invalid URL used." + mock_getaddrinfo.assert_called_once() + + +@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo") +def test_api_docs_cors_proxy_blocks_multiple_resolved_ips_if_any_private( + mock_getaddrinfo, +): + """Test that the CORS proxy API blocks if any resolved IP is private.""" + document = factories.DocumentFactory(link_reach="public") + + # Mock DNS resolution to return both public and private IPs + mock_getaddrinfo.return_value = [ + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0)), + (socket.AF_INET, socket.SOCK_STREAM, 0, "", ("192.168.1.1", 0)), + ] + + client = APIClient() + url_to_fetch = "https://example.com/image.png" + response = client.get( + f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}" + ) + assert response.status_code == 400 + assert response.json()["detail"] == "Invalid URL used." + mock_getaddrinfo.assert_called_once()