Skip to content

Commit 5d9269a

Browse files
authored
Merge pull request #134 from kaleido-io/async-event-enrichment
Async event enrichment
2 parents a989283 + 6491552 commit 5d9269a

6 files changed

Lines changed: 31 additions & 28 deletions

File tree

src/eventstream-proxy/eventstream-proxy.base.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
127127

128128
private async processEvents(batch: EventBatch) {
129129
const messages: WebSocketMessage[] = [];
130+
const eventHandlers: Promise<WebSocketMessage | undefined>[] = [];
130131
for (const event of batch.events) {
131132
this.logger.log(`Proxying event: ${JSON.stringify(event)}`);
132133
const subName = await this.getSubscriptionName(newContext(), event.subId);
@@ -136,15 +137,22 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
136137
}
137138

138139
for (const listener of this.eventListeners) {
139-
try {
140-
await listener.onEvent(subName, event, (msg: WebSocketMessage | undefined) => {
141-
if (msg !== undefined) {
142-
messages.push(msg);
143-
}
144-
});
145-
} catch (err) {
146-
this.logger.error(`Error processing event: ${err}`);
140+
// Some events require enrichment that could involve a call to the blockchain,
141+
// so we don't want to do those synchronously. Create a promise for each onEvent()
142+
// handler and when they're all complete we'll create the batch message
143+
eventHandlers.push(Promise.resolve(listener.onEvent(subName, event)));
144+
}
145+
}
146+
147+
// Now we need to await the promises in order so the messages stay in order
148+
for (const nextProm of eventHandlers) {
149+
try {
150+
const msg = await nextProm;
151+
if (msg !== undefined) {
152+
messages.push(msg);
147153
}
154+
} catch (err) {
155+
this.logger.error(`Error processing event: ${err}`);
148156
}
149157
}
150158
const message: WebSocketMessageWithId = {

src/eventstream-proxy/eventstream-proxy.interfaces.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export interface ConnectionListener {
2626
}
2727

2828
export interface EventListener {
29-
onEvent: (subName: string, event: Event, process: EventProcessor) => void | Promise<void>;
29+
onEvent: (subName: string, event: Event) => undefined | Promise<WebSocketMessage | undefined>;
3030
}
3131

3232
export interface WebSocketMessageWithId extends WebSocketMessage {

src/tokens/blockchain.service.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2022 Kaleido, Inc.
1+
// Copyright © 2023 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -103,7 +103,7 @@ export class BlockchainConnectorService {
103103
// Check if retry condition matches the err that's been hit
104104
private matchesRetryCondition(err: any): boolean {
105105
return (
106-
this.retryConfiguration.retryCondition != '' &&
106+
this.retryConfiguration.retryCondition !== '' &&
107107
`${err}`.match(this.retryConfiguration.retryCondition) !== null
108108
);
109109
}
@@ -127,8 +127,8 @@ export class BlockchainConnectorService {
127127
let retries = 0;
128128
for (
129129
;
130-
this.retryConfiguration.retriesMax == -1 || retries <= this.retryConfiguration.retriesMax;
131-
this.retryConfiguration.retriesMax == -1 || retries++ // Don't inc 'retries' if 'retriesMax' if set to -1 (infinite retries)
130+
this.retryConfiguration.retriesMax === -1 || retries <= this.retryConfiguration.retriesMax;
131+
this.retryConfiguration.retriesMax === -1 || retries++ // Don't inc 'retries' if 'retriesMax' if set to -1 (infinite retries)
132132
) {
133133
try {
134134
return await blockchainFunction();

src/tokens/tokens.listener.ts

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2022 Kaleido, Inc.
1+
// Copyright © 2023 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -16,7 +16,7 @@
1616

1717
import { Logger } from '@nestjs/common';
1818
import { Event } from '../event-stream/event-stream.interfaces';
19-
import { EventListener, EventProcessor } from '../eventstream-proxy/eventstream-proxy.interfaces';
19+
import { EventListener } from '../eventstream-proxy/eventstream-proxy.interfaces';
2020
import { WebSocketMessage } from '../websocket-events/websocket-events.base';
2121
import { Context, newContext } from '../request-context/request-context.decorator';
2222
import {
@@ -41,7 +41,6 @@ import {
4141
unpackSubscriptionName,
4242
validatePoolLocator,
4343
} from './tokens.util';
44-
import { TokensService } from './tokens.service';
4544
import { AbiMapperService } from './abimapper.service';
4645
import { BlockchainConnectorService } from './blockchain.service';
4746
import { TokenURI as ERC721URI } from './erc721';
@@ -58,21 +57,17 @@ export class TokenListener implements EventListener {
5857

5958
constructor(private mapper: AbiMapperService, private blockchain: BlockchainConnectorService) {}
6059

61-
async onEvent(subName: string, event: Event, process: EventProcessor) {
60+
async onEvent(subName: string, event: Event) {
6261
const signature = this.trimEventSignature(event.signature);
6362
switch (signature) {
6463
case tokenCreateEventSignature:
65-
process(await this.transformTokenPoolCreationEvent(subName, event));
66-
break;
64+
return this.transformTokenPoolCreationEvent(subName, event);
6765
case transferEventSignature:
68-
process(await this.transformTransferEvent(subName, event));
69-
break;
66+
return this.transformTransferEvent(subName, event);
7067
case approvalEventSignature:
71-
process(this.transformApprovalEvent(subName, event));
72-
break;
68+
return this.transformApprovalEvent(subName, event);
7369
case approvalForAllEventSignature:
74-
process(this.transformApprovalForAllEvent(subName, event));
75-
break;
70+
return this.transformApprovalForAllEvent(subName, event);
7671
default:
7772
this.logger.error(`Unknown event signature: ${event.signature}`);
7873
}

src/tokens/tokens.service.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2022 Kaleido, Inc.
1+
// Copyright © 2023 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -240,7 +240,7 @@ describe('TokensService', () => {
240240
.useValue(eventstream)
241241
.compile();
242242

243-
let blockchainRetryCfg: RetryConfiguration = {
243+
const blockchainRetryCfg: RetryConfiguration = {
244244
retryBackOffFactor: 2,
245245
retryBackOffLimit: 500,
246246
retryBackOffInitial: 50,

test/app.e2e-context.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export class TestContext {
6969
this.app.use(requestIDMiddleware);
7070
await this.app.init();
7171

72-
let blockchainRetryCfg: RetryConfiguration = {
72+
const blockchainRetryCfg: RetryConfiguration = {
7373
retryBackOffFactor: 2,
7474
retryBackOffLimit: 500,
7575
retryBackOffInitial: 50,

0 commit comments

Comments
 (0)