@@ -40,6 +40,71 @@ var import_certifi = __toESM(require("certifi"), 1);
4040var import_grpc_js = __toESM ( require ( "@grpc/grpc-js" ) , 1 ) ;
4141var import_proto_loader = __toESM ( require ( "@grpc/proto-loader" ) , 1 ) ;
4242
43+ // src/utils/schemaCache.js
44+ var SchemaCache = class {
45+ /**
46+ * Map of schemas indexed by ID
47+ * @type {Map<string,Schema> }
48+ */
49+ #schemaChache;
50+ /**
51+ * Map of schemas IDs indexed by topic name
52+ * @type {Map<string,string> }
53+ */
54+ #topicNameCache;
55+ constructor ( ) {
56+ this . #schemaChache = /* @__PURE__ */ new Map ( ) ;
57+ this . #topicNameCache = /* @__PURE__ */ new Map ( ) ;
58+ }
59+ /**
60+ * Retrieves a schema based on its ID
61+ * @param {string } schemaId
62+ * @returns {Schema } schema or undefined if not found
63+ */
64+ getFromId ( schemaId ) {
65+ return this . #schemaChache. get ( schemaId ) ;
66+ }
67+ /**
68+ * Retrieves a schema based on a topic name
69+ * @param {string } topicName
70+ * @returns {Schema } schema or undefined if not found
71+ */
72+ getFromTopicName ( topicName ) {
73+ const schemaId = this . #topicNameCache. get ( topicName ) ;
74+ if ( schemaId ) {
75+ return this . getFromId ( schemaId ) ;
76+ }
77+ return void 0 ;
78+ }
79+ /**
80+ * Caches a schema
81+ * @param {Schema } schema
82+ */
83+ set ( schema ) {
84+ this . #schemaChache. set ( schema . id , schema ) ;
85+ }
86+ /**
87+ * Caches a schema with a topic name
88+ * @param {string } topicName
89+ * @param {Schema } schema
90+ */
91+ setWithTopicName ( topicName , schema ) {
92+ this . #topicNameCache. set ( topicName , schema . id ) ;
93+ this . set ( schema ) ;
94+ }
95+ /**
96+ * Delete a schema based on the topic name
97+ * @param {string } topicName
98+ */
99+ deleteWithTopicName ( topicName ) {
100+ const schemaId = this . #topicNameCache. get ( topicName ) ;
101+ if ( schemaId ) {
102+ this . #schemaChache. delete ( schemaId ) ;
103+ }
104+ this . #topicNameCache. delete ( topicName ) ;
105+ }
106+ } ;
107+
43108// src/utils/eventParseError.js
44109var EventParseError = class extends Error {
45110 /**
@@ -535,8 +600,8 @@ var PubSubApiClient = class {
535600 */
536601 #client;
537602 /**
538- * Map of schemas indexed by topic name
539- * @type {Map<string,Schema> }
603+ * Schema cache
604+ * @type {SchemaCache }
540605 */
541606 #schemaChache;
542607 /**
@@ -551,7 +616,7 @@ var PubSubApiClient = class {
551616 */
552617 constructor ( logger = console ) {
553618 this . #logger = logger ;
554- this . #schemaChache = /* @__PURE__ */ new Map ( ) ;
619+ this . #schemaChache = new SchemaCache ( ) ;
555620 this . #subscriptions = /* @__PURE__ */ new Map ( ) ;
556621 try {
557622 Configuration . load ( ) ;
@@ -759,13 +824,26 @@ var PubSubApiClient = class {
759824 ) ;
760825 data . events . forEach ( async ( event ) => {
761826 try {
762- let schema = await this . #getEventSchema( topicName ) ;
763- if ( schema . id !== event . event . schemaId ) {
764- this . #logger. info (
765- `Event schema changed (${ schema . id } != ${ event . event . schemaId } ), reloading: ${ topicName } `
827+ let schema ;
828+ if ( topicName . endsWith ( "__chn" ) ) {
829+ schema = await this . #getEventSchemaFromId(
830+ event . event . schemaId
831+ ) ;
832+ } else {
833+ schema = await this . #getEventSchemaFromTopicName(
834+ topicName
766835 ) ;
767- this . #schemaChache. delete ( topicName ) ;
768- schema = await this . #getEventSchema( topicName ) ;
836+ if ( schema . id !== event . event . schemaId ) {
837+ this . #logger. info (
838+ `Event schema changed (${ schema . id } != ${ event . event . schemaId } ), reloading: ${ topicName } `
839+ ) ;
840+ this . #schemaChache. deleteWithTopicName (
841+ topicName
842+ ) ;
843+ schema = await this . #getEventSchemaFromTopicName(
844+ topicName
845+ ) ;
846+ }
769847 }
770848 const parsedEvent = parseEvent ( schema , event ) ;
771849 this . #logger. debug ( parsedEvent ) ;
@@ -866,7 +944,7 @@ var PubSubApiClient = class {
866944 if ( ! this . #client) {
867945 throw new Error ( "Pub/Sub API client is not connected." ) ;
868946 }
869- const schema = await this . #getEventSchema ( topicName ) ;
947+ const schema = await this . #getEventSchemaFromTopicName ( topicName ) ;
870948 const id = correlationKey ? correlationKey : import_crypto2 . default . randomUUID ( ) ;
871949 const response = await new Promise ( ( resolve , reject ) => {
872950 this . #client. Publish (
@@ -904,21 +982,23 @@ var PubSubApiClient = class {
904982 * @memberof PubSubApiClient.prototype
905983 */
906984 close ( ) {
907- this . #logger. info ( "closing gRPC stream" ) ;
985+ this . #logger. info ( "Closing gRPC stream" ) ;
908986 this . #client. close ( ) ;
909987 }
910988 /**
911- * Retrieves the event schema for a topic from the cache.
989+ * Retrieves an event schema from the cache based on a topic name .
912990 * If it's not cached, fetches the shema with the gRPC client.
913991 * @param {string } topicName name of the topic that we're fetching
914992 * @returns {Promise<Schema> } Promise holding parsed event schema
915993 */
916- async #getEventSchema ( topicName ) {
917- let schema = this . #schemaChache. get ( topicName ) ;
994+ async #getEventSchemaFromTopicName ( topicName ) {
995+ let schema = this . #schemaChache. getFromTopicName ( topicName ) ;
918996 if ( ! schema ) {
919997 try {
920- schema = await this . #fetchEventSchemaWithClient( topicName ) ;
921- this . #schemaChache. set ( topicName , schema ) ;
998+ schema = await this . #fetchEventSchemaFromTopicNameWithClient(
999+ topicName
1000+ ) ;
1001+ this . #schemaChache. setWithTopicName ( topicName , schema ) ;
9221002 } catch ( error ) {
9231003 throw new Error (
9241004 `Failed to load schema for topic ${ topicName } ` ,
@@ -928,33 +1008,67 @@ var PubSubApiClient = class {
9281008 }
9291009 return schema ;
9301010 }
1011+ /**
1012+ * Retrieves an event schema from the cache based on its ID.
1013+ * If it's not cached, fetches the shema with the gRPC client.
1014+ * @param {string } schemaId ID of the schema that we're fetching
1015+ * @returns {Promise<Schema> } Promise holding parsed event schema
1016+ */
1017+ async #getEventSchemaFromId( schemaId ) {
1018+ let schema = this . #schemaChache. getFromId ( schemaId ) ;
1019+ if ( ! schema ) {
1020+ try {
1021+ schema = await this . #fetchEventSchemaFromIdWithClient( schemaId ) ;
1022+ this . #schemaChache. set ( schema ) ;
1023+ } catch ( error ) {
1024+ throw new Error ( `Failed to load schema with ID ${ schemaId } ` , {
1025+ cause : error
1026+ } ) ;
1027+ }
1028+ }
1029+ return schema ;
1030+ }
9311031 /**
9321032 * Requests the event schema for a topic using the gRPC client
9331033 * @param {string } topicName name of the topic that we're fetching
9341034 * @returns {Promise<Schema> } Promise holding parsed event schema
9351035 */
936- async #fetchEventSchemaWithClient( topicName ) {
1036+ async #fetchEventSchemaFromTopicNameWithClient( topicName ) {
1037+ return new Promise ( ( resolve , reject ) => {
1038+ this . #client. GetTopic (
1039+ { topicName } ,
1040+ async ( topicError , response ) => {
1041+ if ( topicError ) {
1042+ reject ( topicError ) ;
1043+ } else {
1044+ const { schemaId } = response ;
1045+ const schemaInfo = await this . #fetchEventSchemaFromIdWithClient(
1046+ schemaId
1047+ ) ;
1048+ this . #logger. info ( `Topic schema loaded: ${ topicName } ` ) ;
1049+ resolve ( schemaInfo ) ;
1050+ }
1051+ }
1052+ ) ;
1053+ } ) ;
1054+ }
1055+ /**
1056+ * Requests the event schema from an ID using the gRPC client
1057+ * @param {string } schemaId schema ID that we're fetching
1058+ * @returns {Promise<Schema> } Promise holding parsed event schema
1059+ */
1060+ async #fetchEventSchemaFromIdWithClient( schemaId ) {
9371061 return new Promise ( ( resolve , reject ) => {
938- this . #client. GetTopic ( { topicName } , ( topicError , response ) => {
939- if ( topicError ) {
940- reject ( topicError ) ;
1062+ this . #client. GetSchema ( { schemaId } , ( schemaError , res ) => {
1063+ if ( schemaError ) {
1064+ reject ( schemaError ) ;
9411065 } else {
942- const { schemaId } = response ;
943- this . #client. GetSchema ( { schemaId } , ( schemaError , res ) => {
944- if ( schemaError ) {
945- reject ( schemaError ) ;
946- } else {
947- const schemaType = import_avro_js3 . default . parse ( res . schemaJson , {
948- registry : { long : CustomLongAvroType }
949- } ) ;
950- this . #logger. info (
951- `Topic schema loaded: ${ topicName } `
952- ) ;
953- resolve ( {
954- id : schemaId ,
955- type : schemaType
956- } ) ;
957- }
1066+ const schemaType = import_avro_js3 . default . parse ( res . schemaJson , {
1067+ registry : { long : CustomLongAvroType }
1068+ } ) ;
1069+ resolve ( {
1070+ id : schemaId ,
1071+ type : schemaType
9581072 } ) ;
9591073 }
9601074 } ) ;
0 commit comments