Skip to content

Commit a7ef86d

Browse files
committed
fix: realtime safeguards and logging
1 parent d5049f4 commit a7ef86d

4 files changed

Lines changed: 128 additions & 29 deletions

File tree

src/games/sync/peersSlice.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import type { RealtimeChannel } from '@supabase/supabase-js';
12
import { useMemo } from 'react';
23
import { useStore } from '@/core/zustand';
34
import { createSlice } from '@/core/zustand-helpers/slices';
4-
import { SENDER_ID } from './realtimeSyncTypes';
5+
import { type PresencePayload, SENDER_ID } from './realtimeSyncTypes';
56

67
export interface PeerInfo {
78
senderId: string;
@@ -53,6 +54,22 @@ export function countOtherPeers(peers: Record<string, PeerInfo>): number {
5354
return count;
5455
}
5556

56-
export function hasOtherPeersConnected(): boolean {
57-
return countOtherPeers(useStore.getState().peers.peers) > 0;
57+
/**
58+
* Reads peers directly from the realtime channel's presenceState instead of
59+
* the zustand slice. The slice is updated only on `presence.sync` events
60+
* (handled by `computeLeaderAndPeers`), so it lags behind the channel by up
61+
* to a few hundred ms after subscribe — long enough that the first patches
62+
* after joining could be wrongly skipped as "no peers". The channel itself
63+
* always knows the current presence list.
64+
*/
65+
export function hasOtherPeersConnectedOnChannel(
66+
channel: RealtimeChannel,
67+
): boolean {
68+
const state = channel.presenceState<PresencePayload>();
69+
for (const presences of Object.values(state)) {
70+
for (const p of presences) {
71+
if (p.senderId && p.senderId !== SENDER_ID) return true;
72+
}
73+
}
74+
return false;
5875
}

src/games/sync/realtimeSyncHandlers.ts

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
SENDER_ID,
1919
withSuppressedBroadcast,
2020
} from './realtimeSyncTypes';
21+
import { safeChannelSend } from './safeChannelSend';
2122

2223
const logger = loglev.getLogger('games:realtime-sync');
2324

@@ -99,15 +100,19 @@ export function handleFullStateRequest(
99100
share_token: latestGame.shareToken,
100101
};
101102

102-
channel.send({
103-
type: 'broadcast',
104-
event: BROADCAST_FULL_RESPONSE,
105-
payload: {
106-
senderId: SENDER_ID,
107-
seq: refs.seq.current,
108-
serialized,
109-
remoteData,
110-
} satisfies FullStateResponsePayload,
103+
safeChannelSend({
104+
channel,
105+
message: {
106+
type: 'broadcast',
107+
event: BROADCAST_FULL_RESPONSE,
108+
payload: {
109+
senderId: SENDER_ID,
110+
seq: refs.seq.current,
111+
serialized,
112+
remoteData,
113+
} satisfies FullStateResponsePayload,
114+
},
115+
context: 'full state response',
111116
});
112117
} catch (err) {
113118
logger.error('Failed to send full state response', err);
@@ -145,10 +150,14 @@ export function requestFullStateWithFallback(
145150
refs: SyncRefs,
146151
timers: SyncTimers,
147152
) {
148-
channel.send({
149-
type: 'broadcast',
150-
event: BROADCAST_FULL_REQUEST,
151-
payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload,
153+
safeChannelSend({
154+
channel,
155+
message: {
156+
type: 'broadcast',
157+
event: BROADCAST_FULL_REQUEST,
158+
payload: { senderId: SENDER_ID } satisfies FullStateRequestPayload,
159+
},
160+
context: 'full state request',
152161
});
153162

154163
if (timers.dbFallback !== null) clearTimeout(timers.dbFallback);

src/games/sync/safeChannelSend.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import type { RealtimeChannel } from '@supabase/supabase-js';
2+
import { loglev } from '@/core/logger/log';
3+
4+
const logger = loglev.getLogger('games:realtime-sync');
5+
6+
/**
7+
* Wraps `channel.send` to log non-ok responses and rejections. The Supabase
8+
* client returns a Promise<'ok' | 'timed out' | 'error'> that we otherwise
9+
* ignore — failures (e.g. RLS denies, channel closed, payload too big) would
10+
* be silent and very hard to diagnose at runtime.
11+
*/
12+
export interface SafeChannelSendArgs {
13+
channel: RealtimeChannel;
14+
message: Parameters<RealtimeChannel['send']>[0];
15+
context: string;
16+
}
17+
18+
export function safeChannelSend({
19+
channel,
20+
message,
21+
context,
22+
}: SafeChannelSendArgs): void {
23+
channel
24+
.send(message)
25+
.then(status => {
26+
if (status !== 'ok') {
27+
logger.warn(`channel.send (${context}) returned: ${status}`);
28+
}
29+
})
30+
.catch(err => {
31+
logger.error(`channel.send (${context}) failed`, err);
32+
});
33+
}

src/games/sync/useRealtimeGameSync.ts

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import type { RealtimeChannel } from '@supabase/supabase-js';
22
import type { Patch } from 'immer';
3-
import { useEffect, useRef } from 'react';
3+
import { useEffect, useRef, useState } from 'react';
44
import { loglev } from '@/core/logger/log';
55
import { supabaseClient } from '@/core/supabase';
66
import { useStore } from '@/core/zustand';
77
import { onStorePatches } from '@/core/zustand-helpers/immer';
88
import { saveRemoteGame } from '@/games/save/saveRemoteGame';
99
import { useCurrentFactoryId } from '@/notes/useNotesContext';
1010
import { flushRemoteGameOnUnload } from './flushRemoteGameOnUnload';
11-
import { hasOtherPeersConnected } from './peersSlice';
11+
import { hasOtherPeersConnectedOnChannel } from './peersSlice';
1212
import {
1313
computeLeaderAndPeers,
1414
handleFullStateRequest,
@@ -32,6 +32,7 @@ import {
3232
type PresencePayload,
3333
SENDER_ID,
3434
} from './realtimeSyncTypes';
35+
import { safeChannelSend } from './safeChannelSend';
3536

3637
const logger = loglev.getLogger('games:realtime-sync');
3738

@@ -50,6 +51,10 @@ export function useRealtimeGameSync() {
5051
const isLeaderRef = useRef(false);
5152
const factoryIdRef = useRef(factoryId);
5253
factoryIdRef.current = factoryId;
54+
// Bumped to force the subscribe effect to re-run and recreate the channel
55+
// after CHANNEL_ERROR / CLOSED / TIMED_OUT. Reset on successful SUBSCRIBED.
56+
const [reconnectEpoch, setReconnectEpoch] = useState(0);
57+
const reconnectAttemptsRef = useRef(0);
5358

5459
useEffect(() => {
5560
if (!channelRef.current || !session) return;
@@ -76,7 +81,9 @@ export function useRealtimeGameSync() {
7681
}
7782

7883
const channelName = `game:${savedId}`;
79-
logger.info(`Joining realtime channel: ${channelName}`);
84+
logger.info(
85+
`Joining realtime channel: ${channelName} (epoch=${reconnectEpoch})`,
86+
);
8087

8188
const channel = supabaseClient.channel(channelName, {
8289
config: { private: true },
@@ -93,7 +100,29 @@ export function useRealtimeGameSync() {
93100
let pendingPatches: Patch[] = [];
94101
let flushTimer: ReturnType<typeof setTimeout> | null = null;
95102
let autoSaveTimer: ReturnType<typeof setTimeout> | null = null;
103+
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
96104
let hasDirtySinceLastSave = false;
105+
// Set to true in the effect cleanup so the subscribe callback (which fires
106+
// with CLOSED when we voluntarily remove the channel) does not schedule a
107+
// reconnect loop. Only genuine errors while the effect is still active
108+
// should trigger a retry.
109+
let isCleaningUp = false;
110+
111+
function scheduleReconnect() {
112+
if (isCleaningUp) return;
113+
if (reconnectTimer !== null) return;
114+
const attempt = reconnectAttemptsRef.current;
115+
// Exponential backoff capped at 30s: 1s, 2s, 4s, 8s, 16s, 30s, 30s, ...
116+
const delay = Math.min(1000 * 2 ** attempt, 30_000);
117+
reconnectAttemptsRef.current = attempt + 1;
118+
logger.warn(
119+
`Scheduling realtime reconnect in ${delay}ms (attempt=${attempt + 1})`,
120+
);
121+
reconnectTimer = setTimeout(() => {
122+
reconnectTimer = null;
123+
setReconnectEpoch(e => e + 1);
124+
}, delay);
125+
}
97126

98127
const doRequestFullState = () =>
99128
requestFullStateWithFallback(channel, gameId, refs, timers);
@@ -117,9 +146,8 @@ export function useRealtimeGameSync() {
117146
flushTimer = null;
118147
if (!channelRef.current || pendingPatches.length === 0) return;
119148

120-
if (!hasOtherPeersConnected()) {
149+
if (!hasOtherPeersConnectedOnChannel(channelRef.current)) {
121150
pendingPatches = [];
122-
logger.debug('Skipping broadcast: no other peers connected');
123151
return;
124152
}
125153

@@ -128,20 +156,20 @@ export function useRealtimeGameSync() {
128156
const batch = pendingPatches;
129157
pendingPatches = [];
130158

131-
try {
132-
channelRef.current.send({
159+
safeChannelSend({
160+
channel: channelRef.current,
161+
message: {
133162
type: 'broadcast',
134163
event: BROADCAST_EVENT,
135164
payload: {
136165
senderId: SENDER_ID,
137166
seq,
138167
patches: batch,
139168
} satisfies PatchBroadcastPayload,
140-
});
141-
logger.debug(`Broadcasted ${batch.length} patches (seq=${seq})`);
142-
} catch (err) {
143-
logger.error('Failed to broadcast patches', err);
144-
}
169+
},
170+
context: `patch batch seq=${seq}`,
171+
});
172+
logger.debug(`Broadcasted ${batch.length} patches (seq=${seq})`);
145173
}
146174

147175
channel
@@ -177,6 +205,7 @@ export function useRealtimeGameSync() {
177205
useStore.getState().setRealtimeSyncConnected(status === 'SUBSCRIBED');
178206

179207
if (status === 'SUBSCRIBED') {
208+
reconnectAttemptsRef.current = 0;
180209
const user = session.user;
181210
await channel.track({
182211
senderId: SENDER_ID,
@@ -189,6 +218,12 @@ export function useRealtimeGameSync() {
189218
factoryId: factoryIdRef.current,
190219
} satisfies PresencePayload);
191220
doRequestFullState();
221+
} else if (
222+
status === 'CHANNEL_ERROR' ||
223+
status === 'CLOSED' ||
224+
status === 'TIMED_OUT'
225+
) {
226+
scheduleReconnect();
192227
}
193228
});
194229

@@ -222,6 +257,7 @@ export function useRealtimeGameSync() {
222257
});
223258

224259
return () => {
260+
isCleaningUp = true;
225261
window.removeEventListener('beforeunload', onBeforeUnload);
226262
unsubscribePatches();
227263
if (flushTimer !== null) {
@@ -245,6 +281,10 @@ export function useRealtimeGameSync() {
245281
clearTimeout(timers.dbFallback);
246282
timers.dbFallback = null;
247283
}
284+
if (reconnectTimer !== null) {
285+
clearTimeout(reconnectTimer);
286+
reconnectTimer = null;
287+
}
248288

249289
if (channelRef.current) {
250290
logger.info(`Leaving realtime channel: ${channelName}`);
@@ -255,5 +295,5 @@ export function useRealtimeGameSync() {
255295
useStore.getState().setRealtimeSyncConnected(false);
256296
useStore.getState().clearPeers();
257297
};
258-
}, [session, savedId, selectedGameId]);
298+
}, [session, savedId, selectedGameId, reconnectEpoch]);
259299
}

0 commit comments

Comments
 (0)