#!/usr/bin/env python3
"""
Dendrite to Synapse PostgreSQL migration script.
Assumes:
- Both databases are PostgreSQL
- Synapse DB already initialized (run Synapse once to create schema)
- Dendrite DB is the source of truth
- Only local data is migrated (no federation state)
Usage:
python3 migrate.py \
--dendrite-db "postgresql://user:pass@localhost/dendrite" \
--synapse-db "postgresql://user:pass@localhost/synapse" \
--server-name "example.com" \
[--dendrite-media-path /path/to/dendrite/media] \
[--synapse-media-path /path/to/synapse/media] \
[--phase 1,2,3,4,5,6,7,8] \
[--dry-run]
"""
import argparse
import json
import logging
import os
import shutil

import psycopg2
import psycopg2.extras
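# The only third-party dependency is psycopg2 (installable as psycopg2 or
# psycopg2-binary); everything else above is from the standard library.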
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("dendrite2synapse")
class Migrator:
def __init__(
self,
dendrite_dsn: str,
synapse_dsn: str,
server_name: str,
dendrite_media_path: str | None = None,
synapse_media_path: str | None = None,
dry_run: bool = False,
):
self.server_name = server_name
self.dendrite_media_path = dendrite_media_path
self.synapse_media_path = synapse_media_path
self.dry_run = dry_run
self.src = psycopg2.connect(dendrite_dsn)
self.src.set_session(readonly=True)
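# The Dendrite connection is read-only, so the migration can never modify
# the source database.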
self.dst = psycopg2.connect(synapse_dsn)
# Caches for NID resolution
self._event_type_cache: dict[int, str] = {}
self._state_key_cache: dict[int, str] = {}
self._room_nid_cache: dict[int, str] = {} # room_nid -> room_id
self._room_id_nid_cache: dict[str, int] = {} # room_id -> room_nid
self._room_version_cache: dict[int, str] = {} # room_nid -> room_version
self._user_nid_cache: dict[int, str] = {} # state_key_nid -> user_id (for membership)
def close(self):
self.src.close()
self.dst.close()
# ── NID Resolution ──────────────────────────────────────────────
def _load_event_types(self):
"""Load event_type_nid -> event_type mapping."""
with self.src.cursor() as cur:
cur.execute("SELECT event_type_nid, event_type FROM roomserver_event_types")
self._event_type_cache = dict(cur.fetchall())
log.info("Loaded %d event types", len(self._event_type_cache))
def _load_state_keys(self):
"""Load event_state_key_nid -> event_state_key mapping."""
with self.src.cursor() as cur:
cur.execute(
"SELECT event_state_key_nid, event_state_key FROM roomserver_event_state_keys"
)
self._state_key_cache = dict(cur.fetchall())
log.info("Loaded %d state keys", len(self._state_key_cache))
def _load_rooms(self):
"""Load room_nid <-> room_id and room_version mappings."""
with self.src.cursor() as cur:
cur.execute("SELECT room_nid, room_id, room_version FROM roomserver_rooms")
for nid, rid, ver in cur.fetchall():
self._room_nid_cache[nid] = rid
self._room_id_nid_cache[rid] = nid
self._room_version_cache[nid] = ver
log.info("Loaded %d rooms", len(self._room_nid_cache))
def _resolve_event_type(self, nid: int) -> str:
return self._event_type_cache.get(nid, f"unknown.type.{nid}")
def _resolve_state_key(self, nid: int) -> str:
return self._state_key_cache.get(nid, "")
def _resolve_room_nid(self, nid: int) -> str:
return self._room_nid_cache.get(nid, f"!unknown:{self.server_name}")
def _get_room_version(self, room_nid: int) -> str:
return self._room_version_cache.get(room_nid, "10")
@staticmethod
def _room_version_to_format(room_version: str) -> int:
"""Map Matrix room version to Synapse event format version.
v1-2 -> 1 (EventFormatVersions.ROOM_V1_V2)
v3 -> 2 (EventFormatVersions.ROOM_V3)
v4-10, MSC variants -> 3 (EventFormatVersions.ROOM_V4_PLUS)
v11+, Hydra -> 4 (EventFormatVersions.ROOM_V11_HYDRA_PLUS)
"""
try:
v = int(room_version)
except (ValueError, TypeError):
# MSC or unknown variants - assume v3 format
return 3
if v <= 2:
return 1
if v == 3:
return 2
if v <= 10:
return 3
return 4 # v11+
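# Examples: "1" -> 1, "3" -> 2, "9" -> 3, "11" -> 3; non-numeric MSC
# identifiers fall back to format 3.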
def load_nid_caches(self):
"""Load all NID lookup caches from Dendrite."""
self._load_event_types()
self._load_state_keys()
self._load_rooms()
# ── Phase 1: Users & Profiles ───────────────────────────────────
def migrate_users(self):
log.info("=== Phase 1: Users & Profiles ===")
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"SELECT localpart, server_name, created_ts, password_hash, "
"appservice_id, is_deactivated, account_type FROM userapi_accounts"
)
accounts = src_cur.fetchall()
log.info("Found %d accounts to migrate", len(accounts))
with self.dst.cursor() as dst_cur:
for acc in accounts:
user_id = f"@{acc['localpart']}:{acc['server_name']}"
# account_type: 1=user, 2=guest, 3=admin, 4=appservice
is_guest = 1 if acc["account_type"] == 2 else 0
is_admin = 1 if acc["account_type"] == 3 else 0
deactivated = 1 if acc["is_deactivated"] else 0
# Dendrite stores created_ts in milliseconds, Synapse in seconds
created_ts = acc["created_ts"] // 1000 if acc["created_ts"] else 0
dst_cur.execute(
"""
INSERT INTO users (name, password_hash, creation_ts, admin, is_guest,
appservice_id, deactivated)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (name) DO UPDATE SET
password_hash = EXCLUDED.password_hash,
creation_ts = EXCLUDED.creation_ts,
admin = EXCLUDED.admin,
is_guest = EXCLUDED.is_guest,
deactivated = EXCLUDED.deactivated
""",
(
user_id,
acc["password_hash"],
created_ts,
is_admin,
is_guest,
acc["appservice_id"],
deactivated,
),
)
log.info("Migrated %d user accounts", len(accounts))
# Profiles
src_cur2 = self.src.cursor(cursor_factory=psycopg2.extras.DictCursor)
src_cur2.execute(
"SELECT localpart, server_name, display_name, avatar_url FROM userapi_profiles"
)
profiles = src_cur2.fetchall()
src_cur2.close()
for prof in profiles:
user_id = f"@{prof['localpart']}:{prof['server_name']}"
# Synapse profiles: user_id is localpart (UNIQUE), full_user_id is @user:server (NOT NULL since v77)
dst_cur.execute(
"""
INSERT INTO profiles (user_id, full_user_id, displayname, avatar_url)
VALUES (%s, %s, %s, %s)
ON CONFLICT (user_id) DO UPDATE SET
full_user_id = EXCLUDED.full_user_id,
displayname = EXCLUDED.displayname,
avatar_url = EXCLUDED.avatar_url
""",
(prof["localpart"], user_id, prof["display_name"], prof["avatar_url"]),
)
log.info("Migrated %d profiles", len(profiles))
# Devices
src_cur3 = self.src.cursor(cursor_factory=psycopg2.extras.DictCursor)
src_cur3.execute(
"SELECT device_id, localpart, server_name, display_name, "
"created_ts, last_seen_ts, ip, user_agent, access_token FROM userapi_devices"
)
devices = src_cur3.fetchall()
src_cur3.close()
# Determine starting id for access_tokens (fresh DB = 1)
dst_cur.execute("SELECT COALESCE(MAX(id), 0) FROM access_tokens")
next_token_id = dst_cur.fetchone()[0] + 1
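# Note: Synapse allocates access_tokens.id from an in-process ID generator
# seeded from MAX(id) at startup (assumption based on current Synapse
# behaviour), so continuing from MAX(id) + 1 here should be safe.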
for dev in devices:
user_id = f"@{dev['localpart']}:{dev['server_name']}"
dst_cur.execute(
"""
INSERT INTO devices (user_id, device_id, display_name, last_seen, ip, user_agent)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (user_id, device_id) DO UPDATE SET
display_name = EXCLUDED.display_name,
last_seen = EXCLUDED.last_seen,
ip = EXCLUDED.ip,
user_agent = EXCLUDED.user_agent
""",
(
user_id,
dev["device_id"],
dev["display_name"],
dev["last_seen_ts"],
dev["ip"],
dev["user_agent"],
),
)
# Preserve existing client sessions — without this clients are forced
# to re-login, losing device_id continuity and with it the local
# Megolm key store on many clients (critical for E2EE history).
if dev["access_token"]:
dst_cur.execute(
"""
INSERT INTO access_tokens (id, user_id, device_id, token, valid_until_ms)
VALUES (%s, %s, %s, %s, NULL)
ON CONFLICT (token) DO NOTHING
""",
(next_token_id, user_id, dev["device_id"], dev["access_token"]),
)
next_token_id += 1
log.info("Migrated %d devices (+ access tokens)", len(devices))
if not self.dry_run:
self.dst.commit()
log.info("Phase 1 complete")
# ── Phase 2: Rooms ──────────────────────────────────────────────
def migrate_rooms(self):
log.info("=== Phase 2: Rooms ===")
# Get rooms
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute("SELECT room_nid, room_id, room_version FROM roomserver_rooms")
rooms = src_cur.fetchall()
log.info("Found %d rooms to migrate", len(rooms))
# Find room creators from m.room.create events
creators = {}
with self.src.cursor() as src_cur:
# event_type_nid 1 = m.room.create
src_cur.execute(
"""
SELECT r.room_id, ej.event_json
FROM roomserver_events e
JOIN roomserver_rooms r ON e.room_nid = r.room_nid
JOIN roomserver_event_json ej ON e.event_nid = ej.event_nid
WHERE e.event_type_nid = 1
"""
)
for room_id, event_json_str in src_cur.fetchall():
try:
ev = json.loads(event_json_str)
# Creator is in content.creator or sender field
creator = ev.get("content", {}).get("creator") or ev.get("sender", "")
creators[room_id] = creator
except (json.JSONDecodeError, TypeError):
pass
with self.dst.cursor() as dst_cur:
for room in rooms:
room_id = room["room_id"]
creator = creators.get(room_id, "")
dst_cur.execute(
"""
INSERT INTO rooms (room_id, creator, room_version, is_public, has_auth_chain_index)
VALUES (%s, %s, %s, false, true)
ON CONFLICT (room_id) DO UPDATE SET
creator = EXCLUDED.creator,
room_version = EXCLUDED.room_version
""",
(room_id, creator, room["room_version"]),
)
log.info("Migrated %d rooms", len(rooms))
# Room aliases
src_cur2 = self.src.cursor(cursor_factory=psycopg2.extras.DictCursor)
src_cur2.execute(
"SELECT alias, room_id, creator_id FROM roomserver_room_aliases"
)
aliases = src_cur2.fetchall()
src_cur2.close()
for alias in aliases:
dst_cur.execute(
"""
INSERT INTO room_aliases (room_alias, room_id, creator)
VALUES (%s, %s, %s)
ON CONFLICT (room_alias) DO NOTHING
""",
(alias["alias"], alias["room_id"], alias["creator_id"]),
)
# Also add the server for each alias
# Extract the server from the alias: "#room:server.com" -> "server.com".
# Split on the first ":" so server names that include a port survive.
# room_alias_servers has no unique constraint, check existence first
alias_server = alias["alias"].split(":", 1)[1] if ":" in alias["alias"] else self.server_name
dst_cur.execute(
"SELECT 1 FROM room_alias_servers WHERE room_alias = %s AND server = %s",
(alias["alias"], alias_server),
)
if not dst_cur.fetchone():
dst_cur.execute(
"INSERT INTO room_alias_servers (room_alias, server) VALUES (%s, %s)",
(alias["alias"], alias_server),
)
log.info("Migrated %d room aliases", len(aliases))
if not self.dry_run:
self.dst.commit()
log.info("Phase 2 complete")
# ── Phase 3: Events ─────────────────────────────────────────────
def migrate_events(self):
log.info("=== Phase 3: Events ===")
# Fetch all events with their JSON, ordered by event_nid (creation order)
with self.src.cursor(
name="events_cursor", cursor_factory=psycopg2.extras.DictCursor
) as src_cur:
src_cur.itersize = 5000
src_cur.execute(
"""
SELECT e.event_nid, e.event_id, e.room_nid, e.event_type_nid,
e.event_state_key_nid, e.depth, e.is_rejected,
e.auth_event_nids,
ej.event_json
FROM roomserver_events e
JOIN roomserver_event_json ej ON e.event_nid = ej.event_nid
ORDER BY e.event_nid ASC
"""
)
# Track stream ordering (global sequential)
stream_ordering = 0
batch: list[tuple] = []
batch_json: list[tuple] = []
batch_state: list[tuple] = []
event_count = 0
rejected_count = 0
for row in src_cur:
if row["is_rejected"]:
rejected_count += 1
continue
event_id = row["event_id"]
room_id = self._resolve_room_nid(row["room_nid"])
event_type = self._resolve_event_type(row["event_type_nid"])
state_key_nid = row["event_state_key_nid"]
depth = row["depth"]
event_json_str = row["event_json"]
# Parse event JSON for sender, origin_server_ts, content
try:
ev = json.loads(event_json_str)
except (json.JSONDecodeError, TypeError):
log.warning("Skipping event %s: invalid JSON", event_id)
continue
sender = ev.get("sender", "")
origin_server_ts = ev.get("origin_server_ts", 0)
content = json.dumps(ev.get("content", {}))
contains_url = "url" in ev.get("content", {})
# State key: 0 means not a state event in Dendrite (nid 0 doesn't exist)
# nid 1 = empty string (which IS a valid state key)
is_state_event = state_key_nid != 0
state_key = self._resolve_state_key(state_key_nid) if is_state_event else None
# Ordering
stream_ordering += 1
# topological_ordering = depth (Synapse uses depth directly)
topo = depth
# internal_metadata: stream_ordering and outlier are loaded from
# events table columns, NOT from this JSON. This JSON stores only
# supplementary flags like soft_failed, out_of_band_membership, etc.
# For migrated events, empty object is correct.
internal_metadata = "{}"
# Determine format_version from the room's version
# (v1-2 -> 1, v3 -> 2, v4-11 -> 3, v12+ -> 4)
format_version = self._room_version_to_format(
self._get_room_version(row["room_nid"])
)
# Rejection reason
rejection_reason = None
batch.append((
topo, # topological_ordering
event_id,
event_type, # type
room_id,
content,
"", # unrecognized_keys
True, # processed - these are already-processed events
False, # outlier
depth,
origin_server_ts,
origin_server_ts, # received_ts (use origin_server_ts as approximation)
sender,
contains_url,
"master", # instance_name
stream_ordering,
state_key,
rejection_reason,
))
batch_json.append((
event_id,
room_id,
internal_metadata,
event_json_str,
format_version,
))
# State events get an entry in state_events
if is_state_event:
batch_state.append((
event_id,
room_id,
event_type,
state_key,
"", # prev_state (no longer written per schema v67)
))
# prev_events/auth_events edges (event_edges, event_auth) are built in a
# second pass by _migrate_event_graph() from the stored event JSON.
event_count += 1
if len(batch) >= 5000:
self._flush_events(batch, batch_json, batch_state)
batch.clear()
batch_json.clear()
batch_state.clear()
log.info(" ... migrated %d events so far", event_count)
# Flush remaining
if batch:
self._flush_events(batch, batch_json, batch_state)
log.info("Migrated %d events (%d rejected skipped)", event_count, rejected_count)
# Second pass: event_edges and event_auth from the event JSON prev_events/auth_events
self._migrate_event_graph()
# Room depth
self._migrate_room_depth()
# Forward extremities
self._migrate_forward_extremities()
if not self.dry_run:
self.dst.commit()
log.info("Phase 3 complete (stream_ordering max = %d)", stream_ordering)
return stream_ordering
def _flush_events(self, batch, batch_json, batch_state):
"""Insert batches of events into Synapse tables."""
with self.dst.cursor() as dst_cur:
psycopg2.extras.execute_values(
dst_cur,
"""
INSERT INTO events (topological_ordering, event_id, type, room_id,
content, unrecognized_keys, processed, outlier, depth,
origin_server_ts, received_ts, sender, contains_url,
instance_name, stream_ordering, state_key, rejection_reason)
VALUES %s
ON CONFLICT (event_id) DO NOTHING
""",
batch,
page_size=1000,
)
psycopg2.extras.execute_values(
dst_cur,
"""
INSERT INTO event_json (event_id, room_id, internal_metadata, json, format_version)
VALUES %s
ON CONFLICT (event_id) DO NOTHING
""",
batch_json,
page_size=1000,
)
if batch_state:
psycopg2.extras.execute_values(
dst_cur,
"""
INSERT INTO state_events (event_id, room_id, type, state_key, prev_state)
VALUES %s
ON CONFLICT (event_id) DO NOTHING
""",
batch_state,
page_size=1000,
)
def _migrate_event_graph(self):
"""Build event_edges and event_auth from event JSON prev_events and auth_events."""
log.info("Building event graph (edges + auth)...")
with self.dst.cursor(name="event_graph_cur") as dst_cur:
dst_cur.itersize = 5000
dst_cur.execute("SELECT event_id, json FROM event_json ORDER BY event_id")
batch_edges = []
batch_auth = []
count = 0
for event_id, event_json_str in dst_cur:
try:
ev = json.loads(event_json_str)
except (json.JSONDecodeError, TypeError):
continue
# prev_events -> event_edges
prev_events = ev.get("prev_events", [])
for prev in prev_events:
# In room v1/v2 prev_events is [[event_id, {hash}], ...]
# In room v3+ prev_events is [event_id, ...]
if isinstance(prev, list):
prev_id = prev[0]
else:
prev_id = prev
batch_edges.append((event_id, prev_id))
# auth_events -> event_auth
auth_events = ev.get("auth_events", [])
for auth in auth_events:
if isinstance(auth, list):
auth_id = auth[0]
else:
auth_id = auth
batch_auth.append((event_id, auth_id, ev.get("room_id", "")))
count += 1
if len(batch_edges) >= 5000 or len(batch_auth) >= 5000:
self._flush_event_graph(batch_edges, batch_auth)
batch_edges.clear()
batch_auth.clear()
if batch_edges or batch_auth:
self._flush_event_graph(batch_edges, batch_auth)
log.info("Built event graph for %d events", count)
def _flush_event_graph(self, edges, auth):
with self.dst.cursor() as cur:
if edges:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO event_edges (event_id, prev_event_id)
VALUES %s
ON CONFLICT (event_id, prev_event_id) DO NOTHING
""",
edges,
page_size=1000,
)
if auth:
# event_auth has no unique constraint, so deduplicate within the batch;
# re-running this phase can still insert duplicate rows across runs.
auth = list(dict.fromkeys(auth))
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO event_auth (event_id, auth_id, room_id)
VALUES %s
""",
auth,
page_size=1000,
)
def _migrate_room_depth(self):
"""Set room_depth for each room from migrated events."""
log.info("Setting room depth...")
with self.dst.cursor() as cur:
cur.execute(
"""
INSERT INTO room_depth (room_id, min_depth)
SELECT room_id, MIN(depth)
FROM events
WHERE NOT outlier
GROUP BY room_id
ON CONFLICT (room_id) DO UPDATE SET min_depth = EXCLUDED.min_depth
"""
)
log.info("Set room depth for %d rooms", cur.rowcount)
def _migrate_forward_extremities(self):
"""Set forward extremities from Dendrite's latest_event_nids."""
log.info("Setting forward extremities from Dendrite room state...")
with self.src.cursor() as src_cur:
# Dendrite tracks forward extremities per room as latest_event_nids
src_cur.execute(
"SELECT room_id, latest_event_nids FROM roomserver_rooms"
)
rooms = src_cur.fetchall()
# Build event_nid -> event_id mapping for extremities
all_nids = set()
for _, nids in rooms:
if nids:
all_nids.update(nids)
nid_to_event_id: dict[int, str] = {}
if all_nids:
with self.src.cursor() as src_cur:
# Fetch in batches
nid_list = list(all_nids)
src_cur.execute(
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid = ANY(%s)",
(nid_list,),
)
nid_to_event_id = dict(src_cur.fetchall())
count = 0
with self.dst.cursor() as cur:
for room_id, nids in rooms:
if not nids:
continue
for nid in nids:
event_id = nid_to_event_id.get(nid)
if event_id:
cur.execute(
"""
INSERT INTO event_forward_extremities (event_id, room_id)
VALUES (%s, %s)
ON CONFLICT (event_id, room_id) DO NOTHING
""",
(event_id, room_id),
)
count += 1
log.info("Set %d forward extremities", count)
# ── Phase 4: Room State ─────────────────────────────────────────
def migrate_room_state(self):
log.info("=== Phase 4: Room State ===")
# Build current_state_events from Dendrite's syncapi_current_room_state
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"""
SELECT room_id, event_id, type, sender, state_key, membership
FROM syncapi_current_room_state
"""
)
current_state = src_cur.fetchall()
log.info("Found %d current state events", len(current_state))
with self.dst.cursor() as dst_cur:
for cs in current_state:
dst_cur.execute(
"""
INSERT INTO current_state_events (event_id, room_id, type, state_key, membership)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (room_id, type, state_key) DO UPDATE SET
event_id = EXCLUDED.event_id,
membership = EXCLUDED.membership
""",
(cs["event_id"], cs["room_id"], cs["type"], cs["state_key"], cs["membership"]),
)
log.info("Migrated %d current state events", len(current_state))
# Build state groups - one per room for current state
# Synapse uses state groups extensively; we create minimal groups from current state
self._build_state_groups()
if not self.dry_run:
self.dst.commit()
log.info("Phase 4 complete")
def _build_state_groups(self):
"""Build incremental state groups per room.
Walks events in each room by topological/stream order. Each state event
creates a new state group (delta from previous). Non-state events share
the most recent state group. This gives Synapse correct historical state
lookups.
"""
log.info("Building state groups (incremental per room)...")
with self.dst.cursor() as dst_cur:
# Get all rooms that have events
dst_cur.execute("SELECT DISTINCT room_id FROM events ORDER BY room_id")
rooms = [r[0] for r in dst_cur.fetchall()]
state_group_id = 0
total_groups = 0
total_mappings = 0
for room_idx, room_id in enumerate(rooms):
with self.dst.cursor(name=f"sg_room_{room_idx}") as ev_cur:
ev_cur.itersize = 2000
ev_cur.execute(
"""
SELECT event_id, type, state_key
FROM events
WHERE room_id = %s
ORDER BY topological_ordering ASC, stream_ordering ASC
""",
(room_id,),
)
current_sg = None
prev_sg = None
# Track running state: (type, state_key) -> event_id
running_state: dict[tuple[str, str], str] = {}
batch_groups: list[tuple] = [] # (id, room_id, event_id)
batch_state: list[tuple] = [] # (sg, room_id, type, state_key, event_id)
batch_edges: list[tuple] = [] # (sg, prev_sg)
batch_mappings: list[tuple] = [] # (event_id, sg)
for event_id, ev_type, state_key in ev_cur:
is_state = state_key is not None
if is_state:
# State event: create new state group
state_group_id += 1
prev_sg = current_sg
current_sg = state_group_id
# Update running state
running_state[(ev_type, state_key)] = event_id
batch_groups.append((current_sg, room_id, event_id))
# Only store the delta (the changed state entry)
# Synapse resolves full state by walking state_group_edges
batch_state.append((
current_sg, room_id, ev_type, state_key, event_id
))
if prev_sg is not None:
batch_edges.append((current_sg, prev_sg))
total_groups += 1
elif current_sg is None:
# Non-state event before any state - create initial empty group
state_group_id += 1
current_sg = state_group_id
batch_groups.append((current_sg, room_id, event_id))
total_groups += 1
# Map this event to current state group
if current_sg is not None:
batch_mappings.append((event_id, current_sg))
total_mappings += 1
# Flush this room's data
if batch_groups or batch_mappings:
self._flush_state_groups(
batch_groups, batch_state, batch_edges, batch_mappings
)
if (room_idx + 1) % 100 == 0:
log.info(" ... processed %d/%d rooms", room_idx + 1, len(rooms))
log.info(
"Built %d state groups, %d event mappings across %d rooms",
total_groups, total_mappings, len(rooms),
)
def _flush_state_groups(self, groups, state, edges, mappings):
with self.dst.cursor() as cur:
if groups:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO state_groups (id, room_id, event_id)
VALUES %s ON CONFLICT (id) DO NOTHING
""",
groups, page_size=1000,
)
if state:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO state_groups_state
(state_group, room_id, type, state_key, event_id)
VALUES %s ON CONFLICT DO NOTHING
""",
state, page_size=1000,
)
if edges:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO state_group_edges (state_group, prev_state_group)
VALUES %s ON CONFLICT DO NOTHING
""",
edges, page_size=1000,
)
if mappings:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO event_to_state_groups (event_id, state_group)
VALUES %s ON CONFLICT (event_id) DO NOTHING
""",
mappings, page_size=1000,
)
# ── Phase 5: Membership ─────────────────────────────────────────
def migrate_membership(self):
log.info("=== Phase 5: Membership ===")
# Use syncapi_current_room_state for membership events (type = m.room.member)
# These have the headered event JSON with all needed fields
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"""
SELECT room_id, event_id, sender, state_key, membership,
headered_event_json
FROM syncapi_current_room_state
WHERE type = 'm.room.member'
"""
)
memberships = src_cur.fetchall()
log.info("Found %d membership entries", len(memberships))
with self.dst.cursor() as dst_cur:
for m in memberships:
event_id = m["event_id"]
room_id = m["room_id"]
user_id = m["state_key"] # state_key is the target user for membership
sender = m["sender"]
membership = m["membership"]
# Extract display_name and avatar_url from event content
display_name = None
avatar_url = None
try:
ev = json.loads(m["headered_event_json"])
content = ev.get("content", {})
display_name = content.get("displayname")
avatar_url = content.get("avatar_url")
except (json.JSONDecodeError, TypeError):
pass
# Get event_stream_ordering from events table
dst_cur.execute(
"SELECT stream_ordering FROM events WHERE event_id = %s",
(event_id,),
)
so_row = dst_cur.fetchone()
event_stream_ordering = so_row[0] if so_row else 0
dst_cur.execute(
"""
INSERT INTO room_memberships
(event_id, user_id, sender, room_id, membership,
forgotten, display_name, avatar_url, event_stream_ordering)
VALUES (%s, %s, %s, %s, %s, 0, %s, %s, %s)
ON CONFLICT (event_id) DO UPDATE SET
membership = EXCLUDED.membership,
display_name = EXCLUDED.display_name,
avatar_url = EXCLUDED.avatar_url
""",
(
event_id, user_id, sender, room_id, membership,
display_name, avatar_url, event_stream_ordering,
),
)
# local_current_membership for local users
# unique index on (user_id, room_id) via local_current_membership_idx
if user_id.endswith(f":{self.server_name}"):
dst_cur.execute(
"""
INSERT INTO local_current_membership
(room_id, user_id, event_id, membership, event_stream_ordering)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (user_id, room_id) DO UPDATE SET
event_id = EXCLUDED.event_id,
membership = EXCLUDED.membership,
event_stream_ordering = EXCLUDED.event_stream_ordering
""",
(room_id, user_id, event_id, membership, event_stream_ordering),
)
log.info("Migrated %d memberships", len(memberships))
if not self.dry_run:
self.dst.commit()
log.info("Phase 5 complete")
# ── Phase 6: Media ──────────────────────────────────────────────
def migrate_media(self):
log.info("=== Phase 6: Media ===")
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"""
SELECT media_id, media_origin, content_type, file_size_bytes,
creation_ts, upload_name, base64hash, user_id
FROM mediaapi_media_repository
WHERE media_origin = %s
""",
(self.server_name,),
)
media = src_cur.fetchall()
log.info("Found %d local media entries", len(media))
with self.dst.cursor() as dst_cur:
for m in media:
# Synapse creation_ts is in milliseconds
dst_cur.execute(
"""
INSERT INTO local_media_repository
(media_id, media_type, media_length, created_ts,
upload_name, user_id)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (media_id) DO NOTHING
""",
(
m["media_id"],
m["content_type"],
m["file_size_bytes"],
m["creation_ts"],
m["upload_name"],
m["user_id"],
),
)
log.info("Migrated %d media metadata entries", len(media))
# Thumbnails
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"""
SELECT media_id, media_origin, content_type, file_size_bytes,
creation_ts, width, height, resize_method
FROM mediaapi_thumbnail
WHERE media_origin = %s
""",
(self.server_name,),
)
thumbs = src_cur.fetchall()
with self.dst.cursor() as dst_cur:
for t in thumbs:
dst_cur.execute(
"""
INSERT INTO local_media_repository_thumbnails
(media_id, thumbnail_width, thumbnail_height,
thumbnail_type, thumbnail_method, thumbnail_length)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (media_id, thumbnail_width, thumbnail_height,
thumbnail_type, thumbnail_method) DO NOTHING
""",
(
t["media_id"],
t["width"],
t["height"],
t["content_type"],
t["resize_method"],
t["file_size_bytes"],
),
)
log.info("Migrated %d thumbnail entries", len(thumbs))
if not self.dry_run:
self.dst.commit()
# Copy actual files if paths provided
if self.dendrite_media_path and self.synapse_media_path:
self._copy_media_files(media, thumbs)
else:
log.warning("Media paths not provided, skipping file copy. "
"Use --dendrite-media-path and --synapse-media-path to copy files.")
log.info("Phase 6 complete")
@staticmethod
def _dendrite_media_path(base: str, b64hash: str) -> str | None:
"""Dendrite content path: {base}/{hash[0]}/{hash[1]}/{hash[2:]}/file"""
if not b64hash or len(b64hash) < 3:
return None
return os.path.join(base, b64hash[0], b64hash[1], b64hash[2:], "file")
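# Example: base64hash "AbCdEf..." -> {base}/A/b/CdEf.../file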
@staticmethod
def _dendrite_thumb_path(base: str, b64hash: str, width: int, height: int, method: str) -> str | None:
"""Dendrite thumbnail path: {base}/{hash[0]}/{hash[1]}/{hash[2:]}/thumbnail-{w}x{h}-{method}"""
if not b64hash or len(b64hash) < 3:
return None
return os.path.join(
base, b64hash[0], b64hash[1], b64hash[2:],
f"thumbnail-{width}x{height}-{method}",
)
@staticmethod
def _synapse_content_path(base: str, media_id: str) -> str | None:
"""Synapse content path: {base}/local_content/{id[0:2]}/{id[2:4]}/{id[4:]}"""
if len(media_id) < 4:
return None
return os.path.join(base, "local_content", media_id[0:2], media_id[2:4], media_id[4:])
@staticmethod
def _synapse_thumb_path(
base: str, media_id: str, width: int, height: int,
content_type: str, method: str,
) -> str | None:
"""Synapse thumbnail path:
{base}/local_thumbnails/{id[0:2]}/{id[2:4]}/{id[4:]}/{w}-{h}-{top}-{sub}-{method}
"""
if len(media_id) < 4:
return None
# content_type e.g. "image/jpeg" -> top="image", sub="jpeg"
parts = content_type.split("/", 1)
top = parts[0] if parts else "application"
sub = parts[1] if len(parts) > 1 else "octet-stream"
return os.path.join(
base, "local_thumbnails",
media_id[0:2], media_id[2:4], media_id[4:],
f"{width}-{height}-{top}-{sub}-{method}",
)
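# Example: media_id "abcdwxyz", 320x240 "image/jpeg", method "scale"
# -> {base}/local_thumbnails/ab/cd/wxyz/320-240-image-jpeg-scale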
def _copy_media_files(self, media_rows, thumb_rows):
"""Copy media files + thumbnails from Dendrite to Synapse path structure.
Dendrite content: {base}/{hash[0]}/{hash[1]}/{hash[2:]}/file
Synapse content: {base}/local_content/{id[0:2]}/{id[2:4]}/{id[4:]}
Dendrite thumbnails: {base}/{hash[0]}/{hash[1]}/{hash[2:]}/thumbnail-{w}x{h}-{method}
Synapse thumbnails: {base}/local_thumbnails/{id[0:2]}/{id[2:4]}/{id[4:]}/{w}-{h}-{top}-{sub}-{method}
"""
log.info("Copying media content files...")
copied = 0
skipped = 0
errors = 0
for m in media_rows:
src = self._dendrite_media_path(self.dendrite_media_path, m["base64hash"])
dst = self._synapse_content_path(self.synapse_media_path, m["media_id"])
if not src or not dst:
log.warning("Media %s: invalid hash or media_id, skipping", m["media_id"])
errors += 1
continue
if os.path.exists(dst):
skipped += 1
continue
if not os.path.exists(src):
log.warning("Source missing: %s (media_id=%s)", src, m["media_id"])
errors += 1
continue
if not self.dry_run:
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copy2(src, dst)
copied += 1
log.info("Content files: %d copied, %d skipped (exist), %d errors", copied, skipped, errors)
# Build media_id -> base64hash lookup for thumbnails
hash_lookup = {m["media_id"]: m["base64hash"] for m in media_rows}
log.info("Copying thumbnail files...")
t_copied = 0
t_skipped = 0
t_errors = 0
for t in thumb_rows:
media_id = t["media_id"]
b64hash = hash_lookup.get(media_id)
if not b64hash:
t_errors += 1
continue
src = self._dendrite_thumb_path(
self.dendrite_media_path, b64hash,
t["width"], t["height"], t["resize_method"],
)
dst = self._synapse_thumb_path(
self.synapse_media_path, media_id,
t["width"], t["height"], t["content_type"], t["resize_method"],
)
if not src or not dst:
t_errors += 1
continue
if os.path.exists(dst):
t_skipped += 1
continue
if not os.path.exists(src):
log.debug("Thumbnail missing: %s (media_id=%s)", src, media_id)
t_errors += 1
continue
if not self.dry_run:
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copy2(src, dst)
t_copied += 1
log.info("Thumbnails: %d copied, %d skipped (exist), %d errors", t_copied, t_skipped, t_errors)
# ── Phase 7: Auxiliary ──────────────────────────────────────────
def migrate_auxiliary(self):
log.info("=== Phase 7: Auxiliary Data ===")
self._migrate_receipts()
self._migrate_redactions()
self._populate_room_stats()
self._update_stream_positions()
if not self.dry_run:
self.dst.commit()
log.info("Phase 7 complete")
def _migrate_receipts(self):
"""Migrate read receipts."""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"""
SELECT id, room_id, receipt_type, user_id, event_id, receipt_ts
FROM syncapi_receipts
"""
)
receipts = src_cur.fetchall()
log.info("Found %d receipts", len(receipts))
with self.dst.cursor() as dst_cur:
for r in receipts:
# receipts_linearized - partial unique index WHERE thread_id IS NULL
dst_cur.execute(
"""
INSERT INTO receipts_linearized
(stream_id, room_id, receipt_type, user_id, event_id, data,
instance_name, thread_id)
VALUES (%s, %s, %s, %s, %s, '{}', 'master', NULL)
ON CONFLICT (room_id, receipt_type, user_id)
WHERE thread_id IS NULL
DO UPDATE SET
event_id = EXCLUDED.event_id,
stream_id = EXCLUDED.stream_id
""",
(r["id"], r["room_id"], r["receipt_type"], r["user_id"], r["event_id"]),
)
# receipts_graph - use partial unique index for NULL thread_id
event_ids_json = json.dumps([r["event_id"]])
dst_cur.execute(
"""
INSERT INTO receipts_graph
(room_id, receipt_type, user_id, event_ids, data, thread_id)
VALUES (%s, %s, %s, %s, '{}', NULL)
ON CONFLICT (room_id, receipt_type, user_id)
WHERE thread_id IS NULL
DO UPDATE SET event_ids = EXCLUDED.event_ids
""",
(r["room_id"], r["receipt_type"], r["user_id"], event_ids_json),
)
log.info("Migrated %d receipts", len(receipts))
def _migrate_redactions(self):
"""Migrate redaction tracking."""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"SELECT redaction_event_id, redacts_event_id, validated FROM roomserver_redactions"
)
redactions = src_cur.fetchall()
log.info("Found %d redactions", len(redactions))
with self.dst.cursor() as dst_cur:
for r in redactions:
# Get received_ts from the event
dst_cur.execute(
"SELECT received_ts FROM events WHERE event_id = %s",
(r["redaction_event_id"],),
)
row = dst_cur.fetchone()
received_ts = row[0] if row else 0
dst_cur.execute(
"""
INSERT INTO redactions (event_id, redacts, have_censored, received_ts)
VALUES (%s, %s, %s, %s)
ON CONFLICT (event_id) DO NOTHING
""",
(r["redaction_event_id"], r["redacts_event_id"],
r["validated"], received_ts),
)
log.info("Migrated %d redactions", len(redactions))
def _populate_room_stats(self):
"""Populate room_stats_current and room_stats_state from migrated data."""
log.info("Populating room stats...")
with self.dst.cursor() as cur:
# room_stats_current: count members by membership type
cur.execute(
"""
INSERT INTO room_stats_current
(room_id, current_state_events, joined_members, invited_members,
left_members, banned_members, local_users_in_room,
completed_delta_stream_id, knocked_members)
SELECT
r.room_id,
COALESCE(cse.cnt, 0),
COALESCE(jm.cnt, 0),
COALESCE(im.cnt, 0),
COALESCE(lm.cnt, 0),
COALESCE(bm.cnt, 0),
COALESCE(lu.cnt, 0),
0,
0
FROM rooms r
LEFT JOIN (
SELECT room_id, COUNT(*) cnt FROM current_state_events GROUP BY room_id
) cse ON cse.room_id = r.room_id
LEFT JOIN (
SELECT room_id, COUNT(*) cnt FROM room_memberships WHERE membership = 'join' GROUP BY room_id
) jm ON jm.room_id = r.room_id
LEFT JOIN (
SELECT room_id, COUNT(*) cnt FROM room_memberships WHERE membership = 'invite' GROUP BY room_id
) im ON im.room_id = r.room_id
LEFT JOIN (
SELECT room_id, COUNT(*) cnt FROM room_memberships WHERE membership = 'leave' GROUP BY room_id
) lm ON lm.room_id = r.room_id
LEFT JOIN (
SELECT room_id, COUNT(*) cnt FROM room_memberships WHERE membership = 'ban' GROUP BY room_id
) bm ON bm.room_id = r.room_id
LEFT JOIN (
SELECT lcm.room_id, COUNT(*) cnt FROM local_current_membership lcm
WHERE lcm.membership = 'join' GROUP BY lcm.room_id
) lu ON lu.room_id = r.room_id
ON CONFLICT (room_id) DO UPDATE SET
current_state_events = EXCLUDED.current_state_events,
joined_members = EXCLUDED.joined_members,
invited_members = EXCLUDED.invited_members,
left_members = EXCLUDED.left_members,
banned_members = EXCLUDED.banned_members,
local_users_in_room = EXCLUDED.local_users_in_room
"""
)
log.info("Populated room_stats_current: %d rows", cur.rowcount)
# room_stats_state: extract from current state events
cur.execute(
"""
INSERT INTO room_stats_state (room_id, name, canonical_alias, join_rules,
history_visibility, encryption, avatar, guest_access, is_federatable, topic)
SELECT
r.room_id,
MAX(CASE WHEN cse.type = 'm.room.name' THEN
(SELECT e.content::jsonb->>'name' FROM events e WHERE e.event_id = cse.event_id)
END),
MAX(CASE WHEN cse.type = 'm.room.canonical_alias' THEN
(SELECT e.content::jsonb->>'alias' FROM events e WHERE e.event_id = cse.event_id)
END),
MAX(CASE WHEN cse.type = 'm.room.join_rules' THEN
(SELECT e.content::jsonb->>'join_rule' FROM events e WHERE e.event_id = cse.event_id)
END),
MAX(CASE WHEN cse.type = 'm.room.history_visibility' THEN
(SELECT e.content::jsonb->>'history_visibility' FROM events e WHERE e.event_id = cse.event_id)
END),
MAX(CASE WHEN cse.type = 'm.room.encryption' THEN
(SELECT e.content::jsonb->>'algorithm' FROM events e WHERE e.event_id = cse.event_id)
END),
MAX(CASE WHEN cse.type = 'm.room.avatar' THEN
(SELECT e.content::jsonb->>'url' FROM events e WHERE e.event_id = cse.event_id)
END),
MAX(CASE WHEN cse.type = 'm.room.guest_access' THEN
(SELECT e.content::jsonb->>'guest_access' FROM events e WHERE e.event_id = cse.event_id)
END),
TRUE,
MAX(CASE WHEN cse.type = 'm.room.topic' THEN
(SELECT e.content::jsonb->>'topic' FROM events e WHERE e.event_id = cse.event_id)
END)
FROM rooms r
JOIN current_state_events cse ON cse.room_id = r.room_id
WHERE cse.type IN ('m.room.name', 'm.room.canonical_alias', 'm.room.join_rules',
'm.room.history_visibility', 'm.room.encryption', 'm.room.avatar',
'm.room.guest_access', 'm.room.topic')
GROUP BY r.room_id
ON CONFLICT (room_id) DO UPDATE SET
name = EXCLUDED.name,
canonical_alias = EXCLUDED.canonical_alias,
join_rules = EXCLUDED.join_rules,
history_visibility = EXCLUDED.history_visibility,
encryption = EXCLUDED.encryption,
avatar = EXCLUDED.avatar,
guest_access = EXCLUDED.guest_access,
topic = EXCLUDED.topic
"""
)
log.info("Populated room_stats_state: %d rows", cur.rowcount)
def _update_stream_positions(self):
"""Update various stream position trackers so Synapse knows where to resume."""
log.info("Updating stream positions...")
with self.dst.cursor() as cur:
# Get max stream_ordering from events
cur.execute("SELECT COALESCE(MAX(stream_ordering), 0) FROM events")
max_stream = cur.fetchone()[0]
# Update the events stream sequence
if max_stream > 0:
cur.execute(
"SELECT setval('events_stream_seq', %s, true)",
(max_stream,),
)
# Get max receipt stream_id
cur.execute("SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized")
max_receipt_stream = cur.fetchone()[0]
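# Advance the receipts stream sequence as well, if the schema has one
# (modern Synapse schemas use a 'receipts_sequence'; this is an
# assumption, so check for it before calling setval).
cur.execute(
"SELECT 1 FROM pg_class WHERE relkind = 'S' AND relname = 'receipts_sequence'"
)
if cur.fetchone() and max_receipt_stream > 0:
cur.execute(
"SELECT setval('receipts_sequence', %s, true)",
(max_receipt_stream,),
)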
# user_stats_current
cur.execute(
"""
INSERT INTO user_stats_current (user_id, joined_rooms, completed_delta_stream_id)
SELECT u.name, COALESCE(jr.cnt, 0), 0
FROM users u
LEFT JOIN (
SELECT user_id, COUNT(*) cnt FROM local_current_membership
WHERE membership = 'join' GROUP BY user_id
) jr ON jr.user_id = u.name
ON CONFLICT (user_id) DO UPDATE SET
joined_rooms = EXCLUDED.joined_rooms
"""
)
log.info("Updated user stats: %d rows", cur.rowcount)
log.info("Stream positions: events=%d, receipts=%d", max_stream, max_receipt_stream)
# ── Phase 8: E2EE Keys ─────────────────────────────────────────
_CROSS_SIGNING_KEY_TYPE = {1: "master", 2: "self_signing", 3: "user_signing"}
def migrate_e2ee(self):
log.info("=== Phase 8: E2EE Keys ===")
self._migrate_key_backup()
self._migrate_device_keys()
self._migrate_one_time_keys()
self._migrate_fallback_keys()
self._migrate_cross_signing_keys()
self._migrate_cross_signing_sigs()
self._migrate_device_inbox()
self._migrate_device_lists_stream()
if not self.dry_run:
self.dst.commit()
log.info("Phase 8 complete")
def _migrate_key_backup(self):
"""Migrate server-side E2EE key backup (versions + room keys)."""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute("SELECT user_id, version, algorithm, auth_data, etag, deleted FROM userapi_key_backup_versions")
versions = src_cur.fetchall()
log.info("Found %d key backup versions", len(versions))
with self.dst.cursor() as dst_cur:
for v in versions:
# Dendrite user_id is localpart, prepend @...:server
user_id = v["user_id"] if v["user_id"].startswith("@") else f"@{v['user_id']}:{self.server_name}"
# etag is text in Dendrite, bigint in Synapse
try:
etag = int(v["etag"])
except (ValueError, TypeError):
etag = 0
dst_cur.execute(
"""
INSERT INTO e2e_room_keys_versions
(user_id, version, algorithm, auth_data, deleted, etag)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (user_id, version) DO NOTHING
""",
(user_id, v["version"], v["algorithm"], v["auth_data"], v["deleted"], etag),
)
log.info("Migrated %d key backup versions", len(versions))
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"SELECT user_id, room_id, session_id, version, first_message_index, "
"forwarded_count, is_verified, session_data FROM userapi_key_backups"
)
keys = src_cur.fetchall()
log.info("Found %d backed-up room keys", len(keys))
with self.dst.cursor() as dst_cur:
for k in keys:
user_id = k["user_id"] if k["user_id"].startswith("@") else f"@{k['user_id']}:{self.server_name}"
dst_cur.execute(
"""
INSERT INTO e2e_room_keys
(user_id, room_id, session_id, version, first_message_index,
forwarded_count, is_verified, session_data)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (user_id, version, room_id, session_id) DO NOTHING
""",
(user_id, k["room_id"], k["session_id"], int(k["version"]),
k["first_message_index"], k["forwarded_count"],
k["is_verified"], k["session_data"]),
)
log.info("Migrated %d backed-up room keys", len(keys))
def _migrate_device_keys(self):
"""Migrate E2E device keys."""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"SELECT user_id, device_id, ts_added_secs, key_json FROM keyserver_device_keys"
)
keys = src_cur.fetchall()
log.info("Found %d device keys", len(keys))
with self.dst.cursor() as dst_cur:
for k in keys:
dst_cur.execute(
"""
INSERT INTO e2e_device_keys_json
(user_id, device_id, ts_added_ms, key_json)
VALUES (%s, %s, %s, %s)
ON CONFLICT (user_id, device_id) DO NOTHING
""",
(k["user_id"], k["device_id"], k["ts_added_secs"] * 1000, k["key_json"]),
)
log.info("Migrated %d device keys", len(keys))
def _migrate_one_time_keys(self):
"""Migrate E2E one-time keys."""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"SELECT user_id, device_id, key_id, algorithm, ts_added_secs, key_json "
"FROM keyserver_one_time_keys"
)
keys = src_cur.fetchall()
log.info("Found %d one-time keys", len(keys))
with self.dst.cursor() as dst_cur:
for k in keys:
dst_cur.execute(
"""
INSERT INTO e2e_one_time_keys_json
(user_id, device_id, algorithm, key_id, ts_added_ms, key_json)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (user_id, device_id, algorithm, key_id) DO NOTHING
""",
(k["user_id"], k["device_id"], k["algorithm"], k["key_id"],
k["ts_added_secs"] * 1000, k["key_json"]),
)
log.info("Migrated %d one-time keys", len(keys))
def _migrate_fallback_keys(self):
"""Migrate fallback keys (used when OTKs exhausted; essential for new Olm sessions)."""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"SELECT user_id, device_id, key_id, algorithm, key_json, used "
"FROM keyserver_fallback_keys"
)
keys = src_cur.fetchall()
log.info("Found %d fallback keys", len(keys))
with self.dst.cursor() as dst_cur:
for k in keys:
dst_cur.execute(
"""
INSERT INTO e2e_fallback_keys_json
(user_id, device_id, algorithm, key_id, key_json, used)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (user_id, device_id, algorithm) DO NOTHING
""",
(k["user_id"], k["device_id"], k["algorithm"],
k["key_id"], k["key_json"], k["used"]),
)
log.info("Migrated %d fallback keys", len(keys))
def _migrate_cross_signing_keys(self):
"""Migrate cross-signing keys (key_type int -> string)."""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute("SELECT user_id, key_type, key_data FROM keyserver_cross_signing_keys")
keys = src_cur.fetchall()
log.info("Found %d cross-signing keys", len(keys))
with self.dst.cursor() as dst_cur:
# Use the e2e_cross_signing_keys_sequence so post-migration writes
# don't collide on the UNIQUE(stream_id) index.
for k in keys:
key_type_str = self._CROSS_SIGNING_KEY_TYPE.get(k["key_type"])
if not key_type_str:
log.warning("Unknown cross-signing key_type %d for user %s", k["key_type"], k["user_id"])
continue
dst_cur.execute("SELECT nextval('e2e_cross_signing_keys_sequence')")
stream_id = dst_cur.fetchone()[0]
dst_cur.execute(
"""
INSERT INTO e2e_cross_signing_keys
(user_id, keytype, keydata, stream_id)
VALUES (%s, %s, %s, %s)
ON CONFLICT (user_id, keytype, stream_id) DO NOTHING
""",
(k["user_id"], key_type_str, k["key_data"], stream_id),
)
log.info("Migrated %d cross-signing keys", len(keys))
def _migrate_cross_signing_sigs(self):
"""Migrate cross-signing signatures."""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"SELECT origin_user_id, origin_key_id, target_user_id, target_key_id, signature "
"FROM keyserver_cross_signing_sigs"
)
sigs = src_cur.fetchall()
log.info("Found %d cross-signing signatures", len(sigs))
with self.dst.cursor() as dst_cur:
for s in sigs:
dst_cur.execute(
"""
INSERT INTO e2e_cross_signing_signatures
(user_id, key_id, target_user_id, target_device_id, signature)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
""",
(s["origin_user_id"], s["origin_key_id"],
s["target_user_id"], s["target_key_id"], s["signature"]),
)
log.info("Migrated %d cross-signing signatures", len(sigs))
def _migrate_device_inbox(self):
"""Migrate pending to-device messages (undelivered m.room_key shares etc).
Dendrite holds these in syncapi_send_to_device until a client syncs.
These rows typically contain m.room.encrypted Olm messages carrying
Megolm session keys — dropping them causes permanent key loss for
recipients that were offline at migration time.
"""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
src_cur.execute(
"SELECT id, user_id, device_id, content "
"FROM syncapi_send_to_device ORDER BY id"
)
msgs = src_cur.fetchall()
log.info("Found %d pending to-device messages", len(msgs))
if not msgs:
return
with self.dst.cursor() as dst_cur:
# Use the device_inbox_sequence so Synapse's next stream positions
# continue past our migrated range.
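# Note: device_inbox has no unique key we can target here and stream_id
# is freshly allocated on every run, so re-running this phase inserts
# duplicate copies of these messages.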
for m in msgs:
dst_cur.execute("SELECT nextval('device_inbox_sequence')")
stream_id = dst_cur.fetchone()[0]
dst_cur.execute(
"""
INSERT INTO device_inbox
(user_id, device_id, stream_id, message_json, instance_name)
VALUES (%s, %s, %s, %s, 'master')
""",
(m["user_id"], m["device_id"], stream_id, m["content"]),
)
log.info("Migrated %d to-device messages", len(msgs))
def _migrate_device_lists_stream(self):
"""Bootstrap device_lists_stream for every local device.
Without an entry here, clients never see the device in /sync's
`device_lists.changed` field and rely on stale cached keys. One
synthetic stream entry per local device forces a re-verification
on the next sync, which fixes the "device key mismatch → refuse
to decrypt" class of breakage after migration.
"""
with self.src.cursor(cursor_factory=psycopg2.extras.DictCursor) as src_cur:
# Only local devices (matching our server_name). Remote devices
# live in device_lists_remote_cache and aren't streamed.
src_cur.execute(
"""
SELECT DISTINCT k.user_id, k.device_id
FROM keyserver_device_keys k
WHERE k.user_id LIKE %s
""",
(f"%:{self.server_name}",),
)
rows = src_cur.fetchall()
log.info("Bootstrapping %d device_lists_stream entries", len(rows))
with self.dst.cursor() as dst_cur:
for r in rows:
dst_cur.execute("SELECT nextval('device_lists_sequence')")
stream_id = dst_cur.fetchone()[0]
dst_cur.execute(
"""
INSERT INTO device_lists_stream
(stream_id, user_id, device_id, instance_name)
VALUES (%s, %s, %s, 'master')
""",
(stream_id, r["user_id"], r["device_id"]),
)
log.info("Inserted %d device_lists_stream rows", len(rows))
# ── Run ─────────────────────────────────────────────────────────
def run(self, phases: list[int]):
self.load_nid_caches()
phase_map = {
1: ("Users & Profiles", self.migrate_users),
2: ("Rooms", self.migrate_rooms),
3: ("Events", self.migrate_events),
4: ("Room State", self.migrate_room_state),
5: ("Membership", self.migrate_membership),
6: ("Media", self.migrate_media),
7: ("Auxiliary", self.migrate_auxiliary),
8: ("E2EE Keys", self.migrate_e2ee),
}
for phase_num in sorted(phases):
if phase_num in phase_map:
name, func = phase_map[phase_num]
log.info("Starting phase %d: %s", phase_num, name)
try:
func()
except Exception:
log.exception("Phase %d failed", phase_num)
if not self.dry_run:
self.dst.rollback()
raise
log.info("Migration complete!")
# Print summary
self._print_summary()
def _print_summary(self):
"""Print migration summary counts."""
with self.dst.cursor() as cur:
tables = [
"users", "profiles", "devices", "rooms", "room_aliases",
"events", "event_json", "state_events", "current_state_events",
"room_memberships", "local_current_membership",
"local_media_repository", "receipts_linearized", "redactions",
"e2e_room_keys_versions", "e2e_room_keys",
"e2e_device_keys_json", "e2e_one_time_keys_json",
"e2e_fallback_keys_json",
"e2e_cross_signing_keys", "e2e_cross_signing_signatures",
"device_inbox", "device_lists_stream", "access_tokens",
]
log.info("=== Migration Summary ===")
for table in tables:
try:
cur.execute(f"SELECT COUNT(*) FROM {table}") # noqa: S608
count = cur.fetchone()[0]
log.info(" %-35s %d rows", table, count)
except Exception:
# Query failed (e.g. table missing); roll back so later counts still run
self.dst.rollback()
log.info(" %-35s (unavailable)", table)
PHASE_HELP = """\
Migration phases (use --phase to select):
Phase 1 - Users & Profiles
Migrates user accounts, display names, avatars, and devices.
Dendrite tables: userapi_accounts, userapi_profiles, userapi_devices
Synapse tables: users, profiles, devices
Notes: Timestamps converted from milliseconds to seconds. Account type
mapped to admin/guest flags. Passwords (bcrypt hashes) copied as-is.
Phase 2 - Rooms
Migrates room metadata and aliases.
Dendrite tables: roomserver_rooms, roomserver_room_aliases
Synapse tables: rooms, room_aliases, room_alias_servers
Notes: Room creator extracted from m.room.create events in Dendrite.
Phase 3 - Events
Core migration: converts all non-rejected events with full denormalization
of Dendrite's numeric IDs (NIDs) to text IDs.
Dendrite tables: roomserver_events, roomserver_event_json,
roomserver_event_types, roomserver_event_state_keys
Synapse tables: events, event_json, state_events, event_edges,
event_auth, event_forward_extremities, room_depth
Notes: topological_ordering set to event depth (not a counter).
Event format version derived from room version. Rejected events skipped.
Forward extremities taken from Dendrite's latest_event_nids. Event graph
(prev_events, auth_events) built from event JSON.
Phase 4 - Room State
Builds current state snapshot and incremental state groups for correct
historical state lookups.
Dendrite tables: syncapi_current_room_state
Synapse tables: current_state_events, state_groups, state_groups_state,
state_group_edges, event_to_state_groups
Notes: One state group created per state event (delta chain). Every event
mapped to the state group that was active when it was processed.
Phase 5 - Membership
Migrates room membership (joins, leaves, invites, bans).
Dendrite tables: syncapi_current_room_state (type=m.room.member)
Synapse tables: room_memberships, local_current_membership
Notes: Display name and avatar extracted from membership event content.
local_current_membership populated only for users on this server.
Phase 6 - Media
Migrates media metadata and optionally copies files between filesystem
layouts. Requires --dendrite-media-path and --synapse-media-path to copy
actual files; without them, only database metadata is migrated.
Dendrite tables: mediaapi_media_repository, mediaapi_thumbnail
Synapse tables: local_media_repository, local_media_repository_thumbnails
File layout conversion:
Content: {dendrite}/H/A/SH.../file
-> {synapse}/local_content/me/di/a_id...
Thumbnails: {dendrite}/H/A/SH.../thumbnail-WxH-method
-> {synapse}/local_thumbnails/me/di/a_id.../W-H-type-subtype-method
Phase 7 - Auxiliary Data
Migrates read receipts, redactions, and populates statistics/caches that
Synapse expects.
Dendrite tables: syncapi_receipts, roomserver_redactions
Synapse tables: receipts_linearized, receipts_graph, redactions,
room_stats_current, room_stats_state, user_stats_current
Notes: Updates the events_stream_seq sequence so Synapse continues from
the correct position. Room stats populated from migrated membership data.
Phase 8 - E2EE Keys
Migrates end-to-end encryption data: server-side key backups, device keys,
one-time keys, fallback keys, cross-signing keys and signatures, the
to-device inbox, and a bootstrap of the device_lists_stream. This phase
is critical for restoring encrypted message history and for keeping
existing Olm/Megolm sessions alive across the migration.
Dendrite tables: userapi_key_backup_versions, userapi_key_backups,
keyserver_device_keys, keyserver_one_time_keys,
keyserver_fallback_keys,
keyserver_cross_signing_keys, keyserver_cross_signing_sigs,
syncapi_send_to_device
Synapse tables: e2e_room_keys_versions, e2e_room_keys,
e2e_device_keys_json, e2e_one_time_keys_json,
e2e_fallback_keys_json,
e2e_cross_signing_keys, e2e_cross_signing_signatures,
device_inbox, device_lists_stream
Notes:
- Cross-signing key_type int (1=master, 2=self_signing, 3=user_signing)
converted to text; stream_id drawn from e2e_cross_signing_keys_sequence
so post-migration writes don't collide on UNIQUE(stream_id).
- Timestamps seconds -> milliseconds for device/OTK keys.
- syncapi_send_to_device -> device_inbox preserves undelivered
m.room.encrypted Olm messages (these carry m.room_key Megolm shares
to offline devices; dropping them causes permanent key loss).
- device_lists_stream is bootstrapped from local keyserver_device_keys
so clients see every device as "changed" on next sync and re-verify
against the migrated e2e_device_keys_json, preventing stale-cache
device-key mismatches.
Related:
- Access tokens are copied alongside devices in Phase 1 so clients
keep their existing sessions (no re-login -> device_id continuity
preserved -> local Megolm store on each client stays valid).
"""
def main():
parser = argparse.ArgumentParser(
description="Migrate a Dendrite PostgreSQL database to Synapse.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=PHASE_HELP + """\
prerequisites:
1. Both Dendrite and Synapse databases must be PostgreSQL.
2. Synapse must be initialized BEFORE running this script - start Synapse
once against an empty database so it creates its schema (v93+), then
stop it before migrating.
3. The Dendrite database should not be actively written to during migration.
examples:
# Full migration (all phases):
%(prog)s \\
--dendrite-db "dbname=dendrite host=/run/postgresql" \\
--synapse-db "dbname=synapse host=/run/postgresql" \\
--server-name example.com
# With media file copying:
%(prog)s \\
--dendrite-db "dbname=dendrite host=/run/postgresql" \\
--synapse-db "dbname=synapse host=/run/postgresql" \\
--server-name example.com \\
--dendrite-media-path /var/lib/dendrite/media \\
--synapse-media-path /var/lib/synapse/media_store
# Re-run only media phase:
%(prog)s ... --phase 6
# Dry run (inspect without committing):
%(prog)s ... --dry-run -v
# Copy only media files (no database writes):
%(prog)s --media-only \\
--dendrite-db "dbname=dendrite host=/run/postgresql" \\
--server-name example.com \\
--dendrite-media-path /var/lib/dendrite/media \\
--synapse-media-path /var/lib/synapse/media_store
""",
)
parser.add_argument(
"--dendrite-db", required=True, metavar="DSN",
help="Dendrite PostgreSQL connection string. Accepts any format "
"supported by libpq: a URI like "
'"postgresql://user:pass@host/dbname" or a keyword string like '
'"dbname=dendrite host=/run/postgresql". '
"This database is opened read-only.",
)
parser.add_argument(
"--synapse-db", metavar="DSN",
help="Synapse PostgreSQL connection string (same formats as "
"--dendrite-db). The Synapse schema must already exist - run "
"Synapse once to create it. This database is written to.",
)
parser.add_argument(
"--server-name", metavar="NAME",
help="The Matrix server name (e.g., example.com). Must match the "
"server_name in both your Dendrite and Synapse configs. Used to "
"construct full user IDs (@user:NAME) and to filter local media.",
)
parser.add_argument(
"--dendrite-media-path", metavar="DIR",
help="Path to Dendrite's media_store directory (the directory "
"containing hash-based subdirectories). Required together with "
"--synapse-media-path to copy media files. Without both flags, "
"only database metadata is migrated and files must be copied "
"manually.",
)
parser.add_argument(
"--synapse-media-path", metavar="DIR",
help="Path to Synapse's media_store directory (where local_content/ "
"and local_thumbnails/ will be created). Required together with "
"--dendrite-media-path.",
)
parser.add_argument(
"--phase", default="1,2,3,4,5,6,7,8", metavar="N[,N...]",
help="Comma-separated list of phases to run, from 1-8 (default: all). "
"Phases must be run in order on first migration. Individual phases "
"can be re-run safely (idempotent via ON CONFLICT). "
"Example: --phase 1,2,3 or --phase 6",
)
parser.add_argument(
"--media-only", action="store_true",
help="Copy media files only, without touching any database. Reads "
"media metadata (file hashes and IDs) from the Dendrite database "
"to determine source/destination paths, but writes nothing to "
"either database. Requires --dendrite-media-path and "
"--synapse-media-path. --synapse-db is not needed in this mode.",
)
parser.add_argument(
"--dry-run", action="store_true",
help="Run the migration without committing. All database changes are "
"rolled back at the end of each phase. Media files are not "
"copied. Useful for verifying the migration will succeed.",
)
parser.add_argument(
"--verbose", "-v", action="store_true",
help="Enable debug logging (shows per-event warnings, SQL details).",
)
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
if not args.server_name:
parser.error("--server-name is required")
if args.media_only:
if not args.dendrite_media_path or not args.synapse_media_path:
parser.error("--media-only requires both --dendrite-media-path and --synapse-media-path")
_run_media_only(args)
else:
if not args.synapse_db:
parser.error("--synapse-db is required (unless using --media-only)")
phases = [int(p.strip()) for p in args.phase.split(",")]
migrator = Migrator(
dendrite_dsn=args.dendrite_db,
synapse_dsn=args.synapse_db,
server_name=args.server_name,
dendrite_media_path=args.dendrite_media_path,
synapse_media_path=args.synapse_media_path,
dry_run=args.dry_run,
)
try:
migrator.run(phases)
finally:
migrator.close()
def _run_media_only(args):
"""Copy media files using Dendrite DB for path lookup, no Synapse DB needed."""
log.info("=== Media-only mode: copying files without database changes ===")
src_conn = psycopg2.connect(args.dendrite_db)
src_conn.set_session(readonly=True)
try:
with src_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute(
"SELECT media_id, base64hash, content_type FROM mediaapi_media_repository "
"WHERE media_origin = %s",
(args.server_name,),
)
media = cur.fetchall()
cur.execute(
"SELECT t.media_id, m.base64hash, t.width, t.height, "
" t.resize_method, t.content_type "
"FROM mediaapi_thumbnail t "
"JOIN mediaapi_media_repository m "
" ON t.media_id = m.media_id AND t.media_origin = m.media_origin "
"WHERE t.media_origin = %s",
(args.server_name,),
)
thumbs = cur.fetchall()
log.info("Found %d media files and %d thumbnails to copy", len(media), len(thumbs))
# Reuse static path methods from Migrator
_dp = Migrator._dendrite_media_path
_sp = Migrator._synapse_content_path
_dt = Migrator._dendrite_thumb_path
_st = Migrator._synapse_thumb_path
copied, skipped, errors = 0, 0, 0
for m in media:
src = _dp(args.dendrite_media_path, m["base64hash"])
dst = _sp(args.synapse_media_path, m["media_id"])
if not src or not dst:
errors += 1
continue
if os.path.exists(dst):
skipped += 1
continue
if not os.path.exists(src):
log.warning("Source missing: %s (media_id=%s)", src, m["media_id"])
errors += 1
continue
if not args.dry_run:
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copy2(src, dst)
copied += 1
log.info("Content files: %d copied, %d skipped (exist), %d errors", copied, skipped, errors)
t_copied, t_skipped, t_errors = 0, 0, 0
for t in thumbs:
src = _dt(args.dendrite_media_path, t["base64hash"], t["width"], t["height"], t["resize_method"])
dst = _st(args.synapse_media_path, t["media_id"], t["width"], t["height"], t["content_type"], t["resize_method"])
if not src or not dst:
t_errors += 1
continue
if os.path.exists(dst):
t_skipped += 1
continue
if not os.path.exists(src):
log.debug("Thumbnail missing: %s", src)
t_errors += 1
continue
if not args.dry_run:
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copy2(src, dst)
t_copied += 1
log.info("Thumbnails: %d copied, %d skipped (exist), %d errors", t_copied, t_skipped, t_errors)
finally:
src_conn.close()
if __name__ == "__main__":
main()