Skip to content

Commit 1cb1d6c

Browse files
authored
Merge pull request #128 from kaleido-io/backoff-retry-econn-errors
Retry logic for blockchain calls
2 parents 0b814b7 + 0982c00 commit 1cb1d6c

File tree

5 files changed

+204
-27
lines changed

5 files changed

+204
-27
lines changed

README.md

+29-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ are additional methods used by the token connector to guess at the contract ABI
2929
but is the preferred method for most use cases.
3030

3131
To leverage this capability in a running FireFly environment, you must:
32+
3233
1. [Upload the token contract ABI to FireFly](https://hyperledger.github.io/firefly/tutorials/custom_contracts/ethereum.html)
33-
as a contract interface.
34+
as a contract interface.
3435
2. Include the `interface` parameter when [creating the pool on FireFly](https://hyperledger.github.io/firefly/tutorials/tokens).
3536

3637
This will cause FireFly to parse the interface and provide ABI details
@@ -119,7 +120,7 @@ that specific token. If omitted, the approval covers all tokens.
119120

120121
The following APIs are not part of the fftokens standard, but are exposed under `/api/v1`:
121122

122-
* `GET /receipt/:id` - Get receipt for a previous request
123+
- `GET /receipt/:id` - Get receipt for a previous request
123124

124125
## Running the service
125126

@@ -179,3 +180,29 @@ $ npm run lint
179180
# formatting
180181
$ npm run format
181182
```
183+
184+
## Blockchain retry behaviour
185+
186+
Most short-term outages should be handled by the blockchain connector. For example if the blockchain node returns `HTTP 429` due to rate limiting
187+
it is the blockchain connector's responsibility to use appropriate back-off retries to attempt to make the required blockchain call successfully.
188+
189+
There are cases where the token connector may need to perform its own back-off retry for a blockchain action. For example if the blockchain connector
190+
microservice has crashed and is in the process of restarting just as the token connector is trying to query an NFT token URI to enrich a token event, if
191+
the token connector doesn't perform a retry then the event will be returned without the token URI populated.
192+
193+
The token connector has configurable retry behaviour for all blockchain related calls. By default the connector will perform up to 15 retries with a back-off
194+
interval between each one. The default first retry interval is 100ms and doubles up to a maximum of 10s per retry interval. Retries are only performed where
195+
the error returned from the REST call matches a configurable regular expression retry condition. The default retry condition is `.*ECONN.*` which ensures
196+
retries take place for common TCP errors such as `ECONNRESET` and `ECONNREFUSED`.
197+
198+
The configurable retry settings are:
199+
200+
- `RETRY_BACKOFF_FACTOR` (default `2`)
201+
- `RETRY_BACKOFF_LIMIT_MS` (default `10000`)
202+
- `RETRY_BACKOFF_INITIAL_MS` (default `100`)
203+
- `RETRY_CONDITION` (default `.*ECONN.*`)
204+
- `RETRY_MAX_ATTEMPTS` (default `15`)
205+
206+
Setting `RETRY_CONDITION` to `""` disables retries. Setting `RETRY_MAX_ATTEMPTS` to `-1` causes it to retry indefinitely.
207+
208+
Note, the token connector will make a total of `RETRY_MAX_ATTEMPTS` + 1 calls for a given retryable call (1 original attempt and `RETRY_MAX_ATTEMPTS` retries)

src/main.ts

+11-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import { EventStreamReply } from './event-stream/event-stream.interfaces';
2525
import { EventStreamService } from './event-stream/event-stream.service';
2626
import { requestIDMiddleware } from './request-context/request-id.middleware';
2727
import { RequestLoggingInterceptor } from './request-logging.interceptor';
28-
import { BlockchainConnectorService } from './tokens/blockchain.service';
28+
import { BlockchainConnectorService, RetryConfiguration } from './tokens/blockchain.service';
2929
import {
3030
TokenApprovalEvent,
3131
TokenBurnEvent,
@@ -84,6 +84,15 @@ async function bootstrap() {
8484
const legacyERC20 = config.get<string>('USE_LEGACY_ERC20_SAMPLE', '').toLowerCase() === 'true';
8585
const legacyERC721 = config.get<string>('USE_LEGACY_ERC721_SAMPLE', '').toLowerCase() === 'true';
8686

87+
// Configuration for blockchain call retries
88+
const blockchainRetryCfg: RetryConfiguration = {
89+
retryBackOffFactor: config.get<number>('RETRY_BACKOFF_FACTOR', 2),
90+
retryBackOffLimit: config.get<number>('RETRY_BACKOFF_LIMIT_MS', 10000),
91+
retryBackOffInitial: config.get<number>('RETRY_BACKOFF_INITIAL_MS', 100),
92+
retryCondition: config.get<string>('RETRY_CONDITION', '.*ECONN.*'),
93+
retriesMax: config.get<number>('RETRY_MAX_ATTEMPTS', 15),
94+
};
95+
8796
const passthroughHeaders: string[] = [];
8897
for (const h of passthroughHeaderString.split(',')) {
8998
passthroughHeaders.push(h.toLowerCase());
@@ -93,7 +102,7 @@ async function bootstrap() {
93102
app.get(TokensService).configure(ethConnectUrl, topic, factoryAddress);
94103
app
95104
.get(BlockchainConnectorService)
96-
.configure(ethConnectUrl, fftmUrl, username, password, passthroughHeaders);
105+
.configure(ethConnectUrl, fftmUrl, username, password, passthroughHeaders, blockchainRetryCfg);
97106
app.get(AbiMapperService).configure(legacyERC20, legacyERC721);
98107

99108
if (autoInit.toLowerCase() !== 'false') {

src/tokens/blockchain.service.ts

+91-19
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ import { Context } from '../request-context/request-context.decorator';
3030
import { FFRequestIDHeader } from '../request-context/constants';
3131
import { EthConnectAsyncResponse, EthConnectReturn, IAbiMethod } from './tokens.interfaces';
3232

33+
export interface RetryConfiguration {
34+
retryBackOffFactor: number;
35+
retryBackOffLimit: number;
36+
retryBackOffInitial: number;
37+
retryCondition: string;
38+
retriesMax: number;
39+
}
40+
3341
const sendTransactionHeader = 'SendTransaction';
3442
const queryHeader = 'Query';
3543

@@ -43,6 +51,8 @@ export class BlockchainConnectorService {
4351
password: string;
4452
passthroughHeaders: string[];
4553

54+
retryConfiguration: RetryConfiguration;
55+
4656
constructor(public http: HttpService) {}
4757

4858
configure(
@@ -51,12 +61,14 @@ export class BlockchainConnectorService {
5161
username: string,
5262
password: string,
5363
passthroughHeaders: string[],
64+
retryConfiguration: RetryConfiguration,
5465
) {
5566
this.baseUrl = baseUrl;
5667
this.fftmUrl = fftmUrl;
5768
this.username = username;
5869
this.password = password;
5970
this.passthroughHeaders = passthroughHeaders;
71+
this.retryConfiguration = retryConfiguration;
6072
}
6173

6274
private requestOptions(ctx: Context): AxiosRequestConfig {
@@ -88,15 +100,67 @@ export class BlockchainConnectorService {
88100
});
89101
}
90102

103+
// Check if retry condition matches the err that's been hit
104+
private matchesRetryCondition(err: any): boolean {
105+
return (
106+
this.retryConfiguration.retryCondition != '' &&
107+
`${err}`.match(this.retryConfiguration.retryCondition) !== null
108+
);
109+
}
110+
111+
// Delay by the appropriate amount of time given the iteration the caller is in
112+
private async backoffDelay(iteration: number) {
113+
const delay = Math.min(
114+
this.retryConfiguration.retryBackOffInitial *
115+
Math.pow(this.retryConfiguration.retryBackOffFactor, iteration),
116+
this.retryConfiguration.retryBackOffLimit,
117+
);
118+
await new Promise(resolve => setTimeout(resolve, delay));
119+
}
120+
121+
// Generic helper function that makes a given blockchain function retryable
122+
// by using synchronous back-off delays for cases where the function returns
123+
// an error which matches the configured retry condition
124+
private async retryableCall<T = any>(
125+
blockchainFunction: () => Promise<AxiosResponse<T>>,
126+
): Promise<AxiosResponse<T>> {
127+
let retries = 0;
128+
for (
129+
;
130+
this.retryConfiguration.retriesMax == -1 || retries <= this.retryConfiguration.retriesMax;
131+
this.retryConfiguration.retriesMax == -1 || retries++ // Don't inc 'retries' if 'retriesMax' if set to -1 (infinite retries)
132+
) {
133+
try {
134+
return await blockchainFunction();
135+
} catch (e) {
136+
if (this.matchesRetryCondition(e)) {
137+
this.logger.debug(`Retry condition matched for error ${e}`);
138+
// Wait for a backed-off delay before trying again
139+
await this.backoffDelay(retries);
140+
} else {
141+
// Whatever the error was it's not one we will retry for
142+
throw e;
143+
}
144+
}
145+
}
146+
147+
throw new InternalServerErrorException(
148+
`Call to blockchain connector failed after ${retries} attempts`,
149+
);
150+
}
151+
91152
async query(ctx: Context, to: string, method?: IAbiMethod, params?: any[]) {
153+
const url = this.baseUrl;
92154
const response = await this.wrapError(
93-
lastValueFrom(
94-
this.http.post<EthConnectReturn>(
95-
this.baseUrl,
96-
{ headers: { type: queryHeader }, to, method, params },
97-
this.requestOptions(ctx),
98-
),
99-
),
155+
this.retryableCall<EthConnectReturn>(async (): Promise<AxiosResponse<EthConnectReturn>> => {
156+
return lastValueFrom(
157+
this.http.post(
158+
url,
159+
{ headers: { type: queryHeader }, to, method, params },
160+
this.requestOptions(ctx),
161+
),
162+
);
163+
}),
100164
);
101165
return response.data;
102166
}
@@ -110,26 +174,34 @@ export class BlockchainConnectorService {
110174
params?: any[],
111175
) {
112176
const url = this.fftmUrl !== undefined && this.fftmUrl !== '' ? this.fftmUrl : this.baseUrl;
177+
113178
const response = await this.wrapError(
114-
lastValueFrom(
115-
this.http.post<EthConnectAsyncResponse>(
116-
url,
117-
{ headers: { id, type: sendTransactionHeader }, from, to, method, params },
118-
this.requestOptions(ctx),
119-
),
179+
this.retryableCall<EthConnectAsyncResponse>(
180+
async (): Promise<AxiosResponse<EthConnectAsyncResponse>> => {
181+
return lastValueFrom(
182+
this.http.post(
183+
url,
184+
{ headers: { id, type: sendTransactionHeader }, from, to, method, params },
185+
this.requestOptions(ctx),
186+
),
187+
);
188+
},
120189
),
121190
);
122191
return response.data;
123192
}
124193

125194
async getReceipt(ctx: Context, id: string): Promise<EventStreamReply> {
195+
const url = this.baseUrl;
126196
const response = await this.wrapError(
127-
lastValueFrom(
128-
this.http.get<EventStreamReply>(new URL(`/reply/${id}`, this.baseUrl).href, {
129-
validateStatus: status => status < 300 || status === 404,
130-
...this.requestOptions(ctx),
131-
}),
132-
),
197+
this.retryableCall<EventStreamReply>(async (): Promise<AxiosResponse<EventStreamReply>> => {
198+
return lastValueFrom(
199+
this.http.get(new URL(`/reply/${id}`, url).href, {
200+
validateStatus: status => status < 300 || status === 404,
201+
...this.requestOptions(ctx),
202+
}),
203+
);
204+
}),
133205
);
134206
if (response.status === 404) {
135207
throw new NotFoundException();

src/tokens/tokens.service.spec.ts

+61-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {
3333
import { EventStreamService } from '../event-stream/event-stream.service';
3434
import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway';
3535
import { AbiMapperService } from './abimapper.service';
36-
import { BlockchainConnectorService } from './blockchain.service';
36+
import { BlockchainConnectorService, RetryConfiguration } from './blockchain.service';
3737
import {
3838
AsyncResponse,
3939
EthConnectAsyncResponse,
@@ -196,6 +196,14 @@ describe('TokensService', () => {
196196
);
197197
};
198198

199+
const mockECONNErrors = (count: number) => {
200+
for (let i = 0; i < count; i++) {
201+
http.post.mockImplementationOnce(() => {
202+
throw new Error('connect ECONNREFUSED 10.1.2.3');
203+
});
204+
}
205+
};
206+
199207
beforeEach(async () => {
200208
http = {
201209
get: jest.fn(),
@@ -232,10 +240,18 @@ describe('TokensService', () => {
232240
.useValue(eventstream)
233241
.compile();
234242

243+
let blockchainRetryCfg: RetryConfiguration = {
244+
retryBackOffFactor: 2,
245+
retryBackOffLimit: 500,
246+
retryBackOffInitial: 50,
247+
retryCondition: '.*ECONN.*',
248+
retriesMax: 15,
249+
};
250+
235251
service = module.get(TokensService);
236252
service.configure(BASE_URL, TOPIC, '');
237253
blockchain = module.get(BlockchainConnectorService);
238-
blockchain.configure(BASE_URL, '', '', '', []);
254+
blockchain.configure(BASE_URL, '', '', '', [], blockchainRetryCfg);
239255
});
240256

241257
it('should be defined', () => {
@@ -1042,6 +1058,49 @@ describe('TokensService', () => {
10421058
expect(http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, { headers });
10431059
});
10441060

1061+
it('should mint ERC721WithData token with correct abi, custom uri, and inputs after 6 ECONNREFUSED retries', async () => {
1062+
const ctx = newContext();
1063+
const headers = {
1064+
'x-firefly-request-id': ctx.requestId,
1065+
};
1066+
1067+
const request: TokenMint = {
1068+
tokenIndex: '721',
1069+
signer: IDENTITY,
1070+
poolLocator: ERC721_WITH_DATA_V1_POOL_ID,
1071+
to: '0x123',
1072+
uri: 'ipfs://CID',
1073+
};
1074+
1075+
const response: EthConnectAsyncResponse = {
1076+
id: 'responseId',
1077+
sent: true,
1078+
};
1079+
1080+
const mockEthConnectRequest: EthConnectMsgRequest = {
1081+
headers: {
1082+
type: 'SendTransaction',
1083+
},
1084+
from: IDENTITY,
1085+
to: CONTRACT_ADDRESS,
1086+
method: ERC721WithDataV1ABI.abi.find(abi => abi.name === MINT_WITH_URI) as IAbiMethod,
1087+
params: ['0x123', '721', '0x00', 'ipfs://CID'],
1088+
};
1089+
1090+
http.post.mockReturnValueOnce(
1091+
new FakeObservable(<EthConnectReturn>{
1092+
output: true,
1093+
}),
1094+
);
1095+
mockECONNErrors(6);
1096+
http.post.mockReturnValueOnce(new FakeObservable(response));
1097+
1098+
await service.mint(ctx, request);
1099+
1100+
expect(http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, { headers });
1101+
expect(http.post).toHaveBeenCalledTimes(8); // Expect initial submit OK, 6 ECONN errors, final call OK = 8 POSTs
1102+
});
1103+
10451104
it('should mint ERC721WithData token with correct abi, custom uri, auto-indexing, and inputs', async () => {
10461105
const ctx = newContext();
10471106
const headers = {

test/app.e2e-context.ts

+12-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { EventStreamService } from '../src/event-stream/event-stream.service';
1212
import { EventStreamProxyGateway } from '../src/eventstream-proxy/eventstream-proxy.gateway';
1313
import { TokensService } from '../src/tokens/tokens.service';
1414
import { requestIDMiddleware } from '../src/request-context/request-id.middleware';
15-
import { BlockchainConnectorService } from '../src/tokens/blockchain.service';
15+
import { BlockchainConnectorService, RetryConfiguration } from '../src/tokens/blockchain.service';
1616

1717
export const BASE_URL = 'http://eth';
1818
export const INSTANCE_PATH = '/tokens';
@@ -69,9 +69,19 @@ export class TestContext {
6969
this.app.use(requestIDMiddleware);
7070
await this.app.init();
7171

72+
let blockchainRetryCfg: RetryConfiguration = {
73+
retryBackOffFactor: 2,
74+
retryBackOffLimit: 500,
75+
retryBackOffInitial: 50,
76+
retryCondition: '.*ECONN.*',
77+
retriesMax: 15,
78+
};
79+
7280
this.app.get(EventStreamProxyGateway).configure('url', TOPIC);
7381
this.app.get(TokensService).configure(BASE_URL, TOPIC, '');
74-
this.app.get(BlockchainConnectorService).configure(BASE_URL, '', '', '', []);
82+
this.app
83+
.get(BlockchainConnectorService)
84+
.configure(BASE_URL, '', '', '', [], blockchainRetryCfg);
7585

7686
(this.app.getHttpServer() as Server).listen();
7787
this.server = request(this.app.getHttpServer());

0 commit comments

Comments
 (0)