# One-shot Job that copies every message stored in messages_db into Stalwart
# over JMAP. The migration script itself is mounted from the ConfigMap below.
apiVersion: batch/v1
kind: Job
metadata:
  name: migrate-messages
  namespace: stalwart
spec:
  # The script does not track which messages were already imported, so an
  # automatic retry of a failed pod could import them a second time.
  backoffLimit: 0
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: migrate
          image: python:3.12-slim
          command:
            - /bin/sh
            - -c
            - |
              pip install psycopg2-binary &&
              python3 /scripts/migrate.py
          env:
            - name: ADMIN_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: stalwart-app-secrets
                  key: admin-password
            - name: DB_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: messages-db-credentials
                  key: password
          volumeMounts:
            - name: script
              mountPath: /scripts
      volumes:
        - name: script
          configMap:
            name: migrate-messages-script
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: migrate-messages-script
  namespace: stalwart
data:
  migrate.py: |
    """Copy every message from messages_db (PostgreSQL) into Stalwart over JMAP.

    Steps:
      1. Read all messages (with their raw blobs) and all users from the DB.
      2. Make sure a Stalwart principal exists for every mailbox address.
      3. Resolve each address to a JMAP account id and a target mailbox id.
      4. Upload each raw message and import it with Email/import.

    The process exits with status 1 when at least one message could not be
    imported, so the surrounding Job is reported as failed.
    """
    import base64
    import gzip
    import json
    import os
    import sys
    import urllib.error
    import urllib.request
    import zlib
    from datetime import timezone

    import psycopg2

    DB_HOST = "postgres-rw.data.svc.cluster.local"
    DB_PORT = 5432
    DB_NAME = "messages_db"
    DB_USER = "messages"
    DB_PASS = os.environ.get("DB_PASSWORD", "")

    JMAP_URL = "http://stalwart.stalwart.svc.cluster.local:8080"
    ADMIN_USER = "admin"
    ADMIN_PASS = os.environ["ADMIN_PASSWORD"]

    # Seconds to wait on any single HTTP call; without a timeout a stalled
    # server would block the Job forever.
    HTTP_TIMEOUT = 60

    MAIL_CAPABILITY = "urn:ietf:params:jmap:mail"
    JMAP_USING = ["urn:ietf:params:jmap:core", MAIL_CAPABILITY]

    MESSAGES_QUERY = """
        SELECT DISTINCT m.id, m.subject, m.sent_at, m.is_draft, m.is_starred,
               m.is_trashed, m.is_spam,
               mb.local_part || '@' || d.name as mailbox_email,
               b.raw_content, b.compression
        FROM messages_message m
        JOIN messages_blob b ON m.blob_id = b.id
        JOIN messages_thread t ON m.thread_id = t.id
        JOIN messages_threadaccess ta ON ta.thread_id = t.id
        JOIN messages_mailbox mb ON ta.mailbox_id = mb.id
        JOIN messages_maildomain d ON mb.domain_id = d.id
        ORDER BY mailbox_email, m.sent_at
    """


    def _basic_auth(username, password):
        """Return the value of an HTTP Basic Authorization header."""
        token = base64.b64encode(f"{username}:{password}".encode()).decode()
        return "Basic " + token


    auth_header = _basic_auth(ADMIN_USER, ADMIN_PASS)


    def _read_json(req):
        """Send *req* and return the decoded JSON response body."""
        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
            return json.loads(resp.read())


    def jmap_call_as(method_calls, user_auth):
        """POST *method_calls* to the JMAP API using the given Authorization value."""
        body = json.dumps({
            "using": JMAP_USING,
            "methodCalls": method_calls,
        }).encode()
        req = urllib.request.Request(
            f"{JMAP_URL}/jmap", data=body,
            headers={"Authorization": user_auth,
                     "Content-Type": "application/json"})
        return _read_json(req)


    def jmap_call(method_calls):
        """POST *method_calls* to the JMAP API as the admin user."""
        return jmap_call_as(method_calls, auth_header)


    def upload_blob_as(account_id, eml_bytes, user_auth):
        """Upload a raw RFC 822 message to *account_id* and return its blobId."""
        req = urllib.request.Request(
            f"{JMAP_URL}/jmap/upload/{account_id}/", data=eml_bytes,
            headers={"Authorization": user_auth,
                     "Content-Type": "message/rfc822"})
        return _read_json(req)["blobId"]


    def upload_blob(account_id, eml_bytes):
        """Upload a raw RFC 822 message as the admin user and return its blobId."""
        return upload_blob_as(account_id, eml_bytes, auth_header)


    def ensure_user(email, full_name):
        """Create an individual principal named *email* unless it already exists.

        Failures are reported on stdout and never raised as HTTPError; a user
        that could not be created is detected later, when no JMAP account can
        be resolved for it.
        """
        body = json.dumps({
            "type": "individual",
            "name": email,
            "description": full_name or email,
            "emails": [email],
            "quota": 0,
            "secrets": [],
            "urls": [],
            "memberOf": [],
            "roles": ["user"],
            "lists": [],
            "members": [],
            "enabledPermissions": [],
            "disabledPermissions": [],
            "externalMembers": [],
        }).encode()
        req = urllib.request.Request(
            f"{JMAP_URL}/api/principal", data=body, method="POST",
            headers={"Authorization": auth_header,
                     "Content-Type": "application/json"})
        try:
            result = _read_json(req)
        except urllib.error.HTTPError as e:
            body_text = e.read().decode()
            if "fieldAlreadyExists" in body_text:
                print(f" User {email} already exists")
            else:
                print(f" Error creating user {email}: {e.code} {body_text}")
        else:
            print(f" Created user {email} (id={result.get('data')})")


    def _fetch_session(auth):
        """Return the JMAP session object visible to the given credentials."""
        req = urllib.request.Request(
            f"{JMAP_URL}/.well-known/jmap", headers={"Authorization": auth})
        return _read_json(req)


    def get_account_id_for_user(email):
        """Return ``(account_id, authorization)`` for the mailbox *email*.

        Stalwart allows admin to access any account via the master user
        mechanism: authenticate as "user%admin" with the admin password.

        If impersonation is rejected, the plain admin session is consulted
        instead, but only an account whose name equals *email* is accepted.
        ``account_id`` is None when no matching account is found.
        """
        impersonate_auth = _basic_auth(f"{email}%{ADMIN_USER}", ADMIN_PASS)
        try:
            session = _fetch_session(impersonate_auth)
        except urllib.error.HTTPError as e:
            print(f" Impersonation failed for {email} ({e.code}), trying admin session")
        else:
            accounts = session.get("accounts", {})
            # Prefer the account the server declares as primary for mail;
            # fall back to the first listed account if none is declared.
            account_id = session.get("primaryAccounts", {}).get(MAIL_CAPABILITY)
            if account_id not in accounts:
                account_id = next(iter(accounts), None)
            return account_id, impersonate_auth

        # Fallback: direct admin auth. Never pick an arbitrary account here:
        # that would import this user's mail into somebody else's mailbox.
        # ensure_user() creates principals whose name is the email address.
        session = _fetch_session(auth_header)
        for account_id, account in session.get("accounts", {}).items():
            if account.get("name") == email:
                return account_id, auth_header
        return None, auth_header


    def get_inbox_id(account_id, user_auth):
        """Return the id of the account's inbox.

        Falls back to the first mailbox when none is identified as the inbox,
        and to None when the account has no mailbox at all.
        """
        result = jmap_call_as(
            [["Mailbox/get", {"accountId": account_id}, "0"]], user_auth)
        mailboxes = result["methodResponses"][0][1]["list"]
        for mb in mailboxes:
            if mb.get("role") == "inbox" or mb.get("name", "").lower() == "inbox":
                return mb["id"]
        return mailboxes[0]["id"] if mailboxes else None


    def decompress_blob(raw, compression):
        """Return the message bytes stored in a blob.

        Blobs with ``compression == 1`` are tried as gzip, raw deflate and
        zlib, in that order. If none of them applies the bytes are returned
        unchanged (best effort) and a warning is printed.
        """
        if compression != 1:
            return raw
        decoders = (
            gzip.decompress,
            lambda data: zlib.decompress(data, -zlib.MAX_WBITS),
            zlib.decompress,
        )
        for decode in decoders:
            try:
                return decode(raw)
            except (OSError, EOFError, zlib.error):
                continue
        print(" WARNING: blob marked compressed could not be decompressed, uploading as stored")
        return raw


    def to_utc_date(moment):
        """Format a datetime as a JMAP UTCDate (``YYYY-MM-DDTHH:MM:SSZ``).

        Returns None when *moment* is None.
        """
        if moment is None:
            return None
        if moment.tzinfo is None:
            # NOTE(review): naive timestamps are assumed to be UTC already.
            moment = moment.replace(tzinfo=timezone.utc)
        return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


    def fetch_source_data():
        """Return ``(messages, users)`` read from messages_db.

        *messages* is the list of rows produced by MESSAGES_QUERY and *users*
        maps each user email to its full name.
        """
        print("Connecting to messages_db...")
        conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME,
                                user=DB_USER, password=DB_PASS)
        try:
            cur = conn.cursor()
            cur.execute(MESSAGES_QUERY)
            messages = cur.fetchall()
            print(f"Found {len(messages)} messages to migrate.")
            cur.execute("SELECT email, full_name FROM messages_user")
            users = {row[0]: row[1] for row in cur.fetchall()}
        finally:
            conn.close()
        return messages, users


    def resolve_user_contexts(emails):
        """Map each address to ``(account_id, inbox_id, authorization)``.

        Addresses without a resolvable account or mailbox are left out, so
        their messages are skipped and counted as errors later on.
        """
        contexts = {}
        for email in emails:
            print(f"Resolving JMAP account for {email}...")
            account_id, user_auth = get_account_id_for_user(email)
            if not account_id:
                print(f" WARNING: Could not get account for {email}, will skip")
                continue
            inbox_id = get_inbox_id(account_id, user_auth)
            if not inbox_id:
                print(f" WARNING: No mailbox found for {email}, will skip")
                continue
            contexts[email] = (account_id, inbox_id, user_auth)
            print(f" {email}: account={account_id}, inbox={inbox_id}")
        return contexts


    def main():
        """Run the migration and return the process exit status."""
        messages, users = fetch_source_data()

        # Ensure all mailbox users exist in Stalwart. Rows are ordered by
        # mailbox address, so dict.fromkeys keeps a stable, de-duplicated order.
        emails = list(dict.fromkeys(msg[7] for msg in messages))
        for email in emails:
            ensure_user(email, users.get(email, email))

        user_contexts = resolve_user_contexts(emails)

        # Import messages into each user's account.
        imported = 0
        errors = 0
        for msg in messages:
            # NOTE(review): is_trashed and is_spam are read but not acted on;
            # such messages are imported into the inbox like any other.
            (_msg_id, subject, sent_at, is_draft, is_starred,
             _is_trashed, _is_spam, email, raw_content, compression) = msg

            if email not in user_contexts:
                errors += 1
                print(f" SKIP: {email}: {subject} (no account)")
                continue
            account_id, inbox_id, user_auth = user_contexts[email]

            # One bad message must not abort the whole run, hence the broad
            # catch: every failure is logged and counted.
            try:
                eml_bytes = decompress_blob(bytes(raw_content), compression)
                blob_id = upload_blob_as(account_id, eml_bytes, user_auth)

                keywords = {"$seen": True}
                if is_starred:
                    keywords["$flagged"] = True
                if is_draft:
                    keywords["$draft"] = True

                email_import = {
                    "blobId": blob_id,
                    "mailboxIds": {inbox_id: True},
                    "keywords": keywords,
                }
                received_at = to_utc_date(sent_at)
                if received_at:
                    email_import["receivedAt"] = received_at

                result = jmap_call_as(
                    [["Email/import",
                      {"accountId": account_id, "emails": {"imp1": email_import}},
                      "0"]],
                    user_auth)
                resp = result["methodResponses"][0][1]
                if "imp1" in (resp.get("created") or {}):
                    imported += 1
                    print(f" [{imported}] {email}: {subject or '(no subject)'}")
                else:
                    errors += 1
                    # Show the whole response when there is no notCreated
                    # entry, e.g. for a method-level error.
                    print(f" ERROR: {email}: {subject}: {resp.get('notCreated') or resp}")
            except Exception as e:
                errors += 1
                print(f" ERROR: {email}: {subject}: {e}")

        print(f"\nMigration complete: {imported} imported, {errors} errors, {len(messages)} total")
        return 1 if errors else 0


    if __name__ == "__main__":
        sys.exit(main())