#!/usr/bin/env python3
"""
Migrate emails from La Suite Messages (PostgreSQL + SeaweedFS) to Stalwart (JMAP).

Reads message metadata from messages_db, downloads RFC 5322 bodies from SeaweedFS,
and uploads each message to Stalwart via JMAP Email/import (RFC 8621 §5.4).

Usage:
    # Port-forward to the required services first:
    kubectl port-forward -n data svc/postgres-rw 5432:5432 &
    kubectl port-forward -n stalwart svc/stalwart 8080:8080 &

    # Run the migration:
    python3 migrate-messages.py \
        --db-host 127.0.0.1 --db-port 5432 --db-name messages_db --db-user messages --db-password \
        --s3-endpoint http://127.0.0.1:8333 --s3-bucket sunbeam-messages \
        --s3-access-key --s3-secret-key \
        --jmap-url http://127.0.0.1:8080 --jmap-user admin --jmap-password

Prerequisites:
    pip install psycopg2-binary boto3 requests

The script is idempotent: it tracks progress in a checkpoint file and skips
already-imported messages on re-run.
"""

import argparse
import json
import hashlib
import os
import sys
from pathlib import Path

try:
    import psycopg2
    from psycopg2 import sql as pg_sql
    import boto3
    import requests
except ImportError:
    print("Missing dependencies. Install with: pip install psycopg2-binary boto3 requests")
    sys.exit(1)

CHECKPOINT_FILE = Path("migrate-messages-checkpoint.json")

# Timeout (seconds) applied to every HTTP call so the migration cannot hang
# indefinitely on a stalled JMAP server.
HTTP_TIMEOUT = 60


def load_checkpoint():
    """Load migration progress from disk, or return a fresh checkpoint."""
    if CHECKPOINT_FILE.exists():
        return json.loads(CHECKPOINT_FILE.read_text())
    return {"imported": {}}


def save_checkpoint(checkpoint):
    """Persist migration progress so a re-run can skip imported messages."""
    CHECKPOINT_FILE.write_text(json.dumps(checkpoint, indent=2))


def get_jmap_session(jmap_url, user, password):
    """Fetch the JMAP session object and extract the mail accountId.

    Returns a ``(session, account_id)`` tuple.  Prefers the primary mail
    account (RFC 8620 §2); falls back to the first listed account.

    Raises:
        requests.HTTPError: if the session request fails.
        RuntimeError: if the session lists no accounts at all.
    """
    resp = requests.get(
        f"{jmap_url}/.well-known/jmap",
        auth=(user, password),
        timeout=HTTP_TIMEOUT,
    )
    resp.raise_for_status()
    session = resp.json()
    # Use the primary mail account when advertised.
    primary_accounts = session.get("primaryAccounts", {})
    account_id = primary_accounts.get("urn:ietf:params:jmap:mail")
    if not account_id:
        # Fallback: first account in the session.
        accounts = session.get("accounts", {})
        if not accounts:
            raise RuntimeError("JMAP session lists no accounts")
        account_id = next(iter(accounts))
    return session, account_id


def jmap_get_mailboxes(jmap_url, account_id, user, password):
    """Fetch all mailboxes for the account; returns a name -> id mapping."""
    resp = requests.post(
        f"{jmap_url}/jmap",
        auth=(user, password),
        timeout=HTTP_TIMEOUT,
        json={
            "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
            "methodCalls": [
                ["Mailbox/get", {"accountId": account_id}, "0"]
            ],
        },
    )
    resp.raise_for_status()
    result = resp.json()
    mailboxes = result["methodResponses"][0][1]["list"]
    return {mb["name"]: mb["id"] for mb in mailboxes}


def jmap_create_mailbox(jmap_url, account_id, user, password, name):
    """Create a mailbox and return its ID.

    If the mailbox already exists (Mailbox/set reports nothing created),
    re-fetch the mailbox list and return the existing ID, or ``None`` if
    it cannot be found.
    """
    resp = requests.post(
        f"{jmap_url}/jmap",
        auth=(user, password),
        timeout=HTTP_TIMEOUT,
        json={
            "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
            "methodCalls": [
                ["Mailbox/set", {
                    "accountId": account_id,
                    "create": {"mb1": {"name": name}},
                }, "0"]
            ],
        },
    )
    resp.raise_for_status()
    result = resp.json()
    created = result["methodResponses"][0][1].get("created", {})
    if "mb1" in created:
        return created["mb1"]["id"]
    # Already exists — fetch it
    mailboxes = jmap_get_mailboxes(jmap_url, account_id, user, password)
    return mailboxes.get(name)


def jmap_import_email(jmap_url, account_id, user, password, eml_bytes,
                      mailbox_id, keywords, received_at):
    """Import a single RFC 5322 message via JMAP Email/import.

    Uploads ``eml_bytes`` as a blob, then issues Email/import referencing it.
    Returns the new JMAP email ID, or ``None`` when the server reports
    neither success nor an error for the import.

    Raises:
        requests.HTTPError: on an HTTP-level failure.
        RuntimeError: when the server rejects the import (notCreated).
    """
    # First, upload the blob
    resp = requests.post(
        f"{jmap_url}/jmap/upload/{account_id}/",
        auth=(user, password),
        headers={"Content-Type": "message/rfc822"},
        data=eml_bytes,
        timeout=HTTP_TIMEOUT,
    )
    resp.raise_for_status()
    blob = resp.json()
    blob_id = blob["blobId"]

    # Then import it
    import_data = {
        "accountId": account_id,
        "emails": {
            "imp1": {
                "blobId": blob_id,
                "mailboxIds": {mailbox_id: True},
                "keywords": keywords,
            }
        },
    }
    if received_at:
        import_data["emails"]["imp1"]["receivedAt"] = received_at

    resp = requests.post(
        f"{jmap_url}/jmap",
        auth=(user, password),
        timeout=HTTP_TIMEOUT,
        json={
            "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
            "methodCalls": [
                ["Email/import", import_data, "0"]
            ],
        },
    )
    resp.raise_for_status()
    result = resp.json()
    created = result["methodResponses"][0][1].get("created", {})
    if "imp1" in created:
        return created["imp1"]["id"]
    not_created = result["methodResponses"][0][1].get("notCreated", {})
    if "imp1" in not_created:
        err = not_created["imp1"]
        raise RuntimeError(f"JMAP import failed: {err}")
    return None


def main():
    parser = argparse.ArgumentParser(description="Migrate La Suite Messages → Stalwart JMAP")
    parser.add_argument("--db-host", default="127.0.0.1")
    parser.add_argument("--db-port", type=int, default=5432)
    parser.add_argument("--db-name", default="messages_db")
    parser.add_argument("--db-user", default="messages")
    parser.add_argument("--db-password", required=True)
    parser.add_argument("--s3-endpoint", default="http://127.0.0.1:8333")
    parser.add_argument("--s3-bucket", default="sunbeam-messages")
    parser.add_argument("--s3-access-key", required=True)
    parser.add_argument("--s3-secret-key", required=True)
    parser.add_argument("--jmap-url", default="http://127.0.0.1:8080")
    parser.add_argument("--jmap-user", default="admin")
    parser.add_argument("--jmap-password", required=True)
    parser.add_argument("--dry-run", action="store_true",
                        help="Count messages without importing")
    args = parser.parse_args()

    checkpoint = load_checkpoint()  # noqa: F841 — used by the TODO import loop below

    # Connect to messages_db
    print("Connecting to messages_db...")
    conn = psycopg2.connect(
        host=args.db_host,
        port=args.db_port,
        dbname=args.db_name,
        user=args.db_user,
        password=args.db_password,
    )
    try:
        # Connect to SeaweedFS
        print("Connecting to SeaweedFS...")
        s3 = boto3.client(  # noqa: F841 — used by the TODO import loop below
            "s3",
            endpoint_url=args.s3_endpoint,
            aws_access_key_id=args.s3_access_key,
            aws_secret_access_key=args.s3_secret_key,
            region_name="us-east-1",
        )

        # Get JMAP session
        if not args.dry_run:
            print("Connecting to Stalwart JMAP...")
            session, account_id = get_jmap_session(args.jmap_url, args.jmap_user, args.jmap_password)
            print(f"  Account: {account_id}")
            mailboxes = jmap_get_mailboxes(args.jmap_url, account_id, args.jmap_user, args.jmap_password)
            print(f"  Mailboxes: {list(mailboxes.keys())}")

        # Query all messages from La Suite
        # NOTE: The actual table/column names depend on La Suite Messages' Django models.
        # You may need to adjust these queries after inspecting the actual schema.
        # Run `\dt` and `\d ` in psql against messages_db to find the real names.
        print("\nQuerying messages from La Suite database...")
        cur = conn.cursor()

        # List all tables to help identify the right ones
        cur.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = 'public' ORDER BY table_name;
        """)
        tables = [row[0] for row in cur.fetchall()]
        print(f"  Tables: {tables}")

        if args.dry_run:
            # Just count messages per table that looks like it contains emails.
            # Quote the table name as an identifier — it comes from the catalog,
            # but safe composition costs nothing.
            for table in tables:
                cur.execute(
                    pg_sql.SQL("SELECT COUNT(*) FROM {};").format(pg_sql.Identifier(table))
                )
                count = cur.fetchone()[0]
                if count > 0:
                    print(f"  {table}: {count} rows")
            print("\nDry run complete. Inspect the tables above and adjust the migration")
            print("queries in this script to match the actual La Suite Messages schema.")
            return

        # TODO: Replace with actual queries once schema is inspected.
        # The migration logic below is a template — run with --dry-run first
        # to see the actual table structure, then update these queries.
        #
        # Expected flow:
        #   1. Query user accounts
        #   2. For each user, query their mailboxes/folders
        #   3. For each mailbox, query messages (S3 key, flags, received date)
        #   4. Download .eml from S3
        #   5. Upload to Stalwart via JMAP Email/import
        #
        # Example (adjust table/column names):
        #
        # cur.execute("SELECT id, email FROM auth_user;")
        # for user_id, email in cur.fetchall():
        #     cur.execute("SELECT id, name FROM mailbox WHERE user_id = %s;", (user_id,))
        #     for mb_id, mb_name in cur.fetchall():
        #         mailbox_jmap_id = ensure_mailbox(mb_name)
        #         cur.execute("SELECT s3_key, is_read, received_at FROM message WHERE mailbox_id = %s;", (mb_id,))
        #         for s3_key, is_read, received_at in cur.fetchall():
        #             if s3_key in checkpoint["imported"]:
        #                 continue
        #             eml = s3.get_object(Bucket=args.s3_bucket, Key=s3_key)["Body"].read()
        #             keywords = {"$seen": True} if is_read else {}
        #             jmap_import_email(..., eml, mailbox_jmap_id, keywords, received_at)
        #             checkpoint["imported"][s3_key] = True
        #             save_checkpoint(checkpoint)

        print("\n⚠️  Schema inspection required!")
        print("Run with --dry-run first, then update the TODO section in this script")
        print("with the correct table and column names from the La Suite Messages schema.")
    finally:
        # Always release the DB connection, even on an exception path.
        conn.close()

    print("\nDone.")


if __name__ == "__main__":
    main()