-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpubsub.ts
More file actions
569 lines (465 loc) · 16.1 KB
/
pubsub.ts
File metadata and controls
569 lines (465 loc) · 16.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
/**
* Redis Pub/Sub Operations Example
*
* This comprehensive example demonstrates Redis publish/subscribe messaging including:
* - Basic channel subscriptions and message publishing
* - Pattern-based subscriptions for flexible message routing
* - Multiple subscribers and message broadcasting
* - Real-time messaging patterns and best practices
* - Graceful connection management and cleanup
* - Error handling and recovery strategies
*
* Redis Pub/Sub is ideal for real-time applications like chat systems,
* live notifications, event broadcasting, and microservice communication.
*
* @example
* ```bash
* # Run this example
* deno run -A examples/pubsub.ts
* ```
*/
import { connect, type Redis, type RedisConnectOptions } from "../mod.ts";
/**
* Configuration for Redis connections optimized for pub/sub
*/
const REDIS_CONFIG: RedisConnectOptions = {
hostname: Deno.env.get("REDIS_HOST") ?? "127.0.0.1",
port: parseInt(Deno.env.get("REDIS_PORT") ?? "6379"),
password: Deno.env.get("REDIS_PASSWORD"),
db: parseInt(Deno.env.get("REDIS_DB") ?? "0"),
};
/**
* Interface for message handling statistics
*/
interface MessageStats {
channelMessages: Map<string, number>;
patternMessages: Map<string, number>;
totalMessages: number;
startTime: number;
}
/**
* Utility to create a delay.
* @param ms Milliseconds to delay.
*/
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
/**
* Basic channel subscription demonstration
*/
async function demonstrateBasicPubSub(
publisher: Redis,
subscriber: Redis,
): Promise<void> {
console.log("\n📢 === Basic Channel Subscription ===");
const newsChannel = "news";
const sportsChannel = "sports";
// Subscribe to specific channels
const sub = await subscriber.subscribe(newsChannel, sportsChannel);
console.log(`✅ Subscribed to channels: ${newsChannel}, ${sportsChannel}`);
// Set up message handler
let messageCount = 0;
const messageHandler = async () => {
console.log("👂 Listening for messages...");
for await (const message of sub.receive()) {
messageCount++;
console.log(`📨 Message ${messageCount}:`);
console.log(` Channel: ${message.channel}`);
console.log(` Message: ${message.message}`);
// Stop after receiving 3 messages
if (messageCount >= 3) {
break;
}
}
};
// Start listening in background
const listenerPromise = messageHandler();
// Give subscriber time to set up
await delay(100);
// Publish some messages
console.log("\n📤 Publishing messages...");
const subscribers1 = await publisher.publish(
newsChannel,
"Breaking: Deno 2.0 released!",
);
console.log(`📮 Published to news channel (${subscribers1} subscribers)`);
const subscribers2 = await publisher.publish(
sportsChannel,
"Goal! Team A scores!",
);
console.log(`📮 Published to sports channel (${subscribers2} subscribers)`);
const subscribers3 = await publisher.publish(
newsChannel,
"Weather update: Sunny day ahead",
);
console.log(
`📮 Published another news message (${subscribers3} subscribers)`,
);
// Wait for messages to be received
await listenerPromise;
// Unsubscribe from channels but don't close the connection
await sub.unsubscribe(newsChannel, sportsChannel);
console.log("✅ Unsubscribed from basic channels");
}
/**
* Pattern-based subscription demonstration
*/
async function demonstratePatternSubscriptions(
publisher: Redis,
subscriber: Redis,
): Promise<void> {
console.log("\n🔍 === Pattern Subscription ===");
const patternSub = await subscriber.psubscribe("user:*", "log:*");
console.log("✅ Subscribed to patterns: user:*, log:*");
// Pattern message handler
let patternMessageCount = 0;
const patternHandler = async () => {
console.log("👂 Listening for pattern messages...");
for await (const message of patternSub.receive()) {
patternMessageCount++;
console.log(`📨 Pattern Message ${patternMessageCount}:`);
console.log(` Pattern: ${message.pattern}`);
console.log(` Channel: ${message.channel}`);
console.log(` Message: ${message.message}`);
if (patternMessageCount >= 3) {
break;
}
}
};
const patternListenerPromise = patternHandler();
// Give time to set up
await delay(100);
// Publish to channels matching patterns
console.log("\n📤 Publishing to pattern-matching channels...");
await publisher.publish("user:1001", "User logged in");
console.log("📮 Published to user:1001");
await publisher.publish("user:1002", "User updated profile");
console.log("📮 Published to user:1002");
await publisher.publish("log:error", "Database connection failed");
console.log("📮 Published to log:error");
await publisher.publish("random:channel", "This won't match any pattern");
console.log("📮 Published to random:channel (won't be received)");
// Wait for pattern messages
await patternListenerPromise;
patternSub.close();
console.log("✅ Closed pattern subscription");
}
/**
* Multiple subscribers demonstration
*/
async function demonstrateMultipleSubscribers(
publisher: Redis,
subscriber1: Redis,
subscriber2: Redis,
): Promise<void> {
console.log("\n👥 === Multiple Subscribers ===");
const broadcastChannel = "broadcast";
// Create multiple subscribers
const sub1 = await subscriber1.subscribe(broadcastChannel);
const sub2 = await subscriber2.subscribe(broadcastChannel);
console.log("✅ Set up 2 subscribers for broadcast channel");
// Count messages for each subscriber
let sub1Count = 0;
let sub2Count = 0;
const sub1Handler = async () => {
for await (const message of sub1.receive()) {
sub1Count++;
console.log(`📨 Subscriber 1 received: ${message.message}`);
if (sub1Count >= 2) break;
}
};
const sub2Handler = async () => {
for await (const message of sub2.receive()) {
sub2Count++;
console.log(`📨 Subscriber 2 received: ${message.message}`);
if (sub2Count >= 2) break;
}
};
// Start both handlers
const sub1Promise = sub1Handler();
const sub2Promise = sub2Handler();
// Give time to set up
await delay(100);
// Publish broadcast messages
console.log("\n📤 Broadcasting messages...");
const subscriberCount1 = await publisher.publish(
broadcastChannel,
"Hello everyone!",
);
console.log(
`📮 Broadcast message 1 - Reached ${subscriberCount1} subscribers`,
);
const subscriberCount2 = await publisher.publish(
broadcastChannel,
"This is a broadcast!",
);
console.log(
`📮 Broadcast message 2 - Reached ${subscriberCount2} subscribers`,
);
// Wait for all handlers to complete
await Promise.all([sub1Promise, sub2Promise]);
console.log(`📊 Final counts - Sub1: ${sub1Count}, Sub2: ${sub2Count}`);
// Clean up
sub1.close();
sub2.close();
console.log("✅ Multiple subscribers demonstration completed");
}
/**
* Real-time messaging simulation
*/
async function demonstrateRealTimeMessaging(
publisher: Redis,
subscriber: Redis,
): Promise<void> {
console.log("\n⚡ === Real-time Messaging Simulation ===");
const chatChannel = "chat:room1";
const sub = await subscriber.subscribe(chatChannel);
console.log("✅ Subscribed to chat room");
// Simulate chat messages
const chatMessages = [
"Alice: Hello everyone!",
"Bob: Hi Alice, how are you?",
"Charlie: Great to see you both here!",
"Alice: This Redis pub/sub is really fast!",
"Bob: Agreed, perfect for real-time chat!",
];
let messageCount = 0;
const chatHandler = async () => {
console.log("👂 Chat room is now active...");
for await (const message of sub.receive()) {
console.log(`💬 ${message.message}`);
messageCount++;
if (messageCount >= chatMessages.length) {
break;
}
}
};
const chatPromise = chatHandler();
// Give time to set up
await delay(100);
// Simulate real-time message publishing
console.log("📤 Starting chat simulation...");
for (const chatMessage of chatMessages) {
await publisher.publish(chatChannel, chatMessage);
// Simulate typing delay
await delay(500);
}
await chatPromise;
sub.close();
console.log("✅ Chat simulation completed");
}
/**
* Channel management demonstration
*/
async function demonstrateChannelManagement(
publisher: Redis,
subscriber: Redis,
): Promise<void> {
console.log("\n🔧 === Channel Management ===");
// Start with no subscriptions
console.log("📊 Demonstrating dynamic subscription management...");
const managedSub = await subscriber.subscribe("initial:channel");
// Add more channels before starting to receive
console.log("➕ Adding more channels to subscription...");
await managedSub.subscribe("dynamic:channel1", "dynamic:channel2");
let messageCount = 0;
const managementHandler = async () => {
for await (const message of managedSub.receive()) {
messageCount++;
console.log(`📨 Received on ${message.channel}: ${message.message}`);
if (messageCount >= 3) break; // Adjusted since we'll receive fewer messages
}
};
const managementPromise = managementHandler();
await delay(100);
// Publish to different channels
await publisher.publish("initial:channel", "Message to initial channel");
await publisher.publish("dynamic:channel1", "Message to dynamic channel 1");
await publisher.publish("dynamic:channel2", "Message to dynamic channel 2");
await managementPromise;
// Demonstrate unsubscribing (create a new subscription for this)
console.log("➖ Demonstrating unsubscribe functionality...");
const unsubSub = await subscriber.subscribe("temp:channel");
// Publish a message that will be received
await publisher.publish("temp:channel", "Message before unsubscribe");
// Wait a bit for the message to be processed
await delay(100);
// Unsubscribe from the temporary channel
await unsubSub.unsubscribe("temp:channel");
// This message won't be received since we unsubscribed
await publisher.publish("temp:channel", "This won't be received");
managedSub.close();
console.log("✅ Channel management demonstration completed");
}
/**
* Advanced pub/sub patterns demonstration
*/
async function demonstrateAdvancedPatterns(
publisher: Redis,
subscriber: Redis,
): Promise<void> {
console.log("\n🚀 === Advanced Pub/Sub Patterns ===");
// Message statistics tracking
const stats: MessageStats = {
channelMessages: new Map(),
patternMessages: new Map(),
totalMessages: 0,
startTime: Date.now(),
};
// Subscribe to multiple patterns for comprehensive monitoring
const monitoringSub = await subscriber.psubscribe(
"app:*",
"system:*",
"user:*",
);
console.log("✅ Subscribed to monitoring patterns: app:*, system:*, user:*");
const monitoringHandler = async () => {
console.log("👂 Starting message monitoring...");
for await (const message of monitoringSub.receive()) {
stats.totalMessages++;
// Track pattern statistics
const pattern = message.pattern!;
stats.patternMessages.set(
pattern,
(stats.patternMessages.get(pattern) || 0) + 1,
);
// Track channel statistics
stats.channelMessages.set(
message.channel,
(stats.channelMessages.get(message.channel) || 0) + 1,
);
console.log(
`📊 [${stats.totalMessages}] ${message.channel} (${pattern}): ${message.message}`,
);
if (stats.totalMessages >= 8) break;
}
};
const monitoringPromise = monitoringHandler();
await delay(100);
// Simulate application events
console.log("📤 Simulating application events...");
const events = [
{ channel: "app:login", message: "User authentication successful" },
{ channel: "app:logout", message: "User session terminated" },
{ channel: "system:health", message: "System health check passed" },
{ channel: "system:error", message: "Database connection timeout" },
{ channel: "user:profile", message: "Profile updated successfully" },
{ channel: "user:notification", message: "New message received" },
{ channel: "app:purchase", message: "Payment processed successfully" },
{ channel: "system:backup", message: "Daily backup completed" },
];
for (const event of events) {
await publisher.publish(event.channel, event.message);
await delay(200);
}
await monitoringPromise;
// Display statistics
console.log("\n📈 Message Statistics:");
console.log(` Total messages: ${stats.totalMessages}`);
console.log(` Processing time: ${Date.now() - stats.startTime}ms`);
console.log(" Pattern breakdown:");
for (const [pattern, count] of stats.patternMessages) {
console.log(` ${pattern}: ${count} messages`);
}
monitoringSub.close();
console.log("✅ Advanced patterns demonstration completed");
}
/**
* Error handling for pub/sub operations
*/
async function handlePubSubError(error: unknown): Promise<void> {
console.error("❌ Pub/Sub Error occurred:");
if (error instanceof Error) {
console.error(` Type: ${error.constructor.name}`);
console.error(` Message: ${error.message}`);
if (error.stack) {
console.error(
` Stack: ${error.stack.split("\n").slice(0, 3).join("\n")}`,
);
}
} else {
console.error(` Error: ${String(error)}`);
}
}
/**
* Cleanup function for pub/sub resources
*/
async function cleanupConnections(
connections: (Redis | null)[],
): Promise<void> {
console.log("\n🧹 Cleaning up pub/sub resources...");
for (const conn of connections) {
if (conn) {
try {
conn.close();
console.log(`✅ Connection closed`);
} catch (error) {
console.error("⚠️ Error during cleanup:", error);
}
}
}
}
/**
* Main function demonstrating pub/sub operations
*/
async function pubSubExample(): Promise<void> {
console.log("📡 Redis Pub/Sub Operations Example");
console.log("=".repeat(60));
// A single publisher connection can be reused.
const publisher = await connect(REDIS_CONFIG);
console.log("✅ Publisher connection established");
try {
// Each subscriber or group of subscribers needs a dedicated connection.
// This wrapper simplifies connection setup and teardown for each demo.
const withSubscribers = async (
subscriberCount: number,
demonstration: (
publisher: Redis,
...subscribers: Redis[]
) => Promise<void>,
) => {
const subscribers = await Promise.all(
Array.from({ length: subscriberCount }, () => connect(REDIS_CONFIG)),
);
try {
await demonstration(publisher, ...subscribers);
} finally {
subscribers.forEach((s) => s.close());
}
};
// Core pub/sub demonstrations
await withSubscribers(1, demonstrateBasicPubSub);
await withSubscribers(1, demonstratePatternSubscriptions);
await withSubscribers(2, demonstrateMultipleSubscribers);
await withSubscribers(1, demonstrateRealTimeMessaging);
await withSubscribers(1, demonstrateChannelManagement);
await withSubscribers(1, demonstrateAdvancedPatterns);
console.log("\n🎉 All pub/sub operations completed successfully!");
console.log("\n💡 Key Takeaways:");
console.log(
" • Use separate connections for publishers and subscribers.",
);
console.log(" • Pattern subscriptions provide flexible message routing.");
console.log(" • Multiple subscribers enable message broadcasting.");
console.log(
" • Dynamic subscription management allows runtime flexibility.",
);
console.log(" • Always handle errors and clean up resources properly.");
} catch (error) {
await handlePubSubError(error);
} finally {
await cleanupConnections([publisher]);
}
}
/**
* Graceful shutdown handler
*/
const gracefulShutdown = async () => {
console.log("\n🔔 Signal received, shutting down gracefully...");
await cleanupConnections([publisher]);
Deno.exit(0);
};
Deno.addSignalListener("SIGINT", gracefulShutdown);
Deno.addSignalListener("SIGTERM", gracefulShutdown);
if (import.meta.main) {
// Execute the example
await pubSubExample();
}