"""Services for CalDAV integration.""" import logging import re from datetime import date, datetime, timedelta from typing import Optional from urllib.parse import unquote from uuid import uuid4 from django.conf import settings from django.utils import timezone import icalendar import requests import caldav as caldav_lib from caldav import DAVClient from caldav.elements.cdav import CalendarDescription from caldav.elements.dav import DisplayName from caldav.elements.ical import CalendarColor from caldav.lib.error import NotFoundError logger = logging.getLogger(__name__) class CalDAVHTTPClient: """Low-level HTTP client for CalDAV server communication. Centralizes header building, URL construction, API key validation, and HTTP requests. All higher-level CalDAV consumers delegate to this. """ BASE_URI_PATH = "/api/v1.0/caldav" DEFAULT_TIMEOUT = 30 def __init__(self): self.base_url = settings.CALDAV_URL.rstrip("/") @staticmethod def get_api_key() -> str: """Return the outbound API key, raising ValueError if not configured.""" key = settings.CALDAV_OUTBOUND_API_KEY if not key: raise ValueError("CALDAV_OUTBOUND_API_KEY is not configured") return key @classmethod def build_base_headers(cls, email: str) -> dict: """Build authentication headers for CalDAV requests.""" return { "X-Api-Key": cls.get_api_key(), "X-Forwarded-User": email, } def build_url(self, path: str, query: str = "") -> str: """Build a full CalDAV URL from a resource path. Handles paths with or without the /api/v1.0/caldav prefix. """ # If the path already includes the base URI prefix, use it directly if path.startswith(self.BASE_URI_PATH): url = f"{self.base_url}{path}" else: clean_path = path.lstrip("/") url = f"{self.base_url}{self.BASE_URI_PATH}/{clean_path}" if query: url = f"{url}?{query}" return url def request( # noqa: PLR0913 # pylint: disable=too-many-arguments self, method: str, email: str, path: str, *, query: str = "", data=None, extra_headers: dict | None = None, timeout: int | None = None, content_type: str | None = None, ) -> requests.Response: """Make an authenticated HTTP request to the CalDAV server.""" headers = self.build_base_headers(email) if content_type: headers["Content-Type"] = content_type if extra_headers: headers.update(extra_headers) url = self.build_url(path, query) return requests.request( method=method, url=url, headers=headers, data=data, timeout=timeout or self.DEFAULT_TIMEOUT, ) def get_dav_client(self, email: str) -> DAVClient: """Return a configured caldav.DAVClient for the given user email.""" headers = self.build_base_headers(email) caldav_url = f"{self.base_url}{self.BASE_URI_PATH}/" return DAVClient( url=caldav_url, username=None, password=None, timeout=self.DEFAULT_TIMEOUT, headers=headers, ) def find_event_by_uid(self, email: str, uid: str) -> tuple[str | None, str | None]: """Find an event by UID across all of the user's calendars. Returns (ical_data, href) or (None, None). """ client = self.get_dav_client(email) try: principal = client.principal() for cal in principal.calendars(): try: event = cal.object_by_uid(uid) return event.data, str(event.url.path) except caldav_lib.error.NotFoundError: continue logger.warning("Event UID %s not found in user %s calendars", uid, email) return None, None except Exception: # pylint: disable=broad-exception-caught logger.exception("CalDAV error looking up event %s", uid) return None, None def put_event(self, email: str, href: str, ical_data: str) -> bool: """PUT updated iCalendar data back to CalDAV. Returns True on success.""" try: response = self.request( "PUT", email, href, data=ical_data.encode("utf-8"), content_type="text/calendar; charset=utf-8", ) if response.status_code in (200, 201, 204): return True logger.error( "CalDAV PUT failed: %s %s", response.status_code, response.text[:500], ) return False except requests.exceptions.RequestException: logger.exception("CalDAV PUT error for %s", href) return False @staticmethod def update_attendee_partstat( ical_data: str, email: str, new_partstat: str ) -> str | None: """Update the PARTSTAT of an attendee in iCalendar data. Returns the modified iCalendar string, or None if attendee not found. """ cal = icalendar.Calendar.from_ical(ical_data) updated = False for component in cal.walk("VEVENT"): for _name, attendee in component.property_items("ATTENDEE"): attendee_val = str(attendee).lower() if email.lower() in attendee_val: attendee.params["PARTSTAT"] = icalendar.vText(new_partstat) updated = True if not updated: return None return cal.to_ical().decode("utf-8") class CalDAVClient: """ Client for communicating with CalDAV server using the caldav library. """ def __init__(self): self._http = CalDAVHTTPClient() self.base_url = self._http.base_url def _get_client(self, user) -> DAVClient: """ Get a CalDAV client for the given user. The CalDAV server requires API key authentication via Authorization header and X-Forwarded-User header for user identification. """ return self._http.get_dav_client(user.email) def get_calendar_info(self, user, calendar_path: str) -> dict | None: """ Get calendar information from CalDAV server. Returns dict with name, color, description or None if not found. """ client = self._get_client(user) calendar_url = f"{self.base_url}{calendar_path}" try: calendar = client.calendar(url=calendar_url) # Fetch properties props = calendar.get_properties( [DisplayName(), CalendarColor(), CalendarDescription()] ) name = props.get(DisplayName.tag, "Calendar") color = props.get(CalendarColor.tag, settings.DEFAULT_CALENDAR_COLOR) description = props.get(CalendarDescription.tag, "") # Clean up color (CalDAV may return with alpha channel like #RRGGBBAA) if color and len(color) == 9 and color.startswith("#"): color = color[:7] logger.info("Got calendar info from CalDAV: name=%s, color=%s", name, color) return { "name": name, "color": color, "description": description, } except NotFoundError: logger.warning("Calendar not found at path: %s", calendar_path) return None except Exception as e: # noqa: BLE001 # pylint: disable=broad-exception-caught logger.error("Failed to get calendar info from CalDAV: %s", str(e)) return None def create_calendar( self, user, calendar_name: str, calendar_id: str, color: str = "" ) -> str: """ Create a new calendar in CalDAV server for the given user. Returns the CalDAV server path for the calendar. """ client = self._get_client(user) principal = client.principal() try: # Create calendar using caldav library calendar = principal.make_calendar(name=calendar_name) # Set calendar color if provided if color: calendar.set_properties([CalendarColor(color)]) # CalDAV server calendar path format: /calendars/{username}/{calendar_id}/ # The caldav library returns a URL object, convert to string and extract path calendar_url = str(calendar.url) # Extract path from full URL if calendar_url.startswith(self.base_url): path = calendar_url[len(self.base_url) :] else: # Fallback: construct path manually based on standard CalDAV structure # CalDAV servers typically create calendars under /calendars/{principal}/ path = f"/calendars/{user.email}/{calendar_id}/" logger.info( "Created calendar in CalDAV server: %s at %s", calendar_name, path ) return path except Exception as e: logger.error("Failed to create calendar in CalDAV server: %s", str(e)) raise def get_events( self, user, calendar_path: str, start: Optional[datetime] = None, end: Optional[datetime] = None, ) -> list: """ Get events from a calendar within a time range. Returns list of event dictionaries with parsed data. """ # Default to current month if no range specified if start is None: start = timezone.now().replace(day=1, hour=0, minute=0, second=0) if end is None: end = start + timedelta(days=31) client = self._get_client(user) # Get calendar by URL calendar_url = f"{self.base_url}{calendar_path}" calendar = client.calendar(url=calendar_url) try: # Search for events in the date range # Convert datetime to date for search if needed start_date = start.date() if isinstance(start, datetime) else start end_date = end.date() if isinstance(end, datetime) else end events = calendar.search( event=True, start=start_date, end=end_date, expand=True, # Expand recurring events ) # Parse events into dictionaries parsed_events = [] for event in events: event_data = self._parse_event(event) if event_data: parsed_events.append(event_data) return parsed_events except NotFoundError: logger.warning("Calendar not found at path: %s", calendar_path) return [] except Exception as e: logger.error("Failed to get events from CalDAV server: %s", str(e)) raise def create_event_raw(self, user, calendar_path: str, ics_data: str) -> str: """ Create an event in CalDAV server from raw ICS data. The ics_data should be a complete VCALENDAR string. Returns the event UID. """ client = self._get_client(user) calendar_url = f"{self.base_url}{calendar_path}" calendar = client.calendar(url=calendar_url) try: event = calendar.save_event(ics_data) event_uid = str(event.icalendar_component.get("uid", "")) logger.info("Created event in CalDAV server: %s", event_uid) return event_uid except Exception as e: logger.error("Failed to create event in CalDAV server: %s", str(e)) raise def create_event(self, user, calendar_path: str, event_data: dict) -> str: """ Create a new event in CalDAV server. Returns the event UID. """ client = self._get_client(user) calendar_url = f"{self.base_url}{calendar_path}" calendar = client.calendar(url=calendar_url) # Extract event data dtstart = event_data.get("start", timezone.now()) dtend = event_data.get("end", dtstart + timedelta(hours=1)) summary = event_data.get("title", "New Event") description = event_data.get("description", "") location = event_data.get("location", "") # Generate UID if not provided event_uid = event_data.get("uid", str(uuid4())) try: # Create event using caldav library event = calendar.save_event( dtstart=dtstart, dtend=dtend, uid=event_uid, summary=summary, description=description, location=location, ) # Extract UID from created event # The caldav library returns an Event object if hasattr(event, "icalendar_component"): event_uid = str(event.icalendar_component.get("uid", event_uid)) elif hasattr(event, "vobject_instance"): event_uid = event.vobject_instance.vevent.uid.value logger.info("Created event in CalDAV server: %s", event_uid) return event_uid except Exception as e: logger.error("Failed to create event in CalDAV server: %s", str(e)) raise def update_event( self, user, calendar_path: str, event_uid: str, event_data: dict ) -> None: """Update an existing event in CalDAV server.""" client = self._get_client(user) calendar_url = f"{self.base_url}{calendar_path}" calendar = client.calendar(url=calendar_url) try: # Search for the event by UID events = calendar.search(event=True) target_event = None for event in events: event_uid_value = None if hasattr(event, "icalendar_component"): event_uid_value = str(event.icalendar_component.get("uid", "")) elif hasattr(event, "vobject_instance"): event_uid_value = event.vobject_instance.vevent.uid.value if event_uid_value == event_uid: target_event = event break if not target_event: raise ValueError(f"Event with UID {event_uid} not found") # Update event properties dtstart = event_data.get("start") dtend = event_data.get("end") summary = event_data.get("title") description = event_data.get("description") location = event_data.get("location") # Update using icalendar component component = target_event.icalendar_component if dtstart: component["dtstart"] = dtstart if dtend: component["dtend"] = dtend if summary: component["summary"] = summary if description is not None: component["description"] = description if location is not None: component["location"] = location # Save the updated event target_event.save() logger.info("Updated event in CalDAV server: %s", event_uid) except Exception as e: logger.error("Failed to update event in CalDAV server: %s", str(e)) raise def delete_event(self, user, calendar_path: str, event_uid: str) -> None: """Delete an event from CalDAV server.""" client = self._get_client(user) calendar_url = f"{self.base_url}{calendar_path}" calendar = client.calendar(url=calendar_url) try: # Search for the event by UID events = calendar.search(event=True) target_event = None for event in events: event_uid_value = None if hasattr(event, "icalendar_component"): event_uid_value = str(event.icalendar_component.get("uid", "")) elif hasattr(event, "vobject_instance"): event_uid_value = event.vobject_instance.vevent.uid.value if event_uid_value == event_uid: target_event = event break if not target_event: raise ValueError(f"Event with UID {event_uid} not found") # Delete the event target_event.delete() logger.info("Deleted event from CalDAV server: %s", event_uid) except Exception as e: logger.error("Failed to delete event from CalDAV server: %s", str(e)) raise def _parse_event(self, event) -> Optional[dict]: """ Parse a caldav Event object and return event data as dictionary. """ try: component = event.icalendar_component event_data = { "uid": str(component.get("uid", "")), "title": str(component.get("summary", "")), "start": component.get("dtstart").dt if component.get("dtstart") else None, "end": component.get("dtend").dt if component.get("dtend") else None, "description": str(component.get("description", "")), "location": str(component.get("location", "")), } # Convert datetime to string format for consistency if event_data["start"]: if isinstance(event_data["start"], datetime): event_data["start"] = event_data["start"].strftime("%Y%m%dT%H%M%SZ") elif isinstance(event_data["start"], date): event_data["start"] = event_data["start"].strftime("%Y%m%d") if event_data["end"]: if isinstance(event_data["end"], datetime): event_data["end"] = event_data["end"].strftime("%Y%m%dT%H%M%SZ") elif isinstance(event_data["end"], date): event_data["end"] = event_data["end"].strftime("%Y%m%d") return event_data if event_data.get("uid") else None except Exception as e: # noqa: BLE001 # pylint: disable=broad-exception-caught logger.warning("Failed to parse event: %s", str(e)) return None class CalendarService: """ High-level service for managing calendars and events. """ def __init__(self): self.caldav = CalDAVClient() def create_default_calendar(self, user) -> str: """Create a default calendar for a user. Returns the caldav_path.""" from core.services.translation_service import ( # noqa: PLC0415 # pylint: disable=import-outside-toplevel TranslationService, ) calendar_id = str(uuid4()) lang = TranslationService.resolve_language(email=user.email) calendar_name = TranslationService.t("calendar.list.defaultCalendarName", lang) return self.caldav.create_calendar( user, calendar_name, calendar_id, color=settings.DEFAULT_CALENDAR_COLOR ) def create_calendar(self, user, name: str, color: str = "") -> str: """Create a new calendar for a user. Returns the caldav_path.""" calendar_id = str(uuid4()) return self.caldav.create_calendar( user, name, calendar_id, color=color or settings.DEFAULT_CALENDAR_COLOR ) def get_events(self, user, caldav_path: str, start=None, end=None) -> list: """Get events from a calendar. Returns parsed event data.""" return self.caldav.get_events(user, caldav_path, start, end) def create_event(self, user, caldav_path: str, event_data: dict) -> str: """Create a new event.""" return self.caldav.create_event(user, caldav_path, event_data) def update_event( self, user, caldav_path: str, event_uid: str, event_data: dict ) -> None: """Update an existing event.""" self.caldav.update_event(user, caldav_path, event_uid, event_data) def delete_event(self, user, caldav_path: str, event_uid: str) -> None: """Delete an event.""" self.caldav.delete_event(user, caldav_path, event_uid) # --------------------------------------------------------------------------- # CalDAV path utilities # --------------------------------------------------------------------------- # Pattern: /calendars/// CALDAV_PATH_PATTERN = re.compile( r"^/calendars/[^/]+/[a-zA-Z0-9-]+/$", ) def normalize_caldav_path(caldav_path): """Normalize CalDAV path to consistent format. Strips the CalDAV API prefix (e.g. /api/v1.0/caldav/) if present, so that paths like /api/v1.0/caldav/calendars/user@ex.com/uuid/ become /calendars/user@ex.com/uuid/. """ if not caldav_path.startswith("/"): caldav_path = "/" + caldav_path # Strip CalDAV API prefix — keep from /calendars/ onwards calendars_idx = caldav_path.find("/calendars/") if calendars_idx > 0: caldav_path = caldav_path[calendars_idx:] if not caldav_path.endswith("/"): caldav_path = caldav_path + "/" return caldav_path def verify_caldav_access(user, caldav_path): """Verify that the user has access to the CalDAV calendar. Checks that: 1. The path matches the expected pattern (prevents path injection) 2. The user's email matches the email in the path """ if not CALDAV_PATH_PATTERN.match(caldav_path): return False parts = caldav_path.strip("/").split("/") if len(parts) >= 2 and parts[0] == "calendars": path_email = unquote(parts[1]) return path_email.lower() == user.email.lower() return False def validate_caldav_proxy_path(path): """Validate that a CalDAV proxy path is safe. Prevents path traversal attacks by rejecting paths with: - Directory traversal sequences (../) - Null bytes - Paths that don't start with expected prefixes """ if not path: return True # Empty path is fine (root request) # Block directory traversal if ".." in path: return False # Block null bytes if "\x00" in path: return False # Path must start with a known CalDAV resource prefix allowed_prefixes = ("calendars/", "principals/", ".well-known/") clean = path.lstrip("/") if clean and not any(clean.startswith(prefix) for prefix in allowed_prefixes): return False return True