Skip to content

Commit 08c3551

Browse files
authored
Fix race causing duplicate subscribe requests (#303)
1 parent 032ef72 commit 08c3551

3 files changed

Lines changed: 129 additions & 10 deletions

File tree

docker-compose.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ version: '3.8'
22

33
services:
44
centrifugo:
5-
image: centrifugo/centrifugo:v6.0.0
5+
image: centrifugo/centrifugo:v6
66
command:
77
- centrifugo
88
ports:
@@ -13,4 +13,5 @@ services:
1313
- CENTRIFUGO_SSE_ENABLED=true
1414
- CENTRIFUGO_CHANNEL_WITHOUT_NAMESPACE_PRESENCE=true
1515
- CENTRIFUGO_CLIENT_CONCURRENCY=8
16-
- CENTRIFUGO_LOG_LEVEL=debug
16+
- CENTRIFUGO_LOG_LEVEL=trace
17+
- CENTRIFUGO_CLIENT_TOKEN_HMAC_SECRET_KEY=secret

src/centrifuge.test.ts

Lines changed: 122 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -368,16 +368,40 @@ test.each(transportCases)("%s: subscribe and unsubscribe loop", async (transport
368368

369369
sub.subscribe()
370370
const presenceStats = await sub.presenceStats();
371-
expect(presenceStats.numClients).toBe(1)
371+
expect(presenceStats.numClients).toBe(1);
372372
expect(presenceStats.numUsers).toBe(1);
373373
const presence = await sub.presence();
374374
expect(Object.keys(presence.clients).length).toBe(1)
375375
await sub.unsubscribe()
376-
const presenceStats2 = await c.presenceStats('test');
377-
expect(presenceStats2.numClients).toBe(0)
376+
377+
const retryWithDelay = async (fn, validate, maxRetries, delay) => {
378+
for (let i = 0; i < maxRetries; i++) {
379+
const result = await fn();
380+
if (validate(result)) {
381+
return result;
382+
}
383+
await new Promise(resolve => setTimeout(resolve, delay));
384+
}
385+
throw new Error("Validation failed after retries");
386+
};
387+
388+
const presenceStats2 = await retryWithDelay(
389+
() => c.presenceStats('test'),
390+
(stats: any) => stats.numClients === 0 && stats.numUsers === 0,
391+
3,
392+
2000
393+
);
394+
395+
const presence2 = await retryWithDelay(
396+
() => c.presence('test'),
397+
(presence: any) => Object.keys(presence.clients).length === 0,
398+
3,
399+
2000
400+
);
401+
402+
expect(presenceStats2.numClients).toBe(0);
378403
expect(presenceStats2.numUsers).toBe(0);
379-
const presence2 = await c.presence('test');
380-
expect(Object.keys(presence2.clients).length).toBe(0)
404+
expect(Object.keys(presence2.clients).length).toBe(0);
381405

382406
let disconnectCalled: any;
383407
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
@@ -552,3 +576,96 @@ test.each(websocketOnly)("%s: reconnect after close before transport open", asyn
552576
await disconnectedPromise;
553577
expect(c.state).toBe(State.Disconnected);
554578
});
579+
580+
test.each(transportCases)("%s: subscribes and unsubscribes from many subs", async (transport, endpoint) => {
581+
const c = new Centrifuge([{
582+
transport: transport as TransportName,
583+
endpoint: endpoint,
584+
}], {
585+
websocket: WebSocket,
586+
fetch: fetch,
587+
eventsource: EventSource,
588+
readableStream: ReadableStream,
589+
emulationEndpoint: 'http://localhost:8000/emulation',
590+
// debug: true
591+
});
592+
// Keep an array of promises so that we can wait for each subscription's 'unsubscribed' event.
593+
const unsubscribedPromises: Promise<UnsubscribedContext>[] = [];
594+
595+
const channels = [
596+
'test1',
597+
'test2',
598+
'test3',
599+
'test4',
600+
'test5',
601+
];
602+
603+
// Subscription tokens for anonymous users without ttl. Using an HMAC secret key used in tests ("secret").
604+
const testTokens = {
605+
'test1': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzIzNDgsImNoYW5uZWwiOiJ0ZXN0MSJ9.eqPQxbBtyYxL8Hvbkm-P6aH7chUsSG_EMWe-rTwF_HI",
606+
'test2': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzIzODcsImNoYW5uZWwiOiJ0ZXN0MiJ9.tTJB3uSa8XpEmCvfkmrSKclijofnJ5RkQk6L2SaGtUE",
607+
'test3': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzIzOTgsImNoYW5uZWwiOiJ0ZXN0MyJ9.nyLcMrIot441CszOKska7kQIjo2sEm8pSxV1XWfNCsI",
608+
'test4': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzI0MDksImNoYW5uZWwiOiJ0ZXN0NCJ9.wWAX2AhJX6Ep4HVexQWSVF3-cWytVhzY9Pm7QsMdCsI",
609+
'test5': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzI0MTgsImNoYW5uZWwiOiJ0ZXN0NSJ9.hCSfpHYws5TXLKkN0bW0DU6C-wgEUNuhGaIy8W1sT9o"
610+
}
611+
612+
c.connect();
613+
614+
const subscriptions: any[] = [];
615+
616+
for (const channel of channels) {
617+
const sub = c.newSubscription(channel, {
618+
getToken: async function () {
619+
// Sleep for a random time between 0 and 100 milliseconds to emulate network.
620+
const sleep = (ms: any) => new Promise(resolve => setTimeout(resolve, ms));
621+
await sleep(Math.random() * 100);
622+
return testTokens[channel];
623+
}
624+
});
625+
626+
// Create a promise for the 'unsubscribed' event of this subscription.
627+
const unsubPromise = new Promise<UnsubscribedContext>((resolve) => {
628+
sub.on("unsubscribed", (ctx) => {
629+
resolve(ctx);
630+
});
631+
});
632+
unsubscribedPromises.push(unsubPromise);
633+
634+
// Actually subscribe.
635+
sub.subscribe();
636+
subscriptions.push(sub);
637+
}
638+
639+
// Wait until all subscriptions are in the Subscribed state.
640+
await Promise.all(
641+
subscriptions.map(async (sub) => {
642+
await sub.ready(5000);
643+
expect(sub.state).toBe(SubscriptionState.Subscribed);
644+
})
645+
);
646+
647+
// The client itself should be connected now.
648+
expect(c.state).toBe(State.Connected);
649+
650+
// Unsubscribe from all and then disconnect.
651+
subscriptions.forEach((sub) => {
652+
sub.unsubscribe();
653+
});
654+
c.disconnect();
655+
656+
// Wait until all 'unsubscribed' events are received.
657+
const unsubscribedContexts = await Promise.all(unsubscribedPromises);
658+
659+
// Confirm each subscription is now Unsubscribed.
660+
subscriptions.forEach((sub) => {
661+
expect(sub.state).toBe(SubscriptionState.Unsubscribed);
662+
});
663+
664+
// The client should be disconnected.
665+
expect(c.state).toBe(State.Disconnected);
666+
667+
// Assert the correct unsubscribe code for each subscription.
668+
unsubscribedContexts.forEach((ctx) => {
669+
expect(ctx.code).toBe(unsubscribedCodes.unsubscribeCalled);
670+
});
671+
});

src/subscription.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,9 +355,11 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
355355
// we also need to check for transport state before sending subscription
356356
// because it may change for subscription with side effects (getData, getToken options)
357357
// @ts-ignore – we are hiding some symbols from public API autocompletion.
358-
if (!this._centrifuge._transportIsOpen) {
358+
if (!this._centrifuge._transportIsOpen || this._inflight) {
359359
return null;
360360
}
361+
this._inflight = true;
362+
361363
const channel = this.channel;
362364

363365
const req: any = {
@@ -402,8 +404,6 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
402404

403405
const cmd = { subscribe: req };
404406

405-
this._inflight = true;
406-
407407
// @ts-ignore – we are hiding some symbols from public API autocompletion.
408408
this._centrifuge._call(cmd).then(resolveCtx => {
409409
this._inflight = false;
@@ -464,6 +464,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
464464
}
465465
this._clearSubscribingState();
466466
}
467+
this._inflight = false;
467468
if (this._setState(SubscriptionState.Unsubscribed)) {
468469
this.emit('unsubscribed', { channel: this.channel, code: code, reason: reason });
469470
}

0 commit comments

Comments
 (0)