Skip to content

Commit cb2d421

Browse files
committed
feat: subscribe custom events filter
1 parent c802402 commit cb2d421

File tree

7 files changed

+178
-76
lines changed

7 files changed

+178
-76
lines changed

src/main/default/classes/StreamingMonitorController.cls

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ public abstract class StreamingMonitorController {
33
public final static String EVT_GENERIC = 'GenericEvent';
44
public final static String EVT_STD_PLATFORM_EVENT = 'StandardPlatformEvent';
55
public final static String EVT_PLATFORM_EVENT = 'PlatformEvent';
6-
public final static String EVT_CDC_STANDARD = 'ChangeDataCaptureEvent';
6+
public final static String EVT_CDC = 'ChangeDataCaptureEvent';
77
public final static String EVT_CUSTOM_CHANNEL_CDC = 'CustomChannelCDC';
88
public final static String EVT_CUSTOM_CHANNEL_PE = 'CustomChannelPE';
99
public final static String EVT_MONITORING = 'MonitoringEvent';
@@ -56,7 +56,7 @@ public abstract class StreamingMonitorController {
5656
EVT_GENERIC => getGenericEventChannels(),
5757
EVT_STD_PLATFORM_EVENT => getStandardPlatformEventChannels(),
5858
EVT_PLATFORM_EVENT => getPlatformEventChannels(),
59-
EVT_CDC_STANDARD => getChangeDataCaptureEventChannels(),
59+
EVT_CDC => getChangeDataCaptureEventChannels(),
6060
EVT_CUSTOM_CHANNEL_CDC => new List<ComboBoxItem>(),
6161
EVT_CUSTOM_CHANNEL_PE => new List<ComboBoxItem>(),
6262
EVT_MONITORING => getEventMonitoringChannels()
@@ -307,7 +307,7 @@ public abstract class StreamingMonitorController {
307307
public void execute(QueueableContext context) {
308308
String restAPIURL =
309309
URL.getOrgDomainUrl().toExternalForm() +
310-
'/services/data/v60.0/sobjects/StreamingChannel/' +
310+
'/services/data/v62.0/sobjects/StreamingChannel/' +
311311
channelId +
312312
'/push';
313313
HttpRequest httpRequest = new HttpRequest();

src/main/default/lwc/actions/actions.js

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ import {
66
EVT_GENERIC,
77
EVT_STD_PLATFORM_EVENT,
88
EVT_PLATFORM_EVENT,
9-
EVT_CDC_STANDARD,
9+
EVT_CDC,
1010
EVT_CUSTOM_CHANNEL_CDC,
1111
EVT_CUSTOM_CHANNEL_PE,
1212
EVT_MONITORING,
13+
FILTER_ALL,
14+
FILTER_CUSTOM,
1315
getChannelPrefix
1416
} from 'c/streamingUtility';
1517

@@ -27,6 +29,7 @@ export default class Actions extends LightningElement {
2729
@api action = 'subscribeAll';
2830
@api channels = [];
2931

32+
subAllFilter = FILTER_ALL;
3033
subAllReplay = '-1';
3134

3235
subEventType;
@@ -60,6 +63,7 @@ export default class Actions extends LightningElement {
6063
handleSubscribeAll() {
6164
const subscribeEvent = new CustomEvent('subscribeall', {
6265
detail: {
66+
filter: this.subAllFilter,
6367
replayId: this.subAllReplay
6468
}
6569
});
@@ -301,7 +305,7 @@ export default class Actions extends LightningElement {
301305
}
302306

303307
get isStandardCDCReg() {
304-
return this.regEventType === EVT_CDC_STANDARD;
308+
return this.regEventType === EVT_CDC;
305309
}
306310

307311
get isCustomChannelCDCReg() {
@@ -318,7 +322,7 @@ export default class Actions extends LightningElement {
318322

319323
get isCDCSub() {
320324
return (
321-
this.subEventType === EVT_CDC_STANDARD ||
325+
this.subEventType === EVT_CDC ||
322326
this.subEventType === EVT_CUSTOM_CHANNEL_CDC
323327
);
324328
}
@@ -332,4 +336,11 @@ export default class Actions extends LightningElement {
332336
? 'Plain string payload'
333337
: 'JSON formatted payload with strings delimited by double quotes';
334338
}
339+
340+
get subAllFilterOptions() {
341+
return [
342+
{ label: 'All events', value: FILTER_ALL },
343+
{ label: 'Custom events only', value: FILTER_CUSTOM }
344+
];
345+
}
335346
}

src/main/default/lwc/actions/subscribeAll.html

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
11
<template>
2-
<lightning-card title="Subscribe to all streaming channels">
2+
<lightning-card title="Subscribe to multiple streaming channels">
33
<div class="slds-p-horizontal_large">
44
<p class="slds-m-vertical_small">
55
Custom Change Data Capture channels cannot be automatically
66
discovered. Supported channels: PushTopic, Generic, Platform
77
events, Change Data Capture events and Monitoring events.
88
</p>
9+
<lightning-combobox
10+
label="Filter"
11+
name="subAllFilter"
12+
value={subAllFilter}
13+
onchange={handleValueChange}
14+
options={subAllFilterOptions}
15+
>
16+
</lightning-combobox>
917
<lightning-combobox
1018
label="Replay option"
1119
name="subAllReplay"
@@ -16,7 +24,7 @@
1624
<div class="slds-align_absolute-center slds-m-top_medium">
1725
<lightning-button
1826
variant="brand"
19-
label="Subscribe to all"
27+
label="Subscribe"
2028
onclick={handleSubscribeAll}
2129
></lightning-button>
2230
</div>

src/main/default/lwc/sidebar/sidebar.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
></lightning-vertical-navigation-item-icon>
3535
<lightning-vertical-navigation-section label="Actions">
3636
<lightning-vertical-navigation-item
37-
label="Subscribe to all channels"
37+
label="Subscribe to channels"
3838
name="subscribeAll"
3939
></lightning-vertical-navigation-item>
4040
<lightning-vertical-navigation-item

src/main/default/lwc/streamingMonitor/streamingMonitor.js

Lines changed: 86 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,19 @@ import getAllEventChannels from '@salesforce/apex/StreamingMonitorController.get
1212
import publishStreamingEvent from '@salesforce/apex/StreamingMonitorController.publishStreamingEvent';
1313
import {
1414
EVENT_TYPES,
15-
EVT_CDC_STANDARD,
15+
EVT_CDC,
1616
CHANNEL_ALL_CDC,
17+
FILTER_CUSTOM,
18+
FILTER_ALL,
1719
isCDCChannel,
1820
getChannelPrefix,
1921
normalizeEvent,
20-
channelSort
22+
channelSort,
23+
isCustomChannel
2124
} from 'c/streamingUtility';
2225

2326
const RERENDER_DELAY = 200;
24-
const IGNORE_SUBCRIBE_ERRORS_DELAY = 3000;
27+
const IGNORE_SUBCRIBE_ERRORS_DELAY = 4000;
2528

2629
const VIEW_MONITOR = 'monitor';
2730
const VIEW_SUBSCRIBE_ALL = 'subscribeAll';
@@ -40,20 +43,17 @@ export default class StreamingMonitor extends LightningElement {
4043
eventsElement;
4144
rerenderTimeout;
4245

43-
connectedCallback() {
46+
async connectedCallback() {
4447
setDebugFlag(true);
4548

4649
onError((error) => this.handleStreamingError(error));
4750

48-
getAllEventChannels()
49-
.then((allChannels) => {
50-
this.channels = allChannels;
51-
})
52-
.catch((error) => {
53-
console.error(JSON.stringify(error));
54-
throw new Error('Failed to retrieve streaming channels');
55-
});
56-
51+
try {
52+
this.channels = await getAllEventChannels();
53+
} catch (error) {
54+
console.error(JSON.stringify(error));
55+
throw new Error('Failed to retrieve streaming channels');
56+
}
5757
window.addEventListener('resize', this.handleWindowResize.bind(this));
5858
}
5959

@@ -137,53 +137,81 @@ export default class StreamingMonitor extends LightningElement {
137137
}
138138
}
139139

140-
handleSubscribeAll(event) {
141-
console.log(`Subscribing to all streaming events`);
142-
const { replayId } = event.detail;
143-
144-
// Temporarily ignore subscribe errors while subscribing to all events
145-
this.ignoreSubscribeErrors = true;
146-
setTimeout(() => {
147-
this.ignoreSubscribeErrors = false;
148-
}, IGNORE_SUBCRIBE_ERRORS_DELAY);
140+
async handleSubscribeAll(event) {
141+
const { replayId, filter } = event.detail;
142+
console.log(
143+
`Subscribing to multiple streaming channels with filter ${filter} and replay ID ${replayId}`
144+
);
149145

150146
// Build list of channels
151147
let channels = [];
152148
EVENT_TYPES.forEach((eventType) => {
153149
const eventTypeName = eventType.value;
154-
if (eventTypeName === EVT_CDC_STANDARD) {
155-
// Use global channel for all CDC events
156-
channels.push(CHANNEL_ALL_CDC);
157-
} else {
158-
// Get channels for specific event type
150+
if (filter === FILTER_ALL) {
151+
if (eventTypeName === EVT_CDC) {
152+
// Use global channel for all CDC events
153+
channels.push(CHANNEL_ALL_CDC);
154+
} else {
155+
// Get all channels for the other event types
156+
const channelPrefix = getChannelPrefix(eventTypeName);
157+
this.channels[eventTypeName].forEach((channelData) => {
158+
channels.push(channelPrefix + channelData.value);
159+
});
160+
}
161+
} else if (filter === FILTER_CUSTOM) {
162+
// Get custom channels for all event types
159163
const channelPrefix = getChannelPrefix(eventTypeName);
160164
this.channels[eventTypeName].forEach((channelData) => {
161-
channels.push(channelPrefix + channelData.value);
165+
if (isCustomChannel(eventTypeName, channelData.value)) {
166+
channels.push(channelPrefix + channelData.value);
167+
}
162168
});
169+
} else {
170+
throw new Error(`Unsupported filter value: ${filter}`);
163171
}
164172
});
173+
165174
// Remove already subscribed channels
166175
channels = channels.filter(
167176
(channel) =>
168177
!this.subscriptions.some((sub) => sub.channel === channel)
169178
);
179+
180+
// Abort if there are no remaining channels
181+
if (channels.length === 0) {
182+
this.notify(
183+
'warn',
184+
'There are no channels to subscribe to with the specified filter and current subscriptions'
185+
);
186+
return;
187+
}
188+
189+
// Temporarily ignore subscribe errors while subscribing to events
190+
this.ignoreSubscribeErrors = true;
191+
setTimeout(() => {
192+
this.ignoreSubscribeErrors = false;
193+
}, IGNORE_SUBCRIBE_ERRORS_DELAY);
194+
170195
// Queue subscriptions
171196
const subscribePromises = channels.map((channel) => {
172197
return subscribe(channel, replayId, (streamingEvent) => {
173198
this.handleStreamingEvent(streamingEvent);
174199
});
175200
});
201+
176202
// Save susbcriptions and notify success once done
177-
Promise.all(subscribePromises).then((subscriptions) => {
178-
subscriptions.forEach((subscription) => {
179-
this.saveSubscription(subscription);
180-
});
181-
this.notify('success', 'Successfully subscribed to all channels');
182-
this.view = VIEW_MONITOR;
203+
const subscriptions = await Promise.all(subscribePromises);
204+
subscriptions.forEach((subscription) => {
205+
this.saveSubscription(subscription);
183206
});
207+
this.notify(
208+
'success',
209+
'Successfully subscribed to the specified channels'
210+
);
211+
this.view = VIEW_MONITOR;
184212
}
185213

186-
handleSubscribe(event) {
214+
async handleSubscribe(event) {
187215
const { channel, replayId } = event.detail;
188216

189217
// Check for duplicate subscription
@@ -196,17 +224,17 @@ export default class StreamingMonitor extends LightningElement {
196224
return;
197225
}
198226

199-
subscribe(channel, replayId, (streamingEvent) => {
200-
this.handleStreamingEvent(streamingEvent);
201-
}).then((subscription) => {
202-
this.notify(
203-
'success',
204-
'Successfully subscribed',
205-
subscription.channel
206-
);
207-
this.saveSubscription(subscription);
208-
this.view = VIEW_MONITOR;
209-
});
227+
// Subscribe
228+
const subscription = await subscribe(
229+
channel,
230+
replayId,
231+
(streamingEvent) => {
232+
this.handleStreamingEvent(streamingEvent);
233+
}
234+
);
235+
this.notify('success', 'Successfully subscribed', subscription.channel);
236+
this.saveSubscription(subscription);
237+
this.view = VIEW_MONITOR;
210238
}
211239

212240
saveSubscription(subscription) {
@@ -235,23 +263,19 @@ export default class StreamingMonitor extends LightningElement {
235263
this.eventsElement.addStreamingEvent(eventData);
236264
}
237265

238-
handlePublish(event) {
266+
async handlePublish(event) {
239267
const eventParams = event.detail;
240-
publishStreamingEvent(eventParams)
241-
.then(() => {
242-
this.notify(
243-
'success',
244-
`Successfully published event ${eventParams.eventName}`
245-
);
246-
this.view = VIEW_MONITOR;
247-
})
248-
.catch((error) => {
249-
console.error(JSON.stringify(error));
250-
this.notify(
251-
'error',
252-
`Failed to publish ${eventParams.eventName}`
253-
);
254-
});
268+
try {
269+
await publishStreamingEvent(eventParams);
270+
this.notify(
271+
'success',
272+
`Successfully published event ${eventParams.eventName}`
273+
);
274+
this.view = VIEW_MONITOR;
275+
} catch (error) {
276+
console.error(JSON.stringify(error));
277+
this.notify('error', `Failed to publish ${eventParams.eventName}`);
278+
}
255279
}
256280

257281
handleUnsubscribeAll() {

0 commit comments

Comments
 (0)