🔒️(backend) validate more strictly url used by cors-proxy endpoint

The cors-proxy endpoint allow to download images host externally without
being blocked by cors headers. The response is filter on the return
content-type to avoid disclosure and the usage of this endpoint as the
proxy used by attacker. We want to restrict the usage of this endpoint
by filtering on non legit ips used. This filter avoid exploitation of
Server Side Request Forgery (SSRF).
This commit is contained in:
Manuel Raynaud
2025-12-09 16:54:59 +01:00
parent dd2d2862be
commit f28da7c2c2
3 changed files with 321 additions and 6 deletions

View File

@@ -1,5 +1,8 @@
"""Test on the CORS proxy API for documents."""
import socket
import unittest.mock
import pytest
import responses
from requests.exceptions import RequestException
@@ -10,11 +13,17 @@ from core import factories
pytestmark = pytest.mark.django_db
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
@responses.activate
def test_api_docs_cors_proxy_valid_url():
def test_api_docs_cors_proxy_valid_url(mock_getaddrinfo):
"""Test the CORS proxy API for documents with a valid URL."""
document = factories.DocumentFactory(link_reach="public")
# Mock DNS resolution to return a public IP address
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0))
]
client = APIClient()
url_to_fetch = "https://external-url.com/assets/logo-gouv.png"
responses.get(url_to_fetch, body=b"", status=200, content_type="image/png")
@@ -56,11 +65,17 @@ def test_api_docs_cors_proxy_without_url_query_string():
assert response.json() == {"detail": "Missing 'url' query parameter"}
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
@responses.activate
def test_api_docs_cors_proxy_anonymous_document_not_public():
def test_api_docs_cors_proxy_anonymous_document_not_public(mock_getaddrinfo):
"""Test the CORS proxy API for documents with an anonymous user and a non-public document."""
document = factories.DocumentFactory(link_reach="authenticated")
# Mock DNS resolution to return a public IP address
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0))
]
client = APIClient()
url_to_fetch = "https://external-url.com/assets/logo-gouv.png"
responses.get(url_to_fetch, body=b"", status=200, content_type="image/png")
@@ -73,14 +88,22 @@ def test_api_docs_cors_proxy_anonymous_document_not_public():
}
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
@responses.activate
def test_api_docs_cors_proxy_authenticated_user_accessing_protected_doc():
def test_api_docs_cors_proxy_authenticated_user_accessing_protected_doc(
mock_getaddrinfo,
):
"""
Test the CORS proxy API for documents with an authenticated user accessing a protected
document.
"""
document = factories.DocumentFactory(link_reach="authenticated")
# Mock DNS resolution to return a public IP address
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0))
]
user = factories.UserFactory()
client = APIClient()
@@ -115,14 +138,22 @@ def test_api_docs_cors_proxy_authenticated_user_accessing_protected_doc():
assert response.streaming_content
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
@responses.activate
def test_api_docs_cors_proxy_authenticated_not_accessing_restricted_doc():
def test_api_docs_cors_proxy_authenticated_not_accessing_restricted_doc(
mock_getaddrinfo,
):
"""
Test the CORS proxy API for documents with an authenticated user not accessing a restricted
document.
"""
document = factories.DocumentFactory(link_reach="restricted")
# Mock DNS resolution to return a public IP address
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0))
]
user = factories.UserFactory()
client = APIClient()
@@ -138,11 +169,17 @@ def test_api_docs_cors_proxy_authenticated_not_accessing_restricted_doc():
}
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
@responses.activate
def test_api_docs_cors_proxy_unsupported_media_type():
def test_api_docs_cors_proxy_unsupported_media_type(mock_getaddrinfo):
"""Test the CORS proxy API for documents with an unsupported media type."""
document = factories.DocumentFactory(link_reach="public")
# Mock DNS resolution to return a public IP address
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0))
]
client = APIClient()
url_to_fetch = "https://external-url.com/assets/index.html"
responses.get(url_to_fetch, body=b"", status=200, content_type="text/html")
@@ -173,11 +210,17 @@ def test_api_docs_cors_proxy_invalid_url(url_to_fetch):
assert response.json() == ["Enter a valid URL."]
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
@responses.activate
def test_api_docs_cors_proxy_request_failed():
def test_api_docs_cors_proxy_request_failed(mock_getaddrinfo):
"""Test the CORS proxy API for documents with a request failed."""
document = factories.DocumentFactory(link_reach="public")
# Mock DNS resolution to return a public IP address
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0))
]
client = APIClient()
url_to_fetch = "https://external-url.com/assets/index.html"
responses.get(url_to_fetch, body=RequestException("Connection refused"))
@@ -188,3 +231,163 @@ def test_api_docs_cors_proxy_request_failed():
assert response.json() == {
"error": "Failed to fetch resource from https://external-url.com/assets/index.html"
}
@pytest.mark.parametrize(
"url_to_fetch",
[
"http://localhost/image.png",
"https://localhost/image.png",
"http://127.0.0.1/image.png",
"https://127.0.0.1/image.png",
"http://0.0.0.0/image.png",
"https://0.0.0.0/image.png",
"http://[::1]/image.png",
"https://[::1]/image.png",
"http://[0:0:0:0:0:0:0:1]/image.png",
"https://[0:0:0:0:0:0:0:1]/image.png",
],
)
def test_api_docs_cors_proxy_blocks_localhost(url_to_fetch):
"""Test that the CORS proxy API blocks localhost variations."""
document = factories.DocumentFactory(link_reach="public")
client = APIClient()
response = client.get(
f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}"
)
assert response.status_code == 400
assert response.json()["detail"] == "Invalid URL used."
@pytest.mark.parametrize(
"url_to_fetch",
[
"http://10.0.0.1/image.png",
"https://10.0.0.1/image.png",
"http://172.16.0.1/image.png",
"https://172.16.0.1/image.png",
"http://192.168.1.1/image.png",
"https://192.168.1.1/image.png",
"http://10.255.255.255/image.png",
"https://10.255.255.255/image.png",
"http://172.31.255.255/image.png",
"https://172.31.255.255/image.png",
"http://192.168.255.255/image.png",
"https://192.168.255.255/image.png",
],
)
def test_api_docs_cors_proxy_blocks_private_ips(url_to_fetch):
"""Test that the CORS proxy API blocks private IP addresses."""
document = factories.DocumentFactory(link_reach="public")
client = APIClient()
response = client.get(
f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}"
)
assert response.status_code == 400
assert response.json()["detail"] == "Invalid URL used."
@pytest.mark.parametrize(
"url_to_fetch",
[
"http://169.254.1.1/image.png",
"https://169.254.1.1/image.png",
"http://169.254.255.255/image.png",
"https://169.254.255.255/image.png",
],
)
def test_api_docs_cors_proxy_blocks_link_local(url_to_fetch):
"""Test that the CORS proxy API blocks link-local addresses."""
document = factories.DocumentFactory(link_reach="public")
client = APIClient()
response = client.get(
f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}"
)
assert response.status_code == 400
assert response.json()["detail"] == "Invalid URL used."
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
@responses.activate
def test_api_docs_cors_proxy_blocks_dns_rebinding_to_private_ip(mock_getaddrinfo):
"""Test that the CORS proxy API blocks DNS rebinding attacks to private IPs."""
document = factories.DocumentFactory(link_reach="public")
# Mock DNS resolution to return a private IP address
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("192.168.1.1", 0))
]
client = APIClient()
url_to_fetch = "https://malicious-domain.com/image.png"
response = client.get(
f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}"
)
assert response.status_code == 400
assert response.json()["detail"] == "Invalid URL used."
mock_getaddrinfo.assert_called_once()
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
@responses.activate
def test_api_docs_cors_proxy_blocks_dns_rebinding_to_localhost(mock_getaddrinfo):
"""Test that the CORS proxy API blocks DNS rebinding attacks to localhost."""
document = factories.DocumentFactory(link_reach="public")
# Mock DNS resolution to return localhost
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", 0))
]
client = APIClient()
url_to_fetch = "https://malicious-domain.com/image.png"
response = client.get(
f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}"
)
assert response.status_code == 400
assert response.json()["detail"] == "Invalid URL used."
mock_getaddrinfo.assert_called_once()
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
def test_api_docs_cors_proxy_handles_dns_resolution_failure(mock_getaddrinfo):
"""Test that the CORS proxy API handles DNS resolution failures gracefully."""
document = factories.DocumentFactory(link_reach="public")
# Mock DNS resolution to fail
mock_getaddrinfo.side_effect = socket.gaierror("Name or service not known")
client = APIClient()
url_to_fetch = "https://nonexistent-domain-12345.com/image.png"
response = client.get(
f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}"
)
assert response.status_code == 400
assert response.json()["detail"] == "Invalid URL used."
mock_getaddrinfo.assert_called_once()
@unittest.mock.patch("core.api.viewsets.socket.getaddrinfo")
def test_api_docs_cors_proxy_blocks_multiple_resolved_ips_if_any_private(
mock_getaddrinfo,
):
"""Test that the CORS proxy API blocks if any resolved IP is private."""
document = factories.DocumentFactory(link_reach="public")
# Mock DNS resolution to return both public and private IPs
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("8.8.8.8", 0)),
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("192.168.1.1", 0)),
]
client = APIClient()
url_to_fetch = "https://example.com/image.png"
response = client.get(
f"/api/v1.0/documents/{document.id!s}/cors-proxy/?url={url_to_fetch}"
)
assert response.status_code == 400
assert response.json()["detail"] == "Invalid URL used."
mock_getaddrinfo.assert_called_once()