Files
sbbb/src/backend/core/services/caldav_service.py

629 lines
22 KiB
Python
Raw Normal View History

"""Services for CalDAV integration."""
import logging
import re
from datetime import date, datetime, timedelta
from typing import Optional
from urllib.parse import unquote
from uuid import uuid4
from django.conf import settings
from django.utils import timezone
import icalendar
import requests
import caldav as caldav_lib
from caldav import DAVClient
from caldav.elements.cdav import CalendarDescription
from caldav.elements.dav import DisplayName
from caldav.elements.ical import CalendarColor
from caldav.lib.error import NotFoundError
logger = logging.getLogger(__name__)
class CalDAVHTTPClient:
"""Low-level HTTP client for CalDAV server communication.
Centralizes header building, URL construction, API key validation,
and HTTP requests. All higher-level CalDAV consumers delegate to this.
"""
BASE_URI_PATH = "/api/v1.0/caldav"
DEFAULT_TIMEOUT = 30
def __init__(self):
self.base_url = settings.CALDAV_URL.rstrip("/")
@staticmethod
def get_api_key() -> str:
"""Return the outbound API key, raising ValueError if not configured."""
key = settings.CALDAV_OUTBOUND_API_KEY
if not key:
raise ValueError("CALDAV_OUTBOUND_API_KEY is not configured")
return key
@classmethod
def build_base_headers(cls, email: str) -> dict:
"""Build authentication headers for CalDAV requests."""
return {
"X-Api-Key": cls.get_api_key(),
"X-Forwarded-User": email,
}
def build_url(self, path: str, query: str = "") -> str:
"""Build a full CalDAV URL from a resource path.
Handles paths with or without the /api/v1.0/caldav prefix.
"""
# If the path already includes the base URI prefix, use it directly
if path.startswith(self.BASE_URI_PATH):
url = f"{self.base_url}{path}"
else:
clean_path = path.lstrip("/")
url = f"{self.base_url}{self.BASE_URI_PATH}/{clean_path}"
if query:
url = f"{url}?{query}"
return url
def request( # noqa: PLR0913
self,
method: str,
email: str,
path: str,
*,
query: str = "",
data=None,
extra_headers: dict | None = None,
timeout: int | None = None,
content_type: str | None = None,
) -> requests.Response:
"""Make an authenticated HTTP request to the CalDAV server."""
headers = self.build_base_headers(email)
if content_type:
headers["Content-Type"] = content_type
if extra_headers:
headers.update(extra_headers)
url = self.build_url(path, query)
return requests.request(
method=method,
url=url,
headers=headers,
data=data,
timeout=timeout or self.DEFAULT_TIMEOUT,
)
def get_dav_client(self, email: str) -> DAVClient:
"""Return a configured caldav.DAVClient for the given user email."""
headers = self.build_base_headers(email)
caldav_url = f"{self.base_url}{self.BASE_URI_PATH}/"
return DAVClient(
url=caldav_url,
username=None,
password=None,
timeout=self.DEFAULT_TIMEOUT,
headers=headers,
)
def find_event_by_uid(self, email: str, uid: str) -> tuple[str | None, str | None]:
"""Find an event by UID across all of the user's calendars.
Returns (ical_data, href) or (None, None).
"""
client = self.get_dav_client(email)
try:
principal = client.principal()
for cal in principal.calendars():
try:
event = cal.object_by_uid(uid)
return event.data, str(event.url.path)
except caldav_lib.error.NotFoundError:
continue
logger.warning("Event UID %s not found in user %s calendars", uid, email)
return None, None
except Exception:
logger.exception("CalDAV error looking up event %s", uid)
return None, None
def put_event(self, email: str, href: str, ical_data: str) -> bool:
"""PUT updated iCalendar data back to CalDAV. Returns True on success."""
try:
response = self.request(
"PUT",
email,
href,
data=ical_data.encode("utf-8"),
content_type="text/calendar; charset=utf-8",
)
if response.status_code in (200, 201, 204):
return True
logger.error(
"CalDAV PUT failed: %s %s",
response.status_code,
response.text[:500],
)
return False
except requests.exceptions.RequestException:
logger.exception("CalDAV PUT error for %s", href)
return False
@staticmethod
def update_attendee_partstat(
ical_data: str, email: str, new_partstat: str
) -> str | None:
"""Update the PARTSTAT of an attendee in iCalendar data.
Returns the modified iCalendar string, or None if attendee not found.
"""
cal = icalendar.Calendar.from_ical(ical_data)
updated = False
for component in cal.walk("VEVENT"):
for _name, attendee in component.property_items("ATTENDEE"):
attendee_val = str(attendee).lower()
if email.lower() in attendee_val:
attendee.params["PARTSTAT"] = icalendar.vText(new_partstat)
updated = True
if not updated:
return None
return cal.to_ical().decode("utf-8")
class CalDAVClient:
"""
Client for communicating with CalDAV server using the caldav library.
"""
def __init__(self):
self._http = CalDAVHTTPClient()
self.base_url = self._http.base_url
def _get_client(self, user) -> DAVClient:
"""
Get a CalDAV client for the given user.
The CalDAV server requires API key authentication via Authorization header
and X-Forwarded-User header for user identification.
"""
return self._http.get_dav_client(user.email)
def get_calendar_info(self, user, calendar_path: str) -> dict | None:
"""
Get calendar information from CalDAV server.
Returns dict with name, color, description or None if not found.
"""
client = self._get_client(user)
calendar_url = f"{self.base_url}{calendar_path}"
try:
calendar = client.calendar(url=calendar_url)
# Fetch properties
props = calendar.get_properties(
[DisplayName(), CalendarColor(), CalendarDescription()]
)
name = props.get(DisplayName.tag, "Calendar")
color = props.get(CalendarColor.tag, settings.DEFAULT_CALENDAR_COLOR)
description = props.get(CalendarDescription.tag, "")
# Clean up color (CalDAV may return with alpha channel like #RRGGBBAA)
if color and len(color) == 9 and color.startswith("#"):
color = color[:7]
logger.info("Got calendar info from CalDAV: name=%s, color=%s", name, color)
return {
"name": name,
"color": color,
"description": description,
}
except NotFoundError:
logger.warning("Calendar not found at path: %s", calendar_path)
return None
except Exception as e: # noqa: BLE001 # pylint: disable=broad-exception-caught
logger.error("Failed to get calendar info from CalDAV: %s", str(e))
return None
def create_calendar(
self, user, calendar_name: str, calendar_id: str, color: str = ""
) -> str:
"""
Create a new calendar in CalDAV server for the given user.
Returns the CalDAV server path for the calendar.
"""
client = self._get_client(user)
principal = client.principal()
try:
# Create calendar using caldav library
calendar = principal.make_calendar(name=calendar_name)
# Set calendar color if provided
if color:
calendar.set_properties([CalendarColor(color)])
# CalDAV server calendar path format: /calendars/{username}/{calendar_id}/
# The caldav library returns a URL object, convert to string and extract path
calendar_url = str(calendar.url)
# Extract path from full URL
if calendar_url.startswith(self.base_url):
path = calendar_url[len(self.base_url) :]
else:
# Fallback: construct path manually based on standard CalDAV structure
# CalDAV servers typically create calendars under /calendars/{principal}/
path = f"/calendars/{user.email}/{calendar_id}/"
logger.info(
"Created calendar in CalDAV server: %s at %s", calendar_name, path
)
return path
except Exception as e:
logger.error("Failed to create calendar in CalDAV server: %s", str(e))
raise
def get_events(
self,
user,
calendar_path: str,
start: Optional[datetime] = None,
end: Optional[datetime] = None,
) -> list:
"""
Get events from a calendar within a time range.
Returns list of event dictionaries with parsed data.
"""
# Default to current month if no range specified
if start is None:
start = timezone.now().replace(day=1, hour=0, minute=0, second=0)
if end is None:
end = start + timedelta(days=31)
client = self._get_client(user)
# Get calendar by URL
calendar_url = f"{self.base_url}{calendar_path}"
calendar = client.calendar(url=calendar_url)
try:
# Search for events in the date range
# Convert datetime to date for search if needed
start_date = start.date() if isinstance(start, datetime) else start
end_date = end.date() if isinstance(end, datetime) else end
events = calendar.search(
event=True,
start=start_date,
end=end_date,
expand=True, # Expand recurring events
)
# Parse events into dictionaries
parsed_events = []
for event in events:
event_data = self._parse_event(event)
if event_data:
parsed_events.append(event_data)
return parsed_events
except NotFoundError:
logger.warning("Calendar not found at path: %s", calendar_path)
return []
except Exception as e:
logger.error("Failed to get events from CalDAV server: %s", str(e))
raise
def create_event_raw(self, user, calendar_path: str, ics_data: str) -> str:
"""
Create an event in CalDAV server from raw ICS data.
The ics_data should be a complete VCALENDAR string.
Returns the event UID.
"""
client = self._get_client(user)
calendar_url = f"{self.base_url}{calendar_path}"
calendar = client.calendar(url=calendar_url)
try:
event = calendar.save_event(ics_data)
event_uid = str(event.icalendar_component.get("uid", ""))
logger.info("Created event in CalDAV server: %s", event_uid)
return event_uid
except Exception as e:
logger.error("Failed to create event in CalDAV server: %s", str(e))
raise
def create_event(self, user, calendar_path: str, event_data: dict) -> str:
"""
Create a new event in CalDAV server.
Returns the event UID.
"""
client = self._get_client(user)
calendar_url = f"{self.base_url}{calendar_path}"
calendar = client.calendar(url=calendar_url)
# Extract event data
dtstart = event_data.get("start", timezone.now())
dtend = event_data.get("end", dtstart + timedelta(hours=1))
summary = event_data.get("title", "New Event")
description = event_data.get("description", "")
location = event_data.get("location", "")
# Generate UID if not provided
event_uid = event_data.get("uid", str(uuid4()))
try:
# Create event using caldav library
event = calendar.save_event(
dtstart=dtstart,
dtend=dtend,
uid=event_uid,
summary=summary,
description=description,
location=location,
)
# Extract UID from created event
# The caldav library returns an Event object
if hasattr(event, "icalendar_component"):
event_uid = str(event.icalendar_component.get("uid", event_uid))
elif hasattr(event, "vobject_instance"):
event_uid = event.vobject_instance.vevent.uid.value
logger.info("Created event in CalDAV server: %s", event_uid)
return event_uid
except Exception as e:
logger.error("Failed to create event in CalDAV server: %s", str(e))
raise
def update_event(
self, user, calendar_path: str, event_uid: str, event_data: dict
) -> None:
"""Update an existing event in CalDAV server."""
client = self._get_client(user)
calendar_url = f"{self.base_url}{calendar_path}"
calendar = client.calendar(url=calendar_url)
try:
# Search for the event by UID
events = calendar.search(event=True)
target_event = None
for event in events:
event_uid_value = None
if hasattr(event, "icalendar_component"):
event_uid_value = str(event.icalendar_component.get("uid", ""))
elif hasattr(event, "vobject_instance"):
event_uid_value = event.vobject_instance.vevent.uid.value
if event_uid_value == event_uid:
target_event = event
break
if not target_event:
raise ValueError(f"Event with UID {event_uid} not found")
# Update event properties
dtstart = event_data.get("start")
dtend = event_data.get("end")
summary = event_data.get("title")
description = event_data.get("description")
location = event_data.get("location")
# Update using icalendar component
component = target_event.icalendar_component
if dtstart:
component["dtstart"] = dtstart
if dtend:
component["dtend"] = dtend
if summary:
component["summary"] = summary
if description is not None:
component["description"] = description
if location is not None:
component["location"] = location
# Save the updated event
target_event.save()
logger.info("Updated event in CalDAV server: %s", event_uid)
except Exception as e:
logger.error("Failed to update event in CalDAV server: %s", str(e))
raise
def delete_event(self, user, calendar_path: str, event_uid: str) -> None:
"""Delete an event from CalDAV server."""
client = self._get_client(user)
calendar_url = f"{self.base_url}{calendar_path}"
calendar = client.calendar(url=calendar_url)
try:
# Search for the event by UID
events = calendar.search(event=True)
target_event = None
for event in events:
event_uid_value = None
if hasattr(event, "icalendar_component"):
event_uid_value = str(event.icalendar_component.get("uid", ""))
elif hasattr(event, "vobject_instance"):
event_uid_value = event.vobject_instance.vevent.uid.value
if event_uid_value == event_uid:
target_event = event
break
if not target_event:
raise ValueError(f"Event with UID {event_uid} not found")
# Delete the event
target_event.delete()
logger.info("Deleted event from CalDAV server: %s", event_uid)
except Exception as e:
logger.error("Failed to delete event from CalDAV server: %s", str(e))
raise
def _parse_event(self, event) -> Optional[dict]:
"""
Parse a caldav Event object and return event data as dictionary.
"""
try:
component = event.icalendar_component
event_data = {
"uid": str(component.get("uid", "")),
"title": str(component.get("summary", "")),
"start": component.get("dtstart").dt
if component.get("dtstart")
else None,
"end": component.get("dtend").dt if component.get("dtend") else None,
"description": str(component.get("description", "")),
"location": str(component.get("location", "")),
}
# Convert datetime to string format for consistency
if event_data["start"]:
if isinstance(event_data["start"], datetime):
event_data["start"] = event_data["start"].strftime("%Y%m%dT%H%M%SZ")
elif isinstance(event_data["start"], date):
event_data["start"] = event_data["start"].strftime("%Y%m%d")
if event_data["end"]:
if isinstance(event_data["end"], datetime):
event_data["end"] = event_data["end"].strftime("%Y%m%dT%H%M%SZ")
elif isinstance(event_data["end"], date):
event_data["end"] = event_data["end"].strftime("%Y%m%d")
return event_data if event_data.get("uid") else None
except Exception as e: # noqa: BLE001 # pylint: disable=broad-exception-caught
logger.warning("Failed to parse event: %s", str(e))
return None
class CalendarService:
"""
High-level service for managing calendars and events.
"""
def __init__(self):
self.caldav = CalDAVClient()
def create_default_calendar(self, user) -> str:
"""Create a default calendar for a user. Returns the caldav_path."""
from core.services.translation_service import TranslationService # noqa: PLC0415
calendar_id = str(uuid4())
lang = TranslationService.resolve_language(email=user.email)
calendar_name = TranslationService.t(
"calendar.list.defaultCalendarName", lang
)
return self.caldav.create_calendar(
user, calendar_name, calendar_id, color=settings.DEFAULT_CALENDAR_COLOR
)
def create_calendar(
self, user, name: str, color: str = ""
) -> str:
"""Create a new calendar for a user. Returns the caldav_path."""
calendar_id = str(uuid4())
return self.caldav.create_calendar(
user, name, calendar_id, color=color or settings.DEFAULT_CALENDAR_COLOR
)
def get_events(self, user, caldav_path: str, start=None, end=None) -> list:
"""Get events from a calendar. Returns parsed event data."""
return self.caldav.get_events(user, caldav_path, start, end)
def create_event(self, user, caldav_path: str, event_data: dict) -> str:
"""Create a new event."""
return self.caldav.create_event(user, caldav_path, event_data)
def update_event(
self, user, caldav_path: str, event_uid: str, event_data: dict
) -> None:
"""Update an existing event."""
self.caldav.update_event(user, caldav_path, event_uid, event_data)
def delete_event(self, user, caldav_path: str, event_uid: str) -> None:
"""Delete an event."""
self.caldav.delete_event(user, caldav_path, event_uid)
# ---------------------------------------------------------------------------
# CalDAV path utilities
# ---------------------------------------------------------------------------
# Pattern: /calendars/<email-or-encoded>/<calendar-id>/
CALDAV_PATH_PATTERN = re.compile(
r"^/calendars/[^/]+/[a-zA-Z0-9-]+/$",
)
def normalize_caldav_path(caldav_path):
"""Normalize CalDAV path to consistent format.
Strips the CalDAV API prefix (e.g. /api/v1.0/caldav/) if present,
so that paths like /api/v1.0/caldav/calendars/user@ex.com/uuid/
become /calendars/user@ex.com/uuid/.
"""
if not caldav_path.startswith("/"):
caldav_path = "/" + caldav_path
# Strip CalDAV API prefix — keep from /calendars/ onwards
calendars_idx = caldav_path.find("/calendars/")
if calendars_idx > 0:
caldav_path = caldav_path[calendars_idx:]
if not caldav_path.endswith("/"):
caldav_path = caldav_path + "/"
return caldav_path
def verify_caldav_access(user, caldav_path):
"""Verify that the user has access to the CalDAV calendar.
Checks that:
1. The path matches the expected pattern (prevents path injection)
2. The user's email matches the email in the path
"""
if not CALDAV_PATH_PATTERN.match(caldav_path):
return False
parts = caldav_path.strip("/").split("/")
if len(parts) >= 2 and parts[0] == "calendars":
path_email = unquote(parts[1])
return path_email.lower() == user.email.lower()
return False
def validate_caldav_proxy_path(path):
"""Validate that a CalDAV proxy path is safe.
Prevents path traversal attacks by rejecting paths with:
- Directory traversal sequences (../)
- Null bytes
- Paths that don't start with expected prefixes
"""
if not path:
return True # Empty path is fine (root request)
# Block directory traversal
if ".." in path:
return False
# Block null bytes
if "\x00" in path:
return False
# Path must start with a known CalDAV resource prefix
allowed_prefixes = ("calendars/", "principals/", ".well-known/")
clean = path.lstrip("/")
if clean and not any(clean.startswith(prefix) for prefix in allowed_prefixes):
return False
return True