From c7cc128c2cfbf74d67abdd791938352e45b5912e Mon Sep 17 00:00:00 2001 From: Clark Tomlinson Date: Wed, 15 Apr 2026 11:09:27 -0400 Subject: [PATCH 1/5] Add realtime sync #69 --- src/auth/sync/SyncManager.tsx | 2 + src/core/zustand-helpers/actions.ts | 16 +- src/core/zustand-helpers/immer.ts | 21 ++ src/core/zustand-helpers/slices.ts | 14 +- src/games/Game.ts | 3 +- src/games/gamesSlice.ts | 1 + src/games/menu/GameMenu.tsx | 209 +++++++++--------- src/games/save/gameSaveSlice.ts | 4 + src/games/save/saveRemoteGame.ts | 11 +- src/games/store/gameFactoriesActions.ts | 4 +- src/games/store/gameRemoteActions.ts | 20 ++ src/games/sync/useRealtimeGameSync.ts | 280 ++++++++++++++++++++++++ 12 files changed, 467 insertions(+), 118 deletions(-) create mode 100644 src/games/sync/useRealtimeGameSync.ts diff --git a/src/auth/sync/SyncManager.tsx b/src/auth/sync/SyncManager.tsx index a3f30c54..9a42b62a 100644 --- a/src/auth/sync/SyncManager.tsx +++ b/src/auth/sync/SyncManager.tsx @@ -1,9 +1,11 @@ +import { useRealtimeGameSync } from '@/games/sync/useRealtimeGameSync'; import { useSyncLocalAndRemoteStore } from './useSyncLocalAndRemoteStore'; export interface ISyncManagerProps {} export function SyncManager(props: ISyncManagerProps) { useSyncLocalAndRemoteStore(); + useRealtimeGameSync(); return null; } diff --git a/src/core/zustand-helpers/actions.ts b/src/core/zustand-helpers/actions.ts index 35eb24bb..1be39744 100644 --- a/src/core/zustand-helpers/actions.ts +++ b/src/core/zustand-helpers/actions.ts @@ -1,6 +1,6 @@ -import { produce } from 'immer'; +import { produceWithPatches } from 'immer'; import type { RootState } from '@/core/zustand'; -import { ImmerActions } from './immer'; +import { emitPatches, ImmerActions } from './immer'; import type { Action } from './slices'; type InferActions = Actions extends [infer ActionGroup, ...infer Rest] @@ -49,11 +49,13 @@ export function withActions< for (const group of actions) { for (const [name, action] of Object.entries(group)) { state[name] = (...args: any[]) => { - set( - produce(prevState => - action(...args)(prevState, proxyGet(prevState)), - ), - ); + set(prevState => { + const [nextState, patches] = produceWithPatches(prevState, draft => + action(...args)(draft as any, proxyGet(draft as any)), + ); + if (patches.length > 0) emitPatches(patches); + return nextState; + }); }; (state[name] as any)[ImmerActions] = (state: State, ...args: any[]) => { action(...args)(state, get); diff --git a/src/core/zustand-helpers/immer.ts b/src/core/zustand-helpers/immer.ts index e89893ff..81de9225 100644 --- a/src/core/zustand-helpers/immer.ts +++ b/src/core/zustand-helpers/immer.ts @@ -1 +1,22 @@ +import { enablePatches, type Patch } from 'immer'; + +enablePatches(); + export const ImmerActions = '__immerActions'; + +export type PatchListener = (patches: Patch[]) => void; + +const listeners = new Set(); + +export function onPatches(listener: PatchListener): () => void { + listeners.add(listener); + return () => { + listeners.delete(listener); + }; +} + +export function emitPatches(patches: Patch[]): void { + for (const listener of listeners) { + listener(patches); + } +} diff --git a/src/core/zustand-helpers/slices.ts b/src/core/zustand-helpers/slices.ts index 5245619e..01e822a8 100644 --- a/src/core/zustand-helpers/slices.ts +++ b/src/core/zustand-helpers/slices.ts @@ -1,5 +1,5 @@ -import { produce, type WritableDraft } from 'immer'; -import { ImmerActions } from './immer'; +import { produceWithPatches, type WritableDraft } from 'immer'; +import { emitPatches, ImmerActions } from './immer'; type InferState = Slices extends [ SliceConfig, @@ -30,9 +30,13 @@ export function withSlices< for (const [name, action] of Object.entries(slice.actions)) { state[name] = (...args: any[]) => { - set( - produce(prevState => action(...args)(prevState[slice.name], get)), - ); + set(prevState => { + const [nextState, patches] = produceWithPatches(prevState, draft => + action(...args)(draft[slice.name], get), + ); + if (patches.length > 0) emitPatches(patches); + return nextState; + }); }; (state[name] as any)[ImmerActions] = (state: any, ...args: any[]) => { action(...args)(state[slice.name], get); diff --git a/src/games/Game.ts b/src/games/Game.ts index 0c32ecc8..698963e1 100644 --- a/src/games/Game.ts +++ b/src/games/Game.ts @@ -17,11 +17,12 @@ export interface Game { shareToken?: string | null; authorId?: string; createdAt?: string; + updatedAt?: string; } export type GameRemoteData = Pick< Tables<'games'>, - 'author_id' | 'created_at' | 'id' | 'share_token' + 'author_id' | 'created_at' | 'id' | 'share_token' | 'updated_at' >; export interface GameSettings { diff --git a/src/games/gamesSlice.ts b/src/games/gamesSlice.ts index a9627ec4..c4dd64b9 100644 --- a/src/games/gamesSlice.ts +++ b/src/games/gamesSlice.ts @@ -116,6 +116,7 @@ export const gamesSlice = createSlice({ state.games[gameId].createdAt = data.created_at; state.games[gameId].savedId = data.id; state.games[gameId].shareToken = data.share_token; + state.games[gameId].updatedAt = data.updated_at; }, removeGameShareToken: (gameId: string) => state => { state.games[gameId].shareToken = undefined; diff --git a/src/games/menu/GameMenu.tsx b/src/games/menu/GameMenu.tsx index 83c7050b..6e2e3ed2 100644 --- a/src/games/menu/GameMenu.tsx +++ b/src/games/menu/GameMenu.tsx @@ -1,4 +1,4 @@ -import { Button, Menu } from '@mantine/core'; +import { Box, Button, Menu } from '@mantine/core'; import { useDisclosure } from '@mantine/hooks'; import { notifications } from '@mantine/notifications'; import { @@ -55,6 +55,9 @@ export function GameMenu(props: IGameMenuProps) { state => !!state.games.games[selectedId ?? '']?.savedId, ); const isSaving = useStore(state => state.gameSave.isSaving); + const isSyncConnected = + useStore(state => state.gameSave.isRealtimeSyncConnected) || + import.meta.env.DEV; const navigate = useNavigate(); const [opened, { toggle, open, close }] = useDisclosure(); @@ -93,113 +96,121 @@ export function GameMenu(props: IGameMenuProps) { return ( <> - - - - - - - Change game - {gameOptions.map(option => ( - + + + + + + + Change game + {gameOptions.map(option => ( + } + onClick={() => { + useStore.getState().selectGame(option.value); + navigate(`/factories`); + }} + rightSection={ + selectedId === option.value && ( + + ) + } + > + {option.label} + + ))} + + { - useStore.getState().selectGame(option.value); - navigate(`/factories`); + useStore.getState().createGame(v4(), { + name: + 'New Game ' + + (Object.keys(useStore.getState().games.games).length + 1), + }); }} - rightSection={ - selectedId === option.value && ( - - ) + leftSection={} + > + New game + + + Game actions + } + onClick={() => { + open(); + }} > - {option.label} + Rename game - ))} - - { - useStore.getState().createGame(v4(), { - name: - 'New Game ' + - (Object.keys(useStore.getState().games.games).length + 1), - }); - }} - leftSection={} - > - New game - - - Game actions - - } - onClick={() => { - open(); - }} - > - Rename game - - - } - onClick={openGameSettingsModal} - > - Game settings - - } - onClick={() => handleSaveGame(selectedId)} - > - Save game - - {selectedId && isSelectedSavedOnRemote && ( } - onClick={() => handleLoadGame(selectedId)} + leftSection={ + + } + onClick={openGameSettingsModal} + > + Game settings + + } + onClick={() => handleSaveGame(selectedId)} + > + Save game + + {selectedId && isSelectedSavedOnRemote && ( + } + onClick={() => handleLoadGame(selectedId)} + > + Load last save + + )} + + } + onClick={() => { + navigate(`/games`); + }} > - Load last save + Games list - )} - - } - onClick={() => { - navigate(`/games`); - }} - > - Games list - - - - - + + + + + {isSyncConnected && ( + + )} + {selectedId && ( )} diff --git a/src/games/save/gameSaveSlice.ts b/src/games/save/gameSaveSlice.ts index 01dae09c..fa4c7938 100644 --- a/src/games/save/gameSaveSlice.ts +++ b/src/games/save/gameSaveSlice.ts @@ -6,6 +6,7 @@ export const gameSaveSlice = createSlice({ hasRehydratedLocalData: false, isSaving: false, isLoading: false, + isRealtimeSyncConnected: false, }, actions: { setIsSaving: (isSaving: boolean) => state => { @@ -17,5 +18,8 @@ export const gameSaveSlice = createSlice({ setHasRehydratedLocalData: (hasRehydratedLocalData: boolean) => state => { state.hasRehydratedLocalData = hasRehydratedLocalData; }, + setRealtimeSyncConnected: (isRealtimeSyncConnected: boolean) => state => { + state.isRealtimeSyncConnected = isRealtimeSyncConnected; + }, }, }); diff --git a/src/games/save/saveRemoteGame.ts b/src/games/save/saveRemoteGame.ts index 415b27fa..d59dc768 100644 --- a/src/games/save/saveRemoteGame.ts +++ b/src/games/save/saveRemoteGame.ts @@ -4,9 +4,12 @@ import { supabaseClient } from '@/core/supabase'; import { useStore } from '@/core/zustand'; import { serializeGame } from '@/games/store/gameFactoriesActions'; -export async function saveRemoteGame(gameId?: string | null) { +export async function saveRemoteGame( + gameId?: string | null, + options?: { silent?: boolean }, +) { const { auth } = useStore.getState(); - useStore.getState().setIsSaving(true); + if (!options?.silent) useStore.getState().setIsSaving(true); try { if (!auth.session) { console.log( @@ -41,7 +44,7 @@ export async function saveRemoteGame(gameId?: string | null) { data: serializeGame(gameId) as unknown as Json, updated_at: new Date().toISOString(), }) - .select('id, author_id, created_at, share_token') + .select('id, author_id, created_at, updated_at, share_token') .single(); if (error) { @@ -58,6 +61,6 @@ export async function saveRemoteGame(gameId?: string | null) { message: error?.message ?? error ?? 'Unknown error', }); } finally { - useStore.getState().setIsSaving(false); + if (!options?.silent) useStore.getState().setIsSaving(false); } } diff --git a/src/games/store/gameFactoriesActions.ts b/src/games/store/gameFactoriesActions.ts index cb6eda51..84c65057 100644 --- a/src/games/store/gameFactoriesActions.ts +++ b/src/games/store/gameFactoriesActions.ts @@ -126,7 +126,7 @@ export const gameFactoriesActions = createActions({ }); export type SerializedGame = { - game: Omit; + game: Omit; factories: Factory[]; solvers: SolverInstance[]; }; @@ -140,7 +140,7 @@ export function serializeGame( throw new Error('Game not found'); } return { - game: omit(game, ['createdAt', 'authorId', 'savedId']), + game: omit(game, ['createdAt', 'updatedAt', 'authorId', 'savedId']), factories: game?.factoriesIds.map( factoryId => state.factories.factories[factoryId], ), diff --git a/src/games/store/gameRemoteActions.ts b/src/games/store/gameRemoteActions.ts index b178cfda..08a2f5b1 100644 --- a/src/games/store/gameRemoteActions.ts +++ b/src/games/store/gameRemoteActions.ts @@ -54,6 +54,9 @@ function loadSerializedGameIntoState( state.games.games[serialized.game.id].createdAt = data.created_at; state.games.games[serialized.game.id].savedId = data.id; state.games.games[serialized.game.id].shareToken = data.share_token; + if (data.updated_at) { + state.games.games[serialized.game.id].updatedAt = data.updated_at; + } return; } @@ -64,11 +67,28 @@ function loadSerializedGameIntoState( `Fully loaded game "${serialized.game.name}" (id=${serialized.game.id})`, serialized, ); // prettier-ignore + + if (options.override) { + const existingGame = state.games.games[serialized.game.id]; + if (existingGame) { + const incomingFactoryIds = new Set(serialized.game.factoriesIds); + for (const oldFactoryId of existingGame.factoriesIds) { + if (!incomingFactoryIds.has(oldFactoryId)) { + delete state.factories.factories[oldFactoryId]; + delete state.solvers.instances[oldFactoryId]; + } + } + } + } + state.games.games[serialized.game.id] = { ...serialized.game }; state.games.games[serialized.game.id].authorId = data.author_id; state.games.games[serialized.game.id].createdAt = data.created_at; state.games.games[serialized.game.id].savedId = data.id; state.games.games[serialized.game.id].shareToken = data.share_token; + if (data.updated_at) { + state.games.games[serialized.game.id].updatedAt = data.updated_at; + } serialized.factories.forEach(factory => { state.factories.factories[factory.id] = factory; diff --git a/src/games/sync/useRealtimeGameSync.ts b/src/games/sync/useRealtimeGameSync.ts new file mode 100644 index 00000000..f277fe77 --- /dev/null +++ b/src/games/sync/useRealtimeGameSync.ts @@ -0,0 +1,280 @@ +import type { RealtimeChannel } from '@supabase/supabase-js'; +import { applyPatches, type Patch } from 'immer'; +import { useEffect, useRef } from 'react'; +import { loglev } from '@/core/logger/log'; +import { supabaseClient } from '@/core/supabase'; +import { useStore } from '@/core/zustand'; +import { onPatches } from '@/core/zustand-helpers/immer'; +import type { GameRemoteData } from '@/games/Game'; +import { loadRemoteGame } from '@/games/save/loadRemoteGame'; +import { saveRemoteGame } from '@/games/save/saveRemoteGame'; +import { + type SerializedGame, + serializeGame, +} from '@/games/store/gameFactoriesActions'; + +const logger = loglev.getLogger('games:realtime-sync'); + +const SENDER_ID = crypto.randomUUID(); +const PATCH_DEBOUNCE_MS = 150; +const AUTO_SAVE_DEBOUNCE_MS = 5_000; +const DB_FALLBACK_MS = 3_000; +const BROADCAST_EVENT = 'game:patch'; +const BROADCAST_FULL_REQUEST = 'game:full-request'; +const BROADCAST_FULL_RESPONSE = 'game:full-response'; + +const GAME_SLICES = new Set(['games', 'factories', 'solvers']); + +function isGamePatch(patch: Patch): boolean { + const { path } = patch; + return typeof path[0] === 'string' && GAME_SLICES.has(path[0]); +} + +interface PatchBroadcastPayload { + senderId: string; + seq: number; + patches: Patch[]; +} + +interface FullStateRequestPayload { + senderId: string; +} + +interface FullStateResponsePayload { + senderId: string; + seq: number; + serialized: SerializedGame; + remoteData: Partial; +} + +export function useRealtimeGameSync() { + const session = useStore(s => s.auth.session); + const selectedGameId = useStore(s => s.games.selected); + const game = useStore(s => + selectedGameId ? s.games.games[selectedGameId] : null, + ); + const savedId = game?.savedId; + + const channelRef = useRef(null); + const isApplyingRemoteRef = useRef(false); + const seqRef = useRef(0); + + useEffect(() => { + if (!session || !savedId || !selectedGameId) { + if (channelRef.current) { + logger.info('Leaving realtime channel (preconditions lost)'); + supabaseClient.removeChannel(channelRef.current); + channelRef.current = null; + } + return; + } + + const channelName = `game:${savedId}`; + logger.info(`Joining realtime channel: ${channelName}`); + + const channel = supabaseClient.channel(channelName); + const gameId = selectedGameId; + let remoteSeq = -1; + let dbFallbackTimer: ReturnType | null = null; + + channel + .on('broadcast', { event: BROADCAST_EVENT }, ({ payload }) => { + const data = payload as PatchBroadcastPayload; + if (data.senderId === SENDER_ID) return; + + if (data.seq <= remoteSeq) { + logger.debug( + `Ignoring out-of-order patch (seq=${data.seq}, expected>${remoteSeq})`, + ); + return; + } + + if (remoteSeq >= 0 && data.seq !== remoteSeq + 1) { + logger.info( + `Missed patches (got seq=${data.seq}, expected=${remoteSeq + 1}), requesting full state`, + ); + channel.send({ + type: 'broadcast', + event: BROADCAST_FULL_REQUEST, + payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload, + }); + remoteSeq = data.seq; + return; + } + + remoteSeq = data.seq; + logger.debug( + `Applying ${data.patches.length} remote patches (seq=${data.seq})`, + ); + isApplyingRemoteRef.current = true; + try { + const currentState = useStore.getState(); + const nextState = applyPatches(currentState, data.patches); + useStore.setState(nextState); + } catch (err) { + logger.error('Failed to apply patches, requesting full state', err); + channel.send({ + type: 'broadcast', + event: BROADCAST_FULL_REQUEST, + payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload, + }); + } finally { + isApplyingRemoteRef.current = false; + } + }) + .on('broadcast', { event: BROADCAST_FULL_REQUEST }, ({ payload }) => { + const data = payload as FullStateRequestPayload; + if (data.senderId === SENDER_ID) return; + + logger.info('Peer requested full state, sending'); + try { + const latestGame = useStore.getState().games.games[gameId]; + if (!latestGame?.savedId) return; + + const serialized = serializeGame(gameId); + const remoteData: Partial = { + id: latestGame.savedId, + author_id: latestGame.authorId, + created_at: latestGame.createdAt, + updated_at: latestGame.updatedAt, + share_token: latestGame.shareToken, + }; + + channel.send({ + type: 'broadcast', + event: BROADCAST_FULL_RESPONSE, + payload: { + senderId: SENDER_ID, + seq: seqRef.current, + serialized, + remoteData, + } satisfies FullStateResponsePayload, + }); + } catch (err) { + logger.error('Failed to send full state response', err); + } + }) + .on('broadcast', { event: BROADCAST_FULL_RESPONSE }, ({ payload }) => { + const data = payload as FullStateResponsePayload; + if (data.senderId === SENDER_ID) return; + + if (dbFallbackTimer !== null) { + clearTimeout(dbFallbackTimer); + dbFallbackTimer = null; + } + + logger.info(`Received full state response (seq=${data.seq}), applying`); + remoteSeq = data.seq; + isApplyingRemoteRef.current = true; + try { + useStore.getState().loadRemoteGame(data.serialized, data.remoteData, { + override: true, + }); + } finally { + isApplyingRemoteRef.current = false; + } + }) + .subscribe(status => { + logger.info(`Realtime channel status: ${status}`); + useStore.getState().setRealtimeSyncConnected(status === 'SUBSCRIBED'); + + if (status === 'SUBSCRIBED') { + channel.send({ + type: 'broadcast', + event: BROADCAST_FULL_REQUEST, + payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload, + }); + + dbFallbackTimer = setTimeout(() => { + dbFallbackTimer = null; + logger.info('No peer response, loading from database'); + isApplyingRemoteRef.current = true; + loadRemoteGame(gameId, { override: true }).finally(() => { + isApplyingRemoteRef.current = false; + }); + }, DB_FALLBACK_MS); + } + }); + + channelRef.current = channel; + + let pendingPatches: Patch[] = []; + let flushTimer: ReturnType | null = null; + let autoSaveTimer: ReturnType | null = null; + + function scheduleAutoSave() { + if (autoSaveTimer !== null) clearTimeout(autoSaveTimer); + autoSaveTimer = setTimeout(() => { + autoSaveTimer = null; + saveRemoteGame(gameId, { silent: true }).catch(err => + logger.error('Auto-save failed', err), + ); + }, AUTO_SAVE_DEBOUNCE_MS); + } + + function flushPatches() { + flushTimer = null; + if (!channelRef.current || pendingPatches.length === 0) return; + + seqRef.current += 1; + const seq = seqRef.current; + const batch = pendingPatches; + pendingPatches = []; + + try { + channelRef.current.send({ + type: 'broadcast', + event: BROADCAST_EVENT, + payload: { + senderId: SENDER_ID, + seq, + patches: batch, + } satisfies PatchBroadcastPayload, + }); + logger.debug(`Broadcasted ${batch.length} patches (seq=${seq})`); + } catch (err) { + logger.error('Failed to broadcast patches', err); + } + } + + const unsubscribePatches = onPatches(patches => { + if (isApplyingRemoteRef.current) return; + if (!channelRef.current) return; + + const gamePatches = patches.filter(isGamePatch); + if (gamePatches.length === 0) return; + + pendingPatches.push(...gamePatches); + scheduleAutoSave(); + + if (flushTimer !== null) clearTimeout(flushTimer); + flushTimer = setTimeout(flushPatches, PATCH_DEBOUNCE_MS); + }); + + return () => { + unsubscribePatches(); + if (flushTimer !== null) { + clearTimeout(flushTimer); + flushPatches(); + } + if (autoSaveTimer !== null) { + clearTimeout(autoSaveTimer); + saveRemoteGame(gameId, { silent: true }).catch(err => + logger.error('Auto-save on cleanup failed', err), + ); + } + if (dbFallbackTimer !== null) { + clearTimeout(dbFallbackTimer); + dbFallbackTimer = null; + } + + if (channelRef.current) { + logger.info(`Leaving realtime channel: ${channelName}`); + supabaseClient.removeChannel(channelRef.current); + channelRef.current = null; + } + + useStore.getState().setRealtimeSyncConnected(false); + }; + }, [session, savedId, selectedGameId]); +} From baa8c2dcd64b8801949b677b0a13d4d6daf40b5d Mon Sep 17 00:00:00 2001 From: Clark Tomlinson Date: Wed, 15 Apr 2026 13:16:23 -0400 Subject: [PATCH 2/5] Address CR feedback --- src/core/zustand-helpers/actions.ts | 4 +- src/core/zustand-helpers/immer.ts | 4 +- src/core/zustand-helpers/slices.ts | 4 +- src/games/sync/useRealtimeGameSync.ts | 83 +++++++++++++++------------ 4 files changed, 53 insertions(+), 42 deletions(-) diff --git a/src/core/zustand-helpers/actions.ts b/src/core/zustand-helpers/actions.ts index 1be39744..ca30b138 100644 --- a/src/core/zustand-helpers/actions.ts +++ b/src/core/zustand-helpers/actions.ts @@ -1,6 +1,6 @@ import { produceWithPatches } from 'immer'; import type { RootState } from '@/core/zustand'; -import { emitPatches, ImmerActions } from './immer'; +import { emitStorePatches, ImmerActions } from './immer'; import type { Action } from './slices'; type InferActions = Actions extends [infer ActionGroup, ...infer Rest] @@ -53,7 +53,7 @@ export function withActions< const [nextState, patches] = produceWithPatches(prevState, draft => action(...args)(draft as any, proxyGet(draft as any)), ); - if (patches.length > 0) emitPatches(patches); + if (patches.length > 0) emitStorePatches(patches); return nextState; }); }; diff --git a/src/core/zustand-helpers/immer.ts b/src/core/zustand-helpers/immer.ts index 81de9225..1c1a50ee 100644 --- a/src/core/zustand-helpers/immer.ts +++ b/src/core/zustand-helpers/immer.ts @@ -8,14 +8,14 @@ export type PatchListener = (patches: Patch[]) => void; const listeners = new Set(); -export function onPatches(listener: PatchListener): () => void { +export function onStorePatches(listener: PatchListener): () => void { listeners.add(listener); return () => { listeners.delete(listener); }; } -export function emitPatches(patches: Patch[]): void { +export function emitStorePatches(patches: Patch[]): void { for (const listener of listeners) { listener(patches); } diff --git a/src/core/zustand-helpers/slices.ts b/src/core/zustand-helpers/slices.ts index 01e822a8..f808a7b1 100644 --- a/src/core/zustand-helpers/slices.ts +++ b/src/core/zustand-helpers/slices.ts @@ -1,5 +1,5 @@ import { produceWithPatches, type WritableDraft } from 'immer'; -import { emitPatches, ImmerActions } from './immer'; +import { emitStorePatches, ImmerActions } from './immer'; type InferState = Slices extends [ SliceConfig, @@ -34,7 +34,7 @@ export function withSlices< const [nextState, patches] = produceWithPatches(prevState, draft => action(...args)(draft[slice.name], get), ); - if (patches.length > 0) emitPatches(patches); + if (patches.length > 0) emitStorePatches(patches); return nextState; }); }; diff --git a/src/games/sync/useRealtimeGameSync.ts b/src/games/sync/useRealtimeGameSync.ts index f277fe77..0646b603 100644 --- a/src/games/sync/useRealtimeGameSync.ts +++ b/src/games/sync/useRealtimeGameSync.ts @@ -4,7 +4,7 @@ import { useEffect, useRef } from 'react'; import { loglev } from '@/core/logger/log'; import { supabaseClient } from '@/core/supabase'; import { useStore } from '@/core/zustand'; -import { onPatches } from '@/core/zustand-helpers/immer'; +import { onStorePatches } from '@/core/zustand-helpers/immer'; import type { GameRemoteData } from '@/games/Game'; import { loadRemoteGame } from '@/games/save/loadRemoteGame'; import { saveRemoteGame } from '@/games/save/saveRemoteGame'; @@ -17,7 +17,7 @@ const logger = loglev.getLogger('games:realtime-sync'); const SENDER_ID = crypto.randomUUID(); const PATCH_DEBOUNCE_MS = 150; -const AUTO_SAVE_DEBOUNCE_MS = 5_000; +const AUTO_SAVE_DEBOUNCE_MS = 60_000; const DB_FALLBACK_MS = 3_000; const BROADCAST_EVENT = 'game:patch'; const BROADCAST_FULL_REQUEST = 'game:full-request'; @@ -25,9 +25,14 @@ const BROADCAST_FULL_RESPONSE = 'game:full-response'; const GAME_SLICES = new Set(['games', 'factories', 'solvers']); +const IGNORED_GAME_PATHS = new Set(['selected']); + function isGamePatch(patch: Patch): boolean { const { path } = patch; - return typeof path[0] === 'string' && GAME_SLICES.has(path[0]); + if (typeof path[0] !== 'string' || !GAME_SLICES.has(path[0])) return false; + if (path[0] === 'games' && IGNORED_GAME_PATHS.has(path[1] as string)) + return false; + return true; } interface PatchBroadcastPayload { @@ -74,35 +79,58 @@ export function useRealtimeGameSync() { const channel = supabaseClient.channel(channelName); const gameId = selectedGameId; - let remoteSeq = -1; + const remoteSeqs = new Map(); let dbFallbackTimer: ReturnType | null = null; + function requestFullStateWithFallback() { + channel.send({ + type: 'broadcast', + event: BROADCAST_FULL_REQUEST, + payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload, + }); + + if (dbFallbackTimer !== null) clearTimeout(dbFallbackTimer); + dbFallbackTimer = setTimeout(async () => { + dbFallbackTimer = null; + logger.info( + 'No peer response, saving local state then loading from database', + ); + try { + await saveRemoteGame(gameId, { silent: true }); + } catch (err) { + logger.error('Pre-fallback save failed', err); + } + isApplyingRemoteRef.current = true; + loadRemoteGame(gameId, { override: true }).finally(() => { + isApplyingRemoteRef.current = false; + }); + }, DB_FALLBACK_MS); + } + channel .on('broadcast', { event: BROADCAST_EVENT }, ({ payload }) => { const data = payload as PatchBroadcastPayload; if (data.senderId === SENDER_ID) return; - if (data.seq <= remoteSeq) { + const lastSeq = remoteSeqs.get(data.senderId) ?? -1; + + if (data.seq <= lastSeq) { logger.debug( - `Ignoring out-of-order patch (seq=${data.seq}, expected>${remoteSeq})`, + `Ignoring out-of-order patch from ${data.senderId} (seq=${data.seq}, expected>${lastSeq})`, ); return; } - if (remoteSeq >= 0 && data.seq !== remoteSeq + 1) { + if (lastSeq >= 0 && data.seq !== lastSeq + 1) { logger.info( - `Missed patches (got seq=${data.seq}, expected=${remoteSeq + 1}), requesting full state`, + `Missed patches from ${data.senderId} (got seq=${data.seq}, expected=${lastSeq + 1}), requesting full state`, ); - channel.send({ - type: 'broadcast', - event: BROADCAST_FULL_REQUEST, - payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload, - }); - remoteSeq = data.seq; + requestFullStateWithFallback(); + remoteSeqs.set(data.senderId, data.seq); return; } - remoteSeq = data.seq; + remoteSeqs.set(data.senderId, data.seq); logger.debug( `Applying ${data.patches.length} remote patches (seq=${data.seq})`, ); @@ -113,11 +141,7 @@ export function useRealtimeGameSync() { useStore.setState(nextState); } catch (err) { logger.error('Failed to apply patches, requesting full state', err); - channel.send({ - type: 'broadcast', - event: BROADCAST_FULL_REQUEST, - payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload, - }); + requestFullStateWithFallback(); } finally { isApplyingRemoteRef.current = false; } @@ -164,7 +188,7 @@ export function useRealtimeGameSync() { } logger.info(`Received full state response (seq=${data.seq}), applying`); - remoteSeq = data.seq; + remoteSeqs.set(data.senderId, data.seq); isApplyingRemoteRef.current = true; try { useStore.getState().loadRemoteGame(data.serialized, data.remoteData, { @@ -179,20 +203,7 @@ export function useRealtimeGameSync() { useStore.getState().setRealtimeSyncConnected(status === 'SUBSCRIBED'); if (status === 'SUBSCRIBED') { - channel.send({ - type: 'broadcast', - event: BROADCAST_FULL_REQUEST, - payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload, - }); - - dbFallbackTimer = setTimeout(() => { - dbFallbackTimer = null; - logger.info('No peer response, loading from database'); - isApplyingRemoteRef.current = true; - loadRemoteGame(gameId, { override: true }).finally(() => { - isApplyingRemoteRef.current = false; - }); - }, DB_FALLBACK_MS); + requestFullStateWithFallback(); } }); @@ -237,7 +248,7 @@ export function useRealtimeGameSync() { } } - const unsubscribePatches = onPatches(patches => { + const unsubscribePatches = onStorePatches(patches => { if (isApplyingRemoteRef.current) return; if (!channelRef.current) return; From 5c61cdd01dba3f4e866b726fc0ee06cefcee9885 Mon Sep 17 00:00:00 2001 From: Clark Tomlinson Date: Wed, 15 Apr 2026 13:32:15 -0400 Subject: [PATCH 3/5] resolve strange save/load behavior and implement a leader election for db saves. Resolves: #86 --- src/games/sync/useRealtimeGameSync.ts | 66 +++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 10 deletions(-) diff --git a/src/games/sync/useRealtimeGameSync.ts b/src/games/sync/useRealtimeGameSync.ts index 0646b603..017237e4 100644 --- a/src/games/sync/useRealtimeGameSync.ts +++ b/src/games/sync/useRealtimeGameSync.ts @@ -63,6 +63,7 @@ export function useRealtimeGameSync() { const channelRef = useRef(null); const isApplyingRemoteRef = useRef(false); const seqRef = useRef(0); + const isLeaderRef = useRef(false); useEffect(() => { if (!session || !savedId || !selectedGameId) { @@ -92,18 +93,41 @@ export function useRealtimeGameSync() { if (dbFallbackTimer !== null) clearTimeout(dbFallbackTimer); dbFallbackTimer = setTimeout(async () => { dbFallbackTimer = null; - logger.info( - 'No peer response, saving local state then loading from database', - ); + logger.info('No peer response, reconciling with database'); try { - await saveRemoteGame(gameId, { silent: true }); + const localGame = useStore.getState().games.games[gameId]; + const savedId = localGame?.savedId; + if (!savedId) return; + + const { data, error } = await supabaseClient + .from('games') + .select('updated_at') + .eq('id', savedId) + .single(); + + if (error) throw error; + + const dbTime = data?.updated_at + ? new Date(data.updated_at).getTime() + : 0; + const localTime = localGame.updatedAt + ? new Date(localGame.updatedAt).getTime() + : 0; + + if (dbTime > localTime) { + logger.info('DB is newer, loading remote state'); + isApplyingRemoteRef.current = true; + await loadRemoteGame(gameId, { override: true }); + isApplyingRemoteRef.current = false; + } else if (isLeaderRef.current) { + logger.info('Local is newer or equal, saving to DB (leader)'); + await saveRemoteGame(gameId, { silent: true }); + } else { + logger.info('Local is newer or equal, skipping save (not leader)'); + } } catch (err) { - logger.error('Pre-fallback save failed', err); + logger.error('DB fallback reconciliation failed', err); } - isApplyingRemoteRef.current = true; - loadRemoteGame(gameId, { override: true }).finally(() => { - isApplyingRemoteRef.current = false; - }); }, DB_FALLBACK_MS); } @@ -198,11 +222,31 @@ export function useRealtimeGameSync() { isApplyingRemoteRef.current = false; } }) - .subscribe(status => { + .on('presence', { event: 'sync' }, () => { + const state = channel.presenceState<{ senderId: string }>(); + const senderIds: string[] = []; + for (const presences of Object.values(state)) { + for (const p of presences) { + if (p.senderId) senderIds.push(p.senderId); + } + } + senderIds.sort(); + const wasLeader = isLeaderRef.current; + isLeaderRef.current = senderIds[0] === SENDER_ID; + if (isLeaderRef.current !== wasLeader) { + logger.info( + isLeaderRef.current + ? `Elected as leader (${senderIds.length} peers)` + : `No longer leader (${senderIds.length} peers)`, + ); + } + }) + .subscribe(async status => { logger.info(`Realtime channel status: ${status}`); useStore.getState().setRealtimeSyncConnected(status === 'SUBSCRIBED'); if (status === 'SUBSCRIBED') { + await channel.track({ senderId: SENDER_ID }); requestFullStateWithFallback(); } }); @@ -214,9 +258,11 @@ export function useRealtimeGameSync() { let autoSaveTimer: ReturnType | null = null; function scheduleAutoSave() { + if (!isLeaderRef.current) return; if (autoSaveTimer !== null) clearTimeout(autoSaveTimer); autoSaveTimer = setTimeout(() => { autoSaveTimer = null; + if (!isLeaderRef.current) return; saveRemoteGame(gameId, { silent: true }).catch(err => logger.error('Auto-save failed', err), ); From 657bd6d4719b52fdda2fc986584015c8f098c1fe Mon Sep 17 00:00:00 2001 From: Clark Tomlinson Date: Wed, 15 Apr 2026 13:45:19 -0400 Subject: [PATCH 4/5] breakout sync logic into smaller funcs --- src/games/sync/realtimeSyncHandlers.ts | 211 ++++++++++++++++++ src/games/sync/realtimeSyncTypes.ts | 39 ++++ src/games/sync/useRealtimeGameSync.ts | 290 +++++++------------------ 3 files changed, 323 insertions(+), 217 deletions(-) create mode 100644 src/games/sync/realtimeSyncHandlers.ts create mode 100644 src/games/sync/realtimeSyncTypes.ts diff --git a/src/games/sync/realtimeSyncHandlers.ts b/src/games/sync/realtimeSyncHandlers.ts new file mode 100644 index 00000000..04deda7f --- /dev/null +++ b/src/games/sync/realtimeSyncHandlers.ts @@ -0,0 +1,211 @@ +import type { RealtimeChannel } from '@supabase/supabase-js'; +import { applyPatches, type Patch } from 'immer'; +import { loglev } from '@/core/logger/log'; +import { supabaseClient } from '@/core/supabase'; +import { useStore } from '@/core/zustand'; +import type { GameRemoteData } from '@/games/Game'; +import { loadRemoteGame } from '@/games/save/loadRemoteGame'; +import { saveRemoteGame } from '@/games/save/saveRemoteGame'; +import { serializeGame } from '@/games/store/gameFactoriesActions'; +import { + BROADCAST_FULL_REQUEST, + BROADCAST_FULL_RESPONSE, + DB_FALLBACK_MS, + type FullStateRequestPayload, + type FullStateResponsePayload, + type PatchBroadcastPayload, + SENDER_ID, +} from './realtimeSyncTypes'; + +const logger = loglev.getLogger('games:realtime-sync'); + +export interface SyncRefs { + isApplyingRemote: { current: boolean }; + isLeader: { current: boolean }; + seq: { current: number }; +} + +export interface SyncTimers { + dbFallback: ReturnType | null; +} + +export function handleIncomingPatches( + data: PatchBroadcastPayload, + remoteSeqs: Map, + refs: SyncRefs, + requestFullState: () => void, +) { + if (data.senderId === SENDER_ID) return; + + const lastSeq = remoteSeqs.get(data.senderId) ?? -1; + + if (data.seq <= lastSeq) { + logger.debug( + `Ignoring out-of-order patch from ${data.senderId} (seq=${data.seq}, expected>${lastSeq})`, + ); + return; + } + + if (lastSeq >= 0 && data.seq !== lastSeq + 1) { + logger.info( + `Missed patches from ${data.senderId} (got seq=${data.seq}, expected=${lastSeq + 1}), requesting full state`, + ); + requestFullState(); + remoteSeqs.set(data.senderId, data.seq); + return; + } + + remoteSeqs.set(data.senderId, data.seq); + logger.debug( + `Applying ${data.patches.length} remote patches (seq=${data.seq})`, + ); + refs.isApplyingRemote.current = true; + try { + const currentState = useStore.getState(); + const nextState = applyPatches(currentState, data.patches); + useStore.setState(nextState); + } catch (err) { + logger.error('Failed to apply patches, requesting full state', err); + requestFullState(); + } finally { + refs.isApplyingRemote.current = false; + } +} + +export function handleFullStateRequest( + data: FullStateRequestPayload, + channel: RealtimeChannel, + gameId: string, + refs: SyncRefs, +) { + if (data.senderId === SENDER_ID) return; + + logger.info('Peer requested full state, sending'); + try { + const latestGame = useStore.getState().games.games[gameId]; + if (!latestGame?.savedId) return; + + const serialized = serializeGame(gameId); + const remoteData: Partial = { + id: latestGame.savedId, + author_id: latestGame.authorId, + created_at: latestGame.createdAt, + updated_at: latestGame.updatedAt, + share_token: latestGame.shareToken, + }; + + channel.send({ + type: 'broadcast', + event: BROADCAST_FULL_RESPONSE, + payload: { + senderId: SENDER_ID, + seq: refs.seq.current, + serialized, + remoteData, + } satisfies FullStateResponsePayload, + }); + } catch (err) { + logger.error('Failed to send full state response', err); + } +} + +export function handleFullStateResponse( + data: FullStateResponsePayload, + remoteSeqs: Map, + refs: SyncRefs, + timers: SyncTimers, +) { + if (data.senderId === SENDER_ID) return; + + if (timers.dbFallback !== null) { + clearTimeout(timers.dbFallback); + timers.dbFallback = null; + } + + logger.info(`Received full state response (seq=${data.seq}), applying`); + remoteSeqs.set(data.senderId, data.seq); + refs.isApplyingRemote.current = true; + try { + useStore.getState().loadRemoteGame(data.serialized, data.remoteData, { + override: true, + }); + } finally { + refs.isApplyingRemote.current = false; + } +} + +export function requestFullStateWithFallback( + channel: RealtimeChannel, + gameId: string, + refs: SyncRefs, + timers: SyncTimers, +) { + channel.send({ + type: 'broadcast', + event: BROADCAST_FULL_REQUEST, + payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload, + }); + + if (timers.dbFallback !== null) clearTimeout(timers.dbFallback); + timers.dbFallback = setTimeout(async () => { + timers.dbFallback = null; + logger.info('No peer response, reconciling with database'); + try { + const localGame = useStore.getState().games.games[gameId]; + const savedId = localGame?.savedId; + if (!savedId) return; + + const { data, error } = await supabaseClient + .from('games') + .select('updated_at') + .eq('id', savedId) + .single(); + + if (error) throw error; + + const dbTime = data?.updated_at + ? new Date(data.updated_at).getTime() + : 0; + const localTime = localGame.updatedAt + ? new Date(localGame.updatedAt).getTime() + : 0; + + if (dbTime > localTime) { + logger.info('DB is newer, loading remote state'); + refs.isApplyingRemote.current = true; + await loadRemoteGame(gameId, { override: true }); + refs.isApplyingRemote.current = false; + } else if (refs.isLeader.current) { + logger.info('Local is newer or equal, saving to DB (leader)'); + await saveRemoteGame(gameId, { silent: true }); + } else { + logger.info('Local is newer or equal, skipping save (not leader)'); + } + } catch (err) { + logger.error('DB fallback reconciliation failed', err); + } + }, DB_FALLBACK_MS); +} + +export function computeLeader( + channel: RealtimeChannel, + refs: SyncRefs, +) { + const state = channel.presenceState<{ senderId: string }>(); + const senderIds: string[] = []; + for (const presences of Object.values(state)) { + for (const p of presences) { + if (p.senderId) senderIds.push(p.senderId); + } + } + senderIds.sort(); + const wasLeader = refs.isLeader.current; + refs.isLeader.current = senderIds[0] === SENDER_ID; + if (refs.isLeader.current !== wasLeader) { + logger.info( + refs.isLeader.current + ? `Elected as leader (${senderIds.length} peers)` + : `No longer leader (${senderIds.length} peers)`, + ); + } +} diff --git a/src/games/sync/realtimeSyncTypes.ts b/src/games/sync/realtimeSyncTypes.ts new file mode 100644 index 00000000..1f506471 --- /dev/null +++ b/src/games/sync/realtimeSyncTypes.ts @@ -0,0 +1,39 @@ +import type { Patch } from 'immer'; +import type { GameRemoteData } from '@/games/Game'; +import type { SerializedGame } from '@/games/store/gameFactoriesActions'; + +export const SENDER_ID = crypto.randomUUID(); +export const PATCH_DEBOUNCE_MS = 150; +export const AUTO_SAVE_DEBOUNCE_MS = 60_000; +export const DB_FALLBACK_MS = 3_000; +export const BROADCAST_EVENT = 'game:patch'; +export const BROADCAST_FULL_REQUEST = 'game:full-request'; +export const BROADCAST_FULL_RESPONSE = 'game:full-response'; + +export interface PatchBroadcastPayload { + senderId: string; + seq: number; + patches: Patch[]; +} + +export interface FullStateRequestPayload { + senderId: string; +} + +export interface FullStateResponsePayload { + senderId: string; + seq: number; + serialized: SerializedGame; + remoteData: Partial; +} + +const GAME_SLICES = new Set(['games', 'factories', 'solvers']); +const IGNORED_GAME_PATHS = new Set(['selected']); + +export function isGamePatch(patch: Patch): boolean { + const { path } = patch; + if (typeof path[0] !== 'string' || !GAME_SLICES.has(path[0])) return false; + if (path[0] === 'games' && IGNORED_GAME_PATHS.has(path[1] as string)) + return false; + return true; +} diff --git a/src/games/sync/useRealtimeGameSync.ts b/src/games/sync/useRealtimeGameSync.ts index 017237e4..2d2453ed 100644 --- a/src/games/sync/useRealtimeGameSync.ts +++ b/src/games/sync/useRealtimeGameSync.ts @@ -1,57 +1,35 @@ import type { RealtimeChannel } from '@supabase/supabase-js'; -import { applyPatches, type Patch } from 'immer'; +import type { Patch } from 'immer'; import { useEffect, useRef } from 'react'; import { loglev } from '@/core/logger/log'; import { supabaseClient } from '@/core/supabase'; import { useStore } from '@/core/zustand'; import { onStorePatches } from '@/core/zustand-helpers/immer'; -import type { GameRemoteData } from '@/games/Game'; -import { loadRemoteGame } from '@/games/save/loadRemoteGame'; import { saveRemoteGame } from '@/games/save/saveRemoteGame'; import { - type SerializedGame, - serializeGame, -} from '@/games/store/gameFactoriesActions'; + computeLeader, + handleFullStateRequest, + handleFullStateResponse, + handleIncomingPatches, + requestFullStateWithFallback, + type SyncRefs, + type SyncTimers, +} from './realtimeSyncHandlers'; +import { + AUTO_SAVE_DEBOUNCE_MS, + BROADCAST_EVENT, + BROADCAST_FULL_REQUEST, + BROADCAST_FULL_RESPONSE, + type FullStateRequestPayload, + type FullStateResponsePayload, + isGamePatch, + PATCH_DEBOUNCE_MS, + type PatchBroadcastPayload, + SENDER_ID, +} from './realtimeSyncTypes'; const logger = loglev.getLogger('games:realtime-sync'); -const SENDER_ID = crypto.randomUUID(); -const PATCH_DEBOUNCE_MS = 150; -const AUTO_SAVE_DEBOUNCE_MS = 60_000; -const DB_FALLBACK_MS = 3_000; -const BROADCAST_EVENT = 'game:patch'; -const BROADCAST_FULL_REQUEST = 'game:full-request'; -const BROADCAST_FULL_RESPONSE = 'game:full-response'; - -const GAME_SLICES = new Set(['games', 'factories', 'solvers']); - -const IGNORED_GAME_PATHS = new Set(['selected']); - -function isGamePatch(patch: Patch): boolean { - const { path } = patch; - if (typeof path[0] !== 'string' || !GAME_SLICES.has(path[0])) return false; - if (path[0] === 'games' && IGNORED_GAME_PATHS.has(path[1] as string)) - return false; - return true; -} - -interface PatchBroadcastPayload { - senderId: string; - seq: number; - patches: Patch[]; -} - -interface FullStateRequestPayload { - senderId: string; -} - -interface FullStateResponsePayload { - senderId: string; - seq: number; - serialized: SerializedGame; - remoteData: Partial; -} - export function useRealtimeGameSync() { const session = useStore(s => s.auth.session); const selectedGameId = useStore(s => s.games.selected); @@ -81,182 +59,20 @@ export function useRealtimeGameSync() { const channel = supabaseClient.channel(channelName); const gameId = selectedGameId; const remoteSeqs = new Map(); - let dbFallbackTimer: ReturnType | null = null; - - function requestFullStateWithFallback() { - channel.send({ - type: 'broadcast', - event: BROADCAST_FULL_REQUEST, - payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload, - }); - - if (dbFallbackTimer !== null) clearTimeout(dbFallbackTimer); - dbFallbackTimer = setTimeout(async () => { - dbFallbackTimer = null; - logger.info('No peer response, reconciling with database'); - try { - const localGame = useStore.getState().games.games[gameId]; - const savedId = localGame?.savedId; - if (!savedId) return; - - const { data, error } = await supabaseClient - .from('games') - .select('updated_at') - .eq('id', savedId) - .single(); - - if (error) throw error; - - const dbTime = data?.updated_at - ? new Date(data.updated_at).getTime() - : 0; - const localTime = localGame.updatedAt - ? new Date(localGame.updatedAt).getTime() - : 0; - - if (dbTime > localTime) { - logger.info('DB is newer, loading remote state'); - isApplyingRemoteRef.current = true; - await loadRemoteGame(gameId, { override: true }); - isApplyingRemoteRef.current = false; - } else if (isLeaderRef.current) { - logger.info('Local is newer or equal, saving to DB (leader)'); - await saveRemoteGame(gameId, { silent: true }); - } else { - logger.info('Local is newer or equal, skipping save (not leader)'); - } - } catch (err) { - logger.error('DB fallback reconciliation failed', err); - } - }, DB_FALLBACK_MS); - } - - channel - .on('broadcast', { event: BROADCAST_EVENT }, ({ payload }) => { - const data = payload as PatchBroadcastPayload; - if (data.senderId === SENDER_ID) return; - - const lastSeq = remoteSeqs.get(data.senderId) ?? -1; - - if (data.seq <= lastSeq) { - logger.debug( - `Ignoring out-of-order patch from ${data.senderId} (seq=${data.seq}, expected>${lastSeq})`, - ); - return; - } - - if (lastSeq >= 0 && data.seq !== lastSeq + 1) { - logger.info( - `Missed patches from ${data.senderId} (got seq=${data.seq}, expected=${lastSeq + 1}), requesting full state`, - ); - requestFullStateWithFallback(); - remoteSeqs.set(data.senderId, data.seq); - return; - } - - remoteSeqs.set(data.senderId, data.seq); - logger.debug( - `Applying ${data.patches.length} remote patches (seq=${data.seq})`, - ); - isApplyingRemoteRef.current = true; - try { - const currentState = useStore.getState(); - const nextState = applyPatches(currentState, data.patches); - useStore.setState(nextState); - } catch (err) { - logger.error('Failed to apply patches, requesting full state', err); - requestFullStateWithFallback(); - } finally { - isApplyingRemoteRef.current = false; - } - }) - .on('broadcast', { event: BROADCAST_FULL_REQUEST }, ({ payload }) => { - const data = payload as FullStateRequestPayload; - if (data.senderId === SENDER_ID) return; - - logger.info('Peer requested full state, sending'); - try { - const latestGame = useStore.getState().games.games[gameId]; - if (!latestGame?.savedId) return; - - const serialized = serializeGame(gameId); - const remoteData: Partial = { - id: latestGame.savedId, - author_id: latestGame.authorId, - created_at: latestGame.createdAt, - updated_at: latestGame.updatedAt, - share_token: latestGame.shareToken, - }; - - channel.send({ - type: 'broadcast', - event: BROADCAST_FULL_RESPONSE, - payload: { - senderId: SENDER_ID, - seq: seqRef.current, - serialized, - remoteData, - } satisfies FullStateResponsePayload, - }); - } catch (err) { - logger.error('Failed to send full state response', err); - } - }) - .on('broadcast', { event: BROADCAST_FULL_RESPONSE }, ({ payload }) => { - const data = payload as FullStateResponsePayload; - if (data.senderId === SENDER_ID) return; - - if (dbFallbackTimer !== null) { - clearTimeout(dbFallbackTimer); - dbFallbackTimer = null; - } - - logger.info(`Received full state response (seq=${data.seq}), applying`); - remoteSeqs.set(data.senderId, data.seq); - isApplyingRemoteRef.current = true; - try { - useStore.getState().loadRemoteGame(data.serialized, data.remoteData, { - override: true, - }); - } finally { - isApplyingRemoteRef.current = false; - } - }) - .on('presence', { event: 'sync' }, () => { - const state = channel.presenceState<{ senderId: string }>(); - const senderIds: string[] = []; - for (const presences of Object.values(state)) { - for (const p of presences) { - if (p.senderId) senderIds.push(p.senderId); - } - } - senderIds.sort(); - const wasLeader = isLeaderRef.current; - isLeaderRef.current = senderIds[0] === SENDER_ID; - if (isLeaderRef.current !== wasLeader) { - logger.info( - isLeaderRef.current - ? `Elected as leader (${senderIds.length} peers)` - : `No longer leader (${senderIds.length} peers)`, - ); - } - }) - .subscribe(async status => { - logger.info(`Realtime channel status: ${status}`); - useStore.getState().setRealtimeSyncConnected(status === 'SUBSCRIBED'); - - if (status === 'SUBSCRIBED') { - await channel.track({ senderId: SENDER_ID }); - requestFullStateWithFallback(); - } - }); - - channelRef.current = channel; + const refs: SyncRefs = { + isApplyingRemote: isApplyingRemoteRef, + isLeader: isLeaderRef, + seq: seqRef, + }; + const timers: SyncTimers = { dbFallback: null }; let pendingPatches: Patch[] = []; let flushTimer: ReturnType | null = null; let autoSaveTimer: ReturnType | null = null; + const doRequestFullState = () => + requestFullStateWithFallback(channel, gameId, refs, timers); + function scheduleAutoSave() { if (!isLeaderRef.current) return; if (autoSaveTimer !== null) clearTimeout(autoSaveTimer); @@ -294,6 +110,46 @@ export function useRealtimeGameSync() { } } + channel + .on('broadcast', { event: BROADCAST_EVENT }, ({ payload }) => { + handleIncomingPatches( + payload as PatchBroadcastPayload, + remoteSeqs, + refs, + doRequestFullState, + ); + }) + .on('broadcast', { event: BROADCAST_FULL_REQUEST }, ({ payload }) => { + handleFullStateRequest( + payload as FullStateRequestPayload, + channel, + gameId, + refs, + ); + }) + .on('broadcast', { event: BROADCAST_FULL_RESPONSE }, ({ payload }) => { + handleFullStateResponse( + payload as FullStateResponsePayload, + remoteSeqs, + refs, + timers, + ); + }) + .on('presence', { event: 'sync' }, () => { + computeLeader(channel, refs); + }) + .subscribe(async status => { + logger.info(`Realtime channel status: ${status}`); + useStore.getState().setRealtimeSyncConnected(status === 'SUBSCRIBED'); + + if (status === 'SUBSCRIBED') { + await channel.track({ senderId: SENDER_ID }); + doRequestFullState(); + } + }); + + channelRef.current = channel; + const unsubscribePatches = onStorePatches(patches => { if (isApplyingRemoteRef.current) return; if (!channelRef.current) return; @@ -320,9 +176,9 @@ export function useRealtimeGameSync() { logger.error('Auto-save on cleanup failed', err), ); } - if (dbFallbackTimer !== null) { - clearTimeout(dbFallbackTimer); - dbFallbackTimer = null; + if (timers.dbFallback !== null) { + clearTimeout(timers.dbFallback); + timers.dbFallback = null; } if (channelRef.current) { From f28037237dd244539ebde4a235cc042278959e1a Mon Sep 17 00:00:00 2001 From: Clark Tomlinson Date: Wed, 15 Apr 2026 13:48:47 -0400 Subject: [PATCH 5/5] bad formatting :doh: --- src/games/sync/realtimeSyncHandlers.ts | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/games/sync/realtimeSyncHandlers.ts b/src/games/sync/realtimeSyncHandlers.ts index 04deda7f..1bc2c18b 100644 --- a/src/games/sync/realtimeSyncHandlers.ts +++ b/src/games/sync/realtimeSyncHandlers.ts @@ -1,5 +1,5 @@ import type { RealtimeChannel } from '@supabase/supabase-js'; -import { applyPatches, type Patch } from 'immer'; +import { applyPatches } from 'immer'; import { loglev } from '@/core/logger/log'; import { supabaseClient } from '@/core/supabase'; import { useStore } from '@/core/zustand'; @@ -163,9 +163,7 @@ export function requestFullStateWithFallback( if (error) throw error; - const dbTime = data?.updated_at - ? new Date(data.updated_at).getTime() - : 0; + const dbTime = data?.updated_at ? new Date(data.updated_at).getTime() : 0; const localTime = localGame.updatedAt ? new Date(localGame.updatedAt).getTime() : 0; @@ -187,10 +185,7 @@ export function requestFullStateWithFallback( }, DB_FALLBACK_MS); } -export function computeLeader( - channel: RealtimeChannel, - refs: SyncRefs, -) { +export function computeLeader(channel: RealtimeChannel, refs: SyncRefs) { const state = channel.presenceState<{ senderId: string }>(); const senderIds: string[] = []; for (const presences of Object.values(state)) {