Skip to content

Commit b0d006f

Browse files
authored
fix: address race condition between asset publishing and batch announcement (#728)
# Description This PR does the following: - overrides the retry limit for pre-existing batchAnnouncement jobs from 1 to 3 - traps failure to retrieve IPFS info of a pinned resource in the batch queue processor and throws an error (prevents erroneous queueing of batch jobs that end up looking like onChain content jobs) - adds an initial delay of 3s to batchAnnouncement jobs to allow time for asset upload to ipfs
1 parent 79aea4d commit b0d006f

File tree

3 files changed

+14
-3
lines changed

3 files changed

+14
-3
lines changed

apps/content-publishing-api/src/api.service.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ export class ApiService {
9797
data.id = this.calculateJobId(data);
9898
const job = await this.batchAnnouncerQueue.add(`Batch Request Job - ${data.id}`, data, {
9999
jobId: data.id,
100+
attempts: 3,
101+
delay: 3000,
100102
removeOnFail: false,
101103
removeOnComplete: 2000,
102104
}); // TODO: should come from queue configs

apps/content-publishing-worker/src/batch_announcer/batch.announcer.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,14 @@ export class BatchAnnouncer {
7171

7272
public async announceExistingBatch(batch: IBatchFile): Promise<IPublisherJob> {
7373
// Get previously uploaded file from IPFS
74-
const { Key: cid, Size: size } = await this.ipfsService.getInfo(batch.cid);
74+
this.logger.log(`Getting info from IPFS for ${batch.cid}`);
75+
const { Key: cid, Size: size, Message: msg, Type: msgType } = await this.ipfsService.getInfo(batch.cid);
76+
if (msgType === 'error' || !cid) {
77+
this.logger.error(`Unable to confirm batch file existence in IPFS: ${msg}`);
78+
throw new Error(`Unable to confirm batch file existence in IPFS: ${msg}`);
79+
}
80+
81+
this.logger.debug(`Got info from IPFS: cid=${cid}, size=${size}`);
7582

7683
const ipfsUrl = await this.formIpfsUrl(cid);
7784
const response = { id: batch.cid, schemaId: batch.schemaId, data: { cid, payloadLength: size } };

libs/storage/src/ipfs/ipfs.service.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export class IpfsService {
6161

6262
public async getInfo(cid: string, checkExistence = true): Promise<IpfsBlockStatResponse> {
6363
if (checkExistence && !(await this.isPinned(cid))) {
64-
return Promise.resolve({ Message: 'Requested resource does not exist', Type: 'error' });
64+
return { Message: 'Requested resource does not exist', Type: 'error' };
6565
}
6666

6767
const ipfsGet = `${this.config.ipfsEndpoint}/api/v0/block/stat?arg=${cid}`;
@@ -74,6 +74,7 @@ export class IpfsService {
7474

7575
const headers = { Accept: '*/*', Connection: 'keep-alive', authorization: ipfsAuth };
7676

77+
this.logger.debug(`Requesting IPFS stats from ${ipfsGet}`);
7778
const response = await axios.post(ipfsGet, null, { headers, responseType: 'json' });
7879
this.logger.debug(`IPFS response: ${JSON.stringify(response.data)}`);
7980
return response.data as IpfsBlockStatResponse;
@@ -96,8 +97,9 @@ export class IpfsService {
9697
authorization: ipfsAuth,
9798
};
9899

100+
this.logger.debug(`Requesting pin info from IPFS for ${ipfsGet}`);
99101
const response = await axios.post(ipfsGet, null, { headers, responseType: 'json' }).catch((error) => {
100-
// when pid does not exist this call returns 500 which is not great
102+
// when pin does not exist this call returns 500 which is not great
101103
if (error.response && error.response.status !== 500) {
102104
this.logger.error(error.toJSON());
103105
}

0 commit comments

Comments
 (0)