Skip to content

Commit d328ea0

Browse files
authored
Merge pull request #29 from pozil/pozil/custom-channels
fix: custom channel support
2 parents 43bf1e8 + ae189b8 commit d328ea0

File tree

3 files changed

+185
-57
lines changed

3 files changed

+185
-57
lines changed

package.json

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,22 @@
2222
"prepublishOnly": "npm run build"
2323
},
2424
"dependencies": {
25-
"@grpc/grpc-js": "^1.9.13",
25+
"@grpc/grpc-js": "^1.10.3",
2626
"@grpc/proto-loader": "^0.7.10",
2727
"avro-js": "^1.11.3",
2828
"certifi": "^14.5.15",
29-
"dotenv": "^16.3.1",
29+
"dotenv": "^16.4.5",
3030
"jsforce": "^1.11.1",
31-
"undici": "^6.2.0"
31+
"undici": "^6.10.1"
3232
},
3333
"devDependencies": {
34-
"@chialab/esbuild-plugin-meta-url": "^0.17.7",
35-
"eslint": "^8.56.0",
36-
"husky": "^8.0.3",
37-
"lint-staged": "^15.2.0",
38-
"prettier": "^3.1.1",
39-
"tsup": "^8.0.1",
40-
"typescript": "^5.3.3"
34+
"@chialab/esbuild-plugin-meta-url": "^0.18.2",
35+
"eslint": "^8.57.0",
36+
"husky": "^9.0.11",
37+
"lint-staged": "^15.2.2",
38+
"prettier": "^3.2.5",
39+
"tsup": "^8.0.2",
40+
"typescript": "^5.4.3"
4141
},
4242
"lint-staged": {
4343
"**/src/*.{css,html,js,json,md,yaml,yml}": [

src/client.js

Lines changed: 98 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import certifi from 'certifi';
77
import grpc from '@grpc/grpc-js';
88
import protoLoader from '@grpc/proto-loader';
99

10+
import SchemaCache from './utils/schemaCache.js';
1011
import EventParseError from './utils/eventParseError.js';
1112
import PubSubEventEmitter from './utils/pubSubEventEmitter.js';
1213
import { CustomLongAvroType } from './utils/avroHelper.js';
@@ -25,13 +26,6 @@ import SalesforceAuth from './utils/auth.js';
2526
* @global
2627
*/
2728

28-
/**
29-
* @typedef {Object} Schema
30-
* @property {string} id
31-
* @property {Object} type
32-
* @protected
33-
*/
34-
3529
/**
3630
* @typedef {Object} Logger
3731
* @property {Function} debug
@@ -60,8 +54,8 @@ export default class PubSubApiClient {
6054
#client;
6155

6256
/**
63-
* Map of schemas indexed by topic name
64-
* @type {Map<string,Schema>}
57+
* Schema cache
58+
* @type {SchemaCache}
6559
*/
6660
#schemaChache;
6761

@@ -79,7 +73,7 @@ export default class PubSubApiClient {
7973
*/
8074
constructor(logger = console) {
8175
this.#logger = logger;
82-
this.#schemaChache = new Map();
76+
this.#schemaChache = new SchemaCache();
8377
this.#subscriptions = new Map();
8478
// Check and load config
8579
try {
@@ -319,16 +313,33 @@ export default class PubSubApiClient {
319313
);
320314
data.events.forEach(async (event) => {
321315
try {
322-
// Load event schema from cache or from the client
323-
let schema = await this.#getEventSchema(topicName);
324-
// Make sure that schema ID matches. If not, event fields may have changed
325-
// and client needs to reload schema
326-
if (schema.id !== event.event.schemaId) {
327-
this.#logger.info(
328-
`Event schema changed (${schema.id} != ${event.event.schemaId}), reloading: ${topicName}`
316+
let schema;
317+
// Are we subscribing to a custom channel?
318+
if (topicName.endsWith('__chn')) {
319+
// Use schema ID instead of topic name to retrieve schema
320+
schema = await this.#getEventSchemaFromId(
321+
event.event.schemaId
329322
);
330-
this.#schemaChache.delete(topicName);
331-
schema = await this.#getEventSchema(topicName);
323+
} else {
324+
// Load event schema from cache or from the client
325+
schema =
326+
await this.#getEventSchemaFromTopicName(
327+
topicName
328+
);
329+
// Make sure that schema ID matches. If not, event fields may have changed
330+
// and client needs to reload schema
331+
if (schema.id !== event.event.schemaId) {
332+
this.#logger.info(
333+
`Event schema changed (${schema.id} != ${event.event.schemaId}), reloading: ${topicName}`
334+
);
335+
this.#schemaChache.deleteWithTopicName(
336+
topicName
337+
);
338+
schema =
339+
await this.#getEventSchemaFromTopicName(
340+
topicName
341+
);
342+
}
332343
}
333344
// Parse event thanks to schema
334345
const parsedEvent = parseEvent(schema, event);
@@ -448,7 +459,7 @@ export default class PubSubApiClient {
448459
if (!this.#client) {
449460
throw new Error('Pub/Sub API client is not connected.');
450461
}
451-
const schema = await this.#getEventSchema(topicName);
462+
const schema = await this.#getEventSchemaFromTopicName(topicName);
452463

453464
const id = correlationKey ? correlationKey : crypto.randomUUID();
454465
const response = await new Promise((resolve, reject) => {
@@ -487,22 +498,25 @@ export default class PubSubApiClient {
487498
* @memberof PubSubApiClient.prototype
488499
*/
489500
close() {
490-
this.#logger.info('closing gRPC stream');
501+
this.#logger.info('Closing gRPC stream');
491502
this.#client.close();
492503
}
493504

494505
/**
495-
* Retrieves the event schema for a topic from the cache.
506+
* Retrieves an event schema from the cache based on a topic name.
496507
* If it's not cached, fetches the shema with the gRPC client.
497508
* @param {string} topicName name of the topic that we're fetching
498509
* @returns {Promise<Schema>} Promise holding parsed event schema
499510
*/
500-
async #getEventSchema(topicName) {
501-
let schema = this.#schemaChache.get(topicName);
511+
async #getEventSchemaFromTopicName(topicName) {
512+
let schema = this.#schemaChache.getFromTopicName(topicName);
502513
if (!schema) {
503514
try {
504-
schema = await this.#fetchEventSchemaWithClient(topicName);
505-
this.#schemaChache.set(topicName, schema);
515+
schema =
516+
await this.#fetchEventSchemaFromTopicNameWithClient(
517+
topicName
518+
);
519+
this.#schemaChache.setWithTopicName(topicName, schema);
506520
} catch (error) {
507521
throw new Error(
508522
`Failed to load schema for topic ${topicName}`,
@@ -513,34 +527,71 @@ export default class PubSubApiClient {
513527
return schema;
514528
}
515529

530+
/**
531+
* Retrieves an event schema from the cache based on its ID.
532+
* If it's not cached, fetches the shema with the gRPC client.
533+
* @param {string} schemaId ID of the schema that we're fetching
534+
* @returns {Promise<Schema>} Promise holding parsed event schema
535+
*/
536+
async #getEventSchemaFromId(schemaId) {
537+
let schema = this.#schemaChache.getFromId(schemaId);
538+
if (!schema) {
539+
try {
540+
schema = await this.#fetchEventSchemaFromIdWithClient(schemaId);
541+
this.#schemaChache.set(schema);
542+
} catch (error) {
543+
throw new Error(`Failed to load schema with ID ${schemaId}`, {
544+
cause: error
545+
});
546+
}
547+
}
548+
return schema;
549+
}
550+
516551
/**
517552
* Requests the event schema for a topic using the gRPC client
518553
* @param {string} topicName name of the topic that we're fetching
519554
* @returns {Promise<Schema>} Promise holding parsed event schema
520555
*/
521-
async #fetchEventSchemaWithClient(topicName) {
556+
async #fetchEventSchemaFromTopicNameWithClient(topicName) {
522557
return new Promise((resolve, reject) => {
523-
this.#client.GetTopic({ topicName }, (topicError, response) => {
524-
if (topicError) {
525-
reject(topicError);
526-
} else {
527-
// Get the schema information
528-
const { schemaId } = response;
529-
this.#client.GetSchema({ schemaId }, (schemaError, res) => {
530-
if (schemaError) {
531-
reject(schemaError);
532-
} else {
533-
const schemaType = avro.parse(res.schemaJson, {
534-
registry: { long: CustomLongAvroType }
535-
});
536-
this.#logger.info(
537-
`Topic schema loaded: ${topicName}`
558+
this.#client.GetTopic(
559+
{ topicName },
560+
async (topicError, response) => {
561+
if (topicError) {
562+
reject(topicError);
563+
} else {
564+
// Get the schema information
565+
const { schemaId } = response;
566+
const schemaInfo =
567+
await this.#fetchEventSchemaFromIdWithClient(
568+
schemaId
538569
);
539-
resolve({
540-
id: schemaId,
541-
type: schemaType
542-
});
543-
}
570+
this.#logger.info(`Topic schema loaded: ${topicName}`);
571+
resolve(schemaInfo);
572+
}
573+
}
574+
);
575+
});
576+
}
577+
578+
/**
579+
* Requests the event schema from an ID using the gRPC client
580+
* @param {string} schemaId schema ID that we're fetching
581+
* @returns {Promise<Schema>} Promise holding parsed event schema
582+
*/
583+
async #fetchEventSchemaFromIdWithClient(schemaId) {
584+
return new Promise((resolve, reject) => {
585+
this.#client.GetSchema({ schemaId }, (schemaError, res) => {
586+
if (schemaError) {
587+
reject(schemaError);
588+
} else {
589+
const schemaType = avro.parse(res.schemaJson, {
590+
registry: { long: CustomLongAvroType }
591+
});
592+
resolve({
593+
id: schemaId,
594+
type: schemaType
544595
});
545596
}
546597
});

src/utils/schemaCache.js

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/**
2+
* @typedef {Object} Schema
3+
* @property {string} id
4+
* @property {Object} type
5+
* @protected
6+
*/
7+
8+
export default class SchemaCache {
9+
/**
10+
* Map of schemas indexed by ID
11+
* @type {Map<string,Schema>}
12+
*/
13+
#schemaChache;
14+
15+
/**
16+
* Map of schemas IDs indexed by topic name
17+
* @type {Map<string,string>}
18+
*/
19+
#topicNameCache;
20+
21+
constructor() {
22+
this.#schemaChache = new Map();
23+
this.#topicNameCache = new Map();
24+
}
25+
26+
/**
27+
* Retrieves a schema based on its ID
28+
* @param {string} schemaId
29+
* @returns {Schema} schema or undefined if not found
30+
*/
31+
getFromId(schemaId) {
32+
return this.#schemaChache.get(schemaId);
33+
}
34+
35+
/**
36+
* Retrieves a schema based on a topic name
37+
* @param {string} topicName
38+
* @returns {Schema} schema or undefined if not found
39+
*/
40+
getFromTopicName(topicName) {
41+
const schemaId = this.#topicNameCache.get(topicName);
42+
if (schemaId) {
43+
return this.getFromId(schemaId);
44+
}
45+
return undefined;
46+
}
47+
48+
/**
49+
* Caches a schema
50+
* @param {Schema} schema
51+
*/
52+
set(schema) {
53+
this.#schemaChache.set(schema.id, schema);
54+
}
55+
56+
/**
57+
* Caches a schema with a topic name
58+
* @param {string} topicName
59+
* @param {Schema} schema
60+
*/
61+
setWithTopicName(topicName, schema) {
62+
this.#topicNameCache.set(topicName, schema.id);
63+
this.set(schema);
64+
}
65+
66+
/**
67+
* Delete a schema based on the topic name
68+
* @param {string} topicName
69+
*/
70+
deleteWithTopicName(topicName) {
71+
const schemaId = this.#topicNameCache.get(topicName);
72+
if (schemaId) {
73+
this.#schemaChache.delete(schemaId);
74+
}
75+
this.#topicNameCache.delete(topicName);
76+
}
77+
}

0 commit comments

Comments
 (0)