Skip to content

Commit 22428e9

Browse files
authored
Merge pull request #53 from pozil/pozil/managed-subs
feat: managed subscriptions
2 parents eeb56cd + d072eda commit 22428e9

File tree

9 files changed

+647
-252
lines changed

9 files changed

+647
-252
lines changed

README.md

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ See the [official Pub/Sub API repo](https://github.com/developerforce/pub-sub-ap
1818
- [Publish a platform event](#publish-a-platform-event)
1919
- [Subscribe with a replay ID](#subscribe-with-a-replay-id)
2020
- [Subscribe to past events in retention window](#subscribe-to-past-events-in-retention-window)
21+
- [Subscribe using a managed subscription](#subscribe-using-a-managed-subscription)
2122
- [Work with flow control for high volumes of events](#work-with-flow-control-for-high-volumes-of-events)
2223
- [Handle gRPC stream lifecycle events](#handle-grpc-stream-lifecycle-events)
2324
- [Common Issues](#common-issues)
@@ -383,6 +384,31 @@ await client.subscribeFromEarliestEvent(
383384
);
384385
```
385386
387+
### Subscribe using a managed subscription
388+
389+
You can turn your Pub/Sub client application stateless by delegating the tracking of replay IDs to the server thanks to [managed event subscriptions](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/managed-sub.html).
390+
391+
1. [Create a managed event subscription](https://developer.salesforce.com/docs/platform/pub-sub-api/guide/managed-sub.html#configuring-a-managed-event-subscription) using the tooling API. You can use API request templates from the [Salesforce Platform APIs](https://www.postman.com/salesforce-developers/salesforce-developers/folder/00bu8y3/managed-event-subscriptions) Postman collection to do so.
392+
1. Subscribe to 3 events from a managed event subscription (`Managed_Sample_PE` in this expample):
393+
```js
394+
await client.subscribeWithManagedSubscription(
395+
'Managed_Sample_PE',
396+
subscribeCallback,
397+
3
398+
);
399+
```
400+
1. Using the subscription information sent in the subscribe callback, frequently commit the last replay ID that you receveive:
401+
```js
402+
client.commitReplayId(
403+
subscription.subscriptionId,
404+
subscription.lastReplayId
405+
);
406+
```
407+
1. Optionnaly, request additional events to be sent (3 more in this example):
408+
```js
409+
client.requestAdditionalManagedEvents(subscription.subscriptionId, 3);
410+
```
411+
386412
### Work with flow control for high volumes of events
387413
388414
When working with high volumes of events you can control the incoming flow of events by requesting a limited batch of events. This event flow control ensures that the client doesn’t get overwhelmed by accepting more events that it can handle if there is a spike in event publishing.
@@ -490,15 +516,26 @@ Builds a new Pub/Sub API client.
490516
491517
Closes the gRPC connection. The client will no longer receive events for any topic.
492518
519+
#### `commitReplayId(subscriptionId, replayId) → string`
520+
521+
Commits a replay ID on a managed subscription.
522+
523+
Returns: commit request UUID.
524+
525+
| Name | Type | Description |
526+
| ---------------- | ------ | ----------------------- |
527+
| `subscriptionId` | string | managed subscription ID |
528+
| `replayId` | number | event replay ID |
529+
493530
#### `async connect() → {Promise.<void>}`
494531
495532
Authenticates with Salesforce then connects to the Pub/Sub API.
496533
497534
Returns: Promise that resolves once the connection is established.
498535
499-
#### `async getConnectivityState() → Promise<connectivityState>}`
536+
#### `async getConnectivityState() → {Promise<connectivityState>}`
500537
501-
Get connectivity state from current channel.
538+
Gets the gRPC connectivity state from the current channel.
502539
503540
Returns: Promise that holds the channel's [connectivity state](https://grpc.github.io/grpc/node/grpc.html#.connectivityState).
504541
@@ -545,6 +582,18 @@ Subscribes to a topic and retrieves past events starting from a replay ID.
545582
| `numRequested` | number | number of events requested. If `null`, the client keeps the subscription alive forever. |
546583
| `replayId` | number | replay ID |
547584
585+
#### `async subscribeWithManagedSubscription(subscriptionIdOrName, subscribeCallback, [numRequested])`
586+
587+
Subscribes to a topic thanks to a managed subscription.
588+
589+
Throws an error if the managed subscription does not exist or is not in the `RUN` state.
590+
591+
| Name | Type | Description |
592+
| ---------------------- | --------------------------------------- | -------------------------------------------------------------------------------------------------------------- |
593+
| `subscriptionIdOrName` | string | managed subscription ID or developer name |
594+
| `subscribeCallback` | [SubscribeCallback](#subscribecallback) | subscribe callback function |
595+
| `numRequested` | number | optional number of events requested. If not supplied or null, the client keeps the subscription alive forever. |
596+
548597
#### `requestAdditionalEvents(topicName, numRequested)`
549598
550599
Request additional events on an existing subscription.
@@ -554,6 +603,15 @@ Request additional events on an existing subscription.
554603
| `topicName` | string | name of the topic. |
555604
| `numRequested` | number | number of events requested. |
556605
606+
#### `requestAdditionalManagedEvents(subscriptionId, numRequested)`
607+
608+
Request additional events on an existing managed subscription.
609+
610+
| Name | Type | Description |
611+
| ---------------- | ------ | --------------------------- |
612+
| `subscriptionId` | string | managed subscription ID. |
613+
| `numRequested` | number | number of events requested. |
614+
557615
### SubscribeCallback
558616
559617
Callback function that lets you process incoming Pub/Sub API events while keeping track of the topic name and the volume of events requested/received.
@@ -581,12 +639,15 @@ Callback types:
581639
582640
Holds the information related to a subscription.
583641
584-
| Name | Type | Description |
585-
| --------------------- | ------ | ------------------------------------------------------------------------------ |
586-
| `topicName` | string | topic name for this subscription. |
587-
| `requestedEventCount` | number | number of events that were requested when subscribing. |
588-
| `receivedEventCount` | number | the number of events that were received since subscribing. |
589-
| `lastReplayId` | number | replay ID of the last processed event or `null` if no event was processed yet. |
642+
| Name | Type | Description |
643+
| --------------------- | ------- | ------------------------------------------------------------------------------ |
644+
| `isManaged` | boolean | whether this is a managed event subscription or not. |
645+
| `topicName` | string | topic name for this subscription. |
646+
| `subscriptionId` | string | managed subscription ID. Undefined for regular subscriptions. |
647+
| `subscriptionName` | string | managed subscription name. Undefined for regular subscriptions. |
648+
| `requestedEventCount` | number | number of events that were requested when subscribing. |
649+
| `receivedEventCount` | number | the number of events that were received since subscribing. |
650+
| `lastReplayId` | number | replay ID of the last processed event or `null` if no event was processed yet. |
590651
591652
### EventParseError
592653

spec/helper/clientUtilities.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import PubSubApiClient from '../../src/client.js';
2-
import { AuthType } from '../../src/utils/configuration.js';
2+
import { AuthType } from '../../src/utils/types.js';
33

44
/**
55
* Prepares a connected PubSub API client

spec/integration/client.spec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import fs from 'fs';
22
import * as dotenv from 'dotenv';
33
import PubSubApiClient from '../../src/client.js';
4-
import { AuthType } from '../../src/utils/configuration.js';
4+
import { AuthType } from '../../src/utils/types.js';
55
import {
66
getSalesforceConnection,
77
getSampleAccount,

spec/integration/clientFailures.spec.js

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as dotenv from 'dotenv';
22
import PubSubApiClient from '../../src/client.js';
3-
import { AuthType } from '../../src/utils/configuration.js';
3+
import { AuthType } from '../../src/utils/types.js';
44
import SimpleFileLogger from '../helper/simpleFileLogger.js';
55
import injectJasmineReporter from '../helper/reporter.js';
66
import { sleep, waitFor } from '../helper/asyncUtilities.js';
@@ -68,7 +68,7 @@ describe('Client failures', function () {
6868
expect(isConnectionClosed).toBeTrue();
6969
});
7070

71-
it('fails to subscribe to an invalid event', async function () {
71+
it('fails to subscribe to an invalid topic name', async function () {
7272
let grpcStatusCode, errorCode;
7373
let isConnectionClosed = false;
7474

@@ -85,7 +85,7 @@ describe('Client failures', function () {
8585
isConnectionClosed = true;
8686
}
8787
};
88-
client.subscribe('/event/INVALID', callback, 1);
88+
client.subscribe('/event/INVALID', callback);
8989

9090
// Wait for subscribe to be effective and error to surface
9191
await waitFor(5000, () => errorCode !== undefined);
@@ -95,4 +95,37 @@ describe('Client failures', function () {
9595
expect(grpcStatusCode).toBe(7);
9696
expect(isConnectionClosed).toBeTrue();
9797
});
98+
99+
it('fails to subscribe to an invalid managed subscription name', async function () {
100+
// Build PubSub client
101+
client = await getConnectedPubSubApiClient(logger);
102+
103+
// Send subscribe request
104+
try {
105+
await client.subscribeWithManagedSubscription('INVALID', () => {});
106+
} catch (error) {
107+
expect(error.message).toMatch(
108+
'Failed to retrieve managed event subscription'
109+
);
110+
} finally {
111+
client.close();
112+
}
113+
});
114+
115+
it('fails to subscribe to an managed subscription that is not running', async function () {
116+
// Build PubSub client
117+
client = await getConnectedPubSubApiClient(logger);
118+
119+
// Send subscribe request
120+
try {
121+
await client.subscribeWithManagedSubscription(
122+
'Managed_Inactive_Sample_PE',
123+
() => {}
124+
);
125+
} catch (error) {
126+
expect(error.message).toMatch('subscription is in STOP state');
127+
} finally {
128+
client.close();
129+
}
130+
});
98131
});

0 commit comments

Comments
 (0)