Skip to content

Commit 405c874

Browse files
authored
Subscription types: Map subscriptions and Shared Poll subscriptions #354
2 parents 582c77f + 2da0e5c commit 405c874

23 files changed

Lines changed: 8096 additions & 180 deletions

NEXT_MAJOR_SUBSCRIPTION_TYPES.md

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
# Next Major: Clean Subscription Types
2+
3+
This document describes changes to make when the next major version removes backwards-compatibility constraints around subscription types.
4+
5+
## Status
6+
7+
A subset of this plan has already landed in v5 because the map/shared-poll APIs are not yet released and the user authorized breaking changes there:
8+
9+
-`publish(data)` and `history(opts)` moved from `BaseSubscription` into `Subscription` class (still named `Subscription`, not yet `StreamSubscription`).
10+
-`mapPublish` / `mapRemove` deleted from `BaseSubscription`; their logic is inlined into `MapSubscription.publish(key, data)` / `MapSubscription.remove(key)`.
11+
-`MapSubscription.publish` now takes `data: any` (required) on both the impl and the public type — the `data?` override hack is gone.
12+
-`track`, `untrack`, `trackedKeys` moved from `BaseSubscription` to `SharedPollSubscription`; the `if (!this._sharedPoll) throw` runtime guards are gone.
13+
- ✅ Required protected members promoted: `_centrifuge`, `_methodCall`, `_debounceMs`, `_debouncedPublish`, `_cancelDebounce`, `_isSubscribed`, `_sharedPollTrackedItems`, `_sharedPollGetSignature`, `_sharedPollPendingSignature`, `_sharedPollPendingItems`, `_sendTrackRequest`, `_sendUntrackRequest`, `_handleTrackError`.
14+
15+
The remaining items below are still future work.
16+
17+
## Current state (v5, post-shipped-cleanup)
18+
19+
Class hierarchy exists but is partially hidden behind type aliases:
20+
21+
- `BaseSubscription` (class) — lifecycle + shared internal helpers (including the protected `_sendTrackRequest` / `_sendUntrackRequest` / `_handleTrackError` and the shared-poll lifecycle handlers `_sharedPollReplayTrack` / `_sharedPollRefreshSignature` invoked from the subscribe flow). Uses `InternalSubscriptionEvents` (broad event type for internal emit calls). No longer hosts `publish(data)`, `history(opts)`, `mapPublish`, `mapRemove`, `track`, `untrack`, or `trackedKeys`.
22+
- `Subscription extends BaseSubscription` — defines `publish(data)` and `history(opts)` for stream channels.
23+
- `MapSubscription extends BaseSubscription` — defines `publish(key, data)` and `remove(key)`, no override conflict.
24+
- `SharedPollSubscription extends BaseSubscription` — defines `track`, `untrack`, `trackedKeys` directly; no runtime guards.
25+
26+
Exported types in `centrifuge.ts` are still manually constructed (`CommonSurface` + explicit type members) to narrow the public surface. Internal classes are imported with `_` prefix to avoid name collisions with the type aliases. Factory methods still use `as unknown as` to bridge between internal classes and exported types.
27+
28+
Event types:
29+
- `SubscriptionEvents` — clean stream events (no `sync`/`update`)
30+
- `InternalSubscriptionEvents` — extends `SubscriptionEvents` with `sync`/`update`, used by `BaseSubscription` class internally so subclasses can emit map/shared-poll events
31+
- `MapSubscriptionEvents``SubscriptionEvents` + narrowed `sync`/`update` with `MapEntry` / `MapUpdateContext`
32+
- `SharedPollSubscriptionEvents``SubscriptionEvents` + narrowed `update` with `SharedPollUpdateContext`
33+
- `BaseSubscriptionEvents` — alias for `SubscriptionEvents` (shared by all types)
34+
35+
Remaining known compromise:
36+
- `InternalSubscriptionEvents` still exists because `BaseSubscription` lifecycle code emits `sync` and `update` from publication handlers and from `_sharedPollReplayTrack` / `_sharedPollRefreshSignature` (synthetic `removed:true` events on key revocation). Eliminating it requires moving those emit sites into the subclasses, which is part of the domino chain (state-field migration → event-type narrowing → drop type aliases).
37+
38+
## Changes for next major
39+
40+
### 1. Rename `Subscription``StreamSubscription`, export classes directly
41+
42+
Rename the class and drop the type alias indirection. Remove the `CommonSurface` type, the `_`-prefixed imports, and all `as unknown as` casts. Export the actual classes:
43+
44+
```typescript
45+
// subscription.ts
46+
export class BaseSubscription extends ... { ... }
47+
export class StreamSubscription extends BaseSubscription { ... }
48+
export class MapSubscription extends BaseSubscription { ... }
49+
export class SharedPollSubscription extends BaseSubscription { ... }
50+
```
51+
52+
```typescript
53+
// centrifuge.ts
54+
import { BaseSubscription, StreamSubscription, MapSubscription, SharedPollSubscription } from './subscription';
55+
56+
// No type aliases needed — the classes ARE the types
57+
```
58+
59+
```typescript
60+
// index.ts
61+
export { StreamSubscription, MapSubscription, SharedPollSubscription } from './subscription';
62+
// BaseSubscription exported only if users need it for mixed-type registries
63+
export { BaseSubscription } from './subscription';
64+
```
65+
66+
Centrifuge client methods rename accordingly:
67+
- `newSubscription()``newStreamSubscription()`
68+
- `getSubscription()` returns `AnySubscription | null` (see step 4)
69+
- `removeSubscription()` accepts `AnySubscription | null`
70+
71+
### 2. Move `publish(data)` and `history()` from BaseSubscription to StreamSubscription
72+
73+
Already done — both `publish(data)` and `history(opts)` live on the `Subscription` class (which will be renamed `StreamSubscription`). `BaseSubscription` no longer hosts either method, so map and shared-poll subscriptions don't inherit them. The override conflict and `data?` optional hack are gone.
74+
75+
### 3. Move map/shared-poll methods from BaseSubscription to subclasses
76+
77+
Already done. Both sides are now in their respective subclasses with no runtime guards:
78+
79+
```typescript
80+
export class MapSubscription extends BaseSubscription {
81+
async publish(key: string, data: any): Promise<PublishResult> { ... }
82+
async remove(key: string): Promise<PublishResult> { ... }
83+
}
84+
85+
export class SharedPollSubscription extends BaseSubscription {
86+
track(keysOrItems: string[] | SharedPollTrackItem[], signature?: string): void { ... }
87+
untrack(keys: string[]): void { ... }
88+
trackedKeys(): Set<string> { ... }
89+
}
90+
```
91+
92+
Calling `track()` on a `MapSubscription` (or vice versa) is now a compile-time error rather than a runtime throw.
93+
94+
### 4. Unify get/remove/subscriptions methods with discriminated union
95+
96+
Replace the separate typed getters with union returns:
97+
98+
```typescript
99+
type AnySubscription = StreamSubscription | MapSubscription | SharedPollSubscription;
100+
101+
getSubscription(channel: string): AnySubscription | null;
102+
removeSubscription(sub: AnySubscription | null): void;
103+
subscriptions(): Record<string, AnySubscription>;
104+
```
105+
106+
Remove `getMapSubscription`, `removeMapSubscription`, `getSharedPollSubscription`, `removeSharedPollSubscription`, `mapSubscriptions`, `sharedPollSubscriptions`. Users narrow with the `type` discriminant:
107+
108+
```typescript
109+
const sub = client.getSubscription('my-channel');
110+
if (sub?.type === 'map') {
111+
sub.publish('key', data); // TypeScript knows it's MapSubscription
112+
}
113+
```
114+
115+
This requires making `type` a string literal on each class (currently it's `string`):
116+
117+
```typescript
118+
class StreamSubscription extends BaseSubscription {
119+
readonly type = 'stream' as const;
120+
}
121+
class MapSubscription extends BaseSubscription {
122+
readonly type = 'map' as const;
123+
}
124+
class SharedPollSubscription extends BaseSubscription {
125+
readonly type = 'shared_poll' as const;
126+
}
127+
```
128+
129+
### 5. Eliminate InternalSubscriptionEvents
130+
131+
Currently `BaseSubscription` uses `InternalSubscriptionEvents` (the superset with `sync`/`update`) because the emit calls for map and shared-poll events live in `BaseSubscription` methods. After step 3 moves logic to subclasses, each class can use its own event map:
132+
133+
- `BaseSubscription``BaseSubscriptionEvents` (no `sync`/`update`)
134+
- `StreamSubscription``StreamSubscriptionEvents` (rename from `SubscriptionEvents`)
135+
- `MapSubscription``MapSubscriptionEvents`
136+
- `SharedPollSubscription``SharedPollSubscriptionEvents`
137+
138+
TypeScript doesn't allow changing the EventEmitter generic on `extends`. Approaches:
139+
- Have `BaseSubscription` use a broad internal type and subclasses use `declare` to narrow
140+
- Or make each subclass extend `EventEmitter` directly (losing shared constructor code)
141+
- Or keep `InternalSubscriptionEvents` on the base class but never export it — subclasses just get narrower public types via their own event maps. This is what we have now and may be acceptable even in the next major.
142+
143+
### 6. Clean up internal state fields
144+
145+
`BaseSubscription` currently carries all state for all subscription types (`_map`, `_mapPhase`, `_sharedPoll`, `_sharedPollTrackedItems`, etc.). Move these to the subclasses that use them. This is a larger refactor since much of the logic in `BaseSubscription` references these fields directly — it may require extracting handler methods that subclasses override.
146+
147+
### 7. Remove deprecated method aliases
148+
149+
Already done. `mapPublish` / `mapRemove` no longer exist on `BaseSubscription`. `track`/`untrack`/`trackedKeys` are no longer on `BaseSubscription` — they live on `SharedPollSubscription`.
150+
151+
## Migration guide for users
152+
153+
| v5 | Next major |
154+
|---|---|
155+
| `import { Subscription }` | `import { StreamSubscription }` |
156+
| `client.newSubscription(ch)` | `client.newStreamSubscription(ch)` |
157+
| `client.getMapSubscription(ch)` | `client.getSubscription(ch)` + narrow with `sub.type === 'map'` |
158+
| `client.getSharedPollSubscription(ch)` | `client.getSubscription(ch)` + narrow with `sub.type === 'shared_poll'` |
159+
| `client.mapSubscriptions()` | Filter `client.subscriptions()` by `sub.type` |
160+
| `client.sharedPollSubscriptions()` | Filter `client.subscriptions()` by `sub.type` |
161+
| `client.removeMapSubscription(sub)` | `client.removeSubscription(sub)` |
162+
| `client.removeSharedPollSubscription(sub)` | `client.removeSubscription(sub)` |
163+
| `import type { MapSubscription }` | `import { MapSubscription }` — it's a class now, not just a type |
164+
| `import type { SharedPollSubscription }` | `import { SharedPollSubscription }` — it's a class now, not just a type |
165+
| `SubscriptionEvents` | `StreamSubscriptionEvents` |
166+
167+
## Lessons from validation
168+
169+
The full refactor was applied and validated (tsc clean, 135 tests pass). Key findings:
170+
171+
### `private``protected` is required for moving methods to subclasses
172+
173+
All of the BaseSubscription members needed by the migrated subclass methods are now `protected`:
174+
- `_centrifuge` — subclasses delegate to `this._centrifuge.publish(...)`, `this._centrifuge.mapPublish(...)`, etc.
175+
- `_methodCall()` — gate for all RPC methods (waits for subscribed state).
176+
- `_debounceMs`, `_debouncedPublish()`, `_cancelDebounce()` — used by both `Subscription.publish` and `MapSubscription.publish`/`remove`.
177+
- `_isSubscribed()` — used by `track`/`untrack` to decide whether to send immediately or buffer.
178+
- `_sharedPollTrackedItems`, `_sharedPollGetSignature`, `_sharedPollPendingSignature`, `_sharedPollPendingItems` — accessed by `SharedPollSubscription.track`/`untrack`.
179+
- `_sendTrackRequest()`, `_sendUntrackRequest()`, `_handleTrackError()` — called by `SharedPollSubscription.track`/`untrack`.
180+
181+
### Steps 6, 5, and "export classes directly" are one domino chain
182+
183+
They cannot be done independently:
184+
1. **Move state fields to subclasses (step 6)** — the real work. Internal logic (subscribe flow, recovery, publication handling) heavily references `_map*` and `_sharedPoll*` fields. Requires extracting handler methods that subclasses override.
185+
2. **Eliminate InternalSubscriptionEvents (step 5)** — blocked by step 6. The `emit('sync', ...)` and `emit('update', ...)` calls live in BaseSubscription methods that process publications. Until that logic moves to subclasses, the base class needs the broad event type.
186+
3. **Export classes directly / remove type aliases (step 1 partial)** — blocked by step 5. Type aliases exist because the class EventEmitter type is `InternalSubscriptionEvents` (too broad). Until each subclass has its own narrowed EventEmitter, the type aliases provide the narrowing.
187+
188+
Everything else in the checklist is independent and works cleanly.
189+
190+
### `removeSubscription` with union param needs internal cast
191+
192+
When `removeSubscription(sub: AnySubscription | null)` replaces separate typed remove methods, the body needs to cast `sub as unknown as _BaseSubscription` before calling internal `_removeSubscription`, since `AnySubscription` is a type alias (not the class).
193+
194+
## Checklist
195+
196+
- [ ] Rename `Subscription``StreamSubscription` (no deprecated alias — clean break)
197+
- [ ] Rename `newSubscription()``newStreamSubscription()` (no deprecated alias)
198+
- [ ] Rename `SubscriptionEvents``StreamSubscriptionEvents`
199+
- [x] Move `publish(data)` and `history(opts)` to `StreamSubscription` class _(done in v5; both live on `Subscription` pending the rename)_
200+
- [x] Move `mapPublish`/`mapRemove` logic into `MapSubscription.publish`/`remove`
201+
- [x] Move `track`/`untrack`/`trackedKeys` to `SharedPollSubscription`
202+
- [x] Promote all required `private``protected` members (`_centrifuge`, `_methodCall`, `_debounceMs`, `_debouncedPublish`, `_cancelDebounce`, `_isSubscribed`, `_sharedPollTrackedItems`, `_sharedPollGetSignature`, `_sharedPollPendingSignature`, `_sharedPollPendingItems`, `_sendTrackRequest`, `_sendUntrackRequest`, `_handleTrackError`)
203+
- [ ] Make `type` a const literal on each class for discriminated union narrowing
204+
- [ ] Unify `getSubscription`/`removeSubscription`/`subscriptions` return to `AnySubscription` discriminated union
205+
- [ ] Remove typed getters (`getMapSubscription`, `removeMapSubscription`, `getSharedPollSubscription`, `removeSharedPollSubscription`, `mapSubscriptions`, `sharedPollSubscriptions`)
206+
- [x] Remove `mapPublish`/`mapRemove` method aliases from class
207+
- [ ] Export `StreamSubscription`, `MapSubscription`, `SharedPollSubscription` classes from `index.ts`
208+
- [ ] Update tests
209+
- [ ] Update CHANGELOG with migration notes
210+
- [ ] **(Domino chain — do together, or defer):**
211+
- [ ] Move subscription-type-specific state fields and logic to subclasses (`_map*`, `_sharedPoll*`)
212+
- [ ] Eliminate `InternalSubscriptionEvents` — each subclass gets own EventEmitter type
213+
- [ ] Remove `CommonSurface` type, `_`-prefixed imports, `as unknown as` casts — export classes directly

README.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ The features implemented by this SDK can be found in [SDK feature matrix](https:
2323
* [Subscription token](#subscription-token)
2424
* [Subscription options](#subscription-options)
2525
* [Subscription management API](#subscription-management-api)
26+
* [Stream subscriptions with `getState`](#stream-subscriptions-with-getstate)
27+
* [Map subscriptions (experimental)](#map-subscriptions-experimental)
28+
* [Shared poll subscriptions (experimental)](#shared-poll-subscriptions-experimental)
2629
* [Message batching](#message-batching)
2730
* [Server-side subscriptions](#server-side-subscriptions)
2831
* [Protobuf support](#protobuf-support)
@@ -850,6 +853,56 @@ removeSubscription allows removing Subcription from the internal registry.
850853

851854
Get a map with all current client-side subscriptions registered in the client.
852855

856+
## Stream subscriptions with `getState`
857+
858+
For positioned/recoverable stream channels, the SDK can call an app-supplied `getState` callback that loads the application's current state from its own source of truth (database, API, etc.) and returns the corresponding stream position. The SDK subscribes with recovery from that position, so any publications committed after the state was read arrive as `publication` events. This is the "app-owned state" pattern — Centrifugo delivers only change events, your database stays the canonical store.
859+
860+
The SDK invokes `getState`:
861+
862+
* on the initial subscribe (no saved position);
863+
* on reconnect **only when server recovery fails** (server returns error 112 — unrecoverable position).
864+
865+
On a successful recovery, `getState` is **not** called — the SDK has a valid position and missed publications arrive as events.
866+
867+
Inside the callback, **read the stream position first, then read your data** so the position is a lower bound. Recovered publications may overlap data already loaded in `getState`; this is safe for idempotent updates, otherwise deduplicate by `offset`. See `SubscriptionOptions.getState` JSDoc for the full contract.
868+
869+
Requires **Centrifugo >= v6.8.0**.
870+
871+
References:
872+
873+
* Blog post — [Transactional publishing for stream subscriptions with PostgreSQL](https://centrifugal.dev/blog/2026/05/15/pg-stream-broker-benefits)
874+
* Example — [pg_stream_broker kitchen orders demo](https://github.com/centrifugal/examples/tree/master/v6/pg_stream_broker)
875+
876+
## Map subscriptions (experimental)
877+
878+
A **map subscription** delivers a real-time key-value collection whose lifecycle is managed by Centrifugo: clients receive a complete snapshot on subscribe, catch up on missed key changes after disconnects, and observe live per-key updates. Typical use cases include cursor positions, lobby members, feature flags, scoreboards, and per-key presence (`map_clients` / `map_users`).
879+
880+
Use `Centrifuge.newMapSubscription(channel, options?)` to create one. The variants `newMapClientsSubscription` and `newMapUsersSubscription` produce presence-style map subscriptions backed by `$clients:*` / `$users:*` channels. The returned `MapSubscription` emits a `sync` event whenever a fresh snapshot is delivered — on the initial subscribe, and again only when the SDK rebuilds state from scratch (e.g. after an unrecoverable position with the default `from_scratch` strategy). Every individual key change (set or remove) — both during live operation and on successful recovery after a disconnect — arrives as an `update` event; on successful recovery `sync` is suppressed because the application's existing snapshot is still valid. Call `publish(key, data)` / `remove(key)` to mutate.
881+
882+
> [!WARNING]
883+
> Map subscriptions are **experimental** and require **Centrifugo >= v6.8.0**. The shape and behavior may change in a backwards-incompatible way in subsequent minor releases as the feature stabilizes.
884+
885+
References:
886+
887+
* Server docs — [Map subscriptions](https://centrifugal.dev/docs/server/map_subscriptions)
888+
* PRO enhancements — [Map subscriptions (PRO)](https://centrifugal.dev/docs/pro/map_subscriptions)
889+
* Blog series — Map subscriptions Part 1 (synchronized key-value state and presence) and Part 2 (PostgreSQL map broker with transactional publishing)
890+
891+
## Shared poll subscriptions (experimental)
892+
893+
A **shared poll subscription** moves polling from clients to Centrifugo: clients register their interest in specific keys, and Centrifugo polls the backend once per configurable interval on behalf of everyone, fanning the changes back out. Backend load scales with the number of unique tracked items rather than the number of connected clients. Typical use cases include vote counts, prices, view counts, live scores, and configuration sync.
894+
895+
Use `Centrifuge.newSharedPollSubscription(channel, options?)`, then call `track(keys)` / `untrack(keys)` on the returned `SharedPollSubscription` to manage the interest set. Provide the `getSignature` callback in options to authorize tracked keys via per-key HMAC signatures (replayed on reconnect and refreshed on signature TTL). The subscription emits an `update` event per item whenever the backend reports a new version, including synthetic `removed: true` events when the server revokes a key.
896+
897+
> [!WARNING]
898+
> Shared poll subscriptions are **experimental** and require **Centrifugo >= v6.8.0**. The shape and behavior may change in a backwards-incompatible way in subsequent minor releases as the feature stabilizes.
899+
900+
References:
901+
902+
* Server docs — [Shared poll subscriptions](https://centrifugal.dev/docs/server/shared_poll)
903+
* PRO enhancements — [Shared poll (PRO)](https://centrifugal.dev/docs/pro/shared_poll)
904+
* Blog post — [Shared poll subscriptions: O(unique items) polling with low-latency updates](https://centrifugal.dev/blog/2026/05/12/shared-poll-subscriptions)
905+
853906
## Message batching
854907

855908
There is also a command batching support. It allows to send several commands to a server in one request - may be especially useful when connection established via one of HTTP-based transports.

0 commit comments

Comments
 (0)