1
1
import { Injectable , Logger } from '@nestjs/common' ;
2
- import { Job , JobsOptions , Queue } from 'bullmq' ;
2
+ import { Job , Queue } from 'bullmq' ;
3
3
import { InjectQueue , Processor } from '@nestjs/bullmq' ;
4
4
import { hexToString } from '@polkadot/util' ;
5
5
import parquet from '@dsnp/parquetjs' ;
6
- import { bases } from 'multiformats/basics' ;
7
6
import { AppConfigService } from '../config/config.service' ;
8
7
import { calculateJobId } from '..' ;
9
8
import * as QueueConstants from '../queues/queue-constants' ;
10
9
import { IIPFSJob } from '../interfaces/ipfs.job.interface' ;
11
10
import { BaseConsumer } from '../utils/base-consumer' ;
12
11
import { IpfsService } from '../utils/ipfs.client' ;
13
- import {
14
- AnnouncementResponse ,
15
- AnnouncementType ,
16
- BroadcastAnnouncement ,
17
- ProfileAnnouncement ,
18
- ReactionAnnouncement ,
19
- ReplyAnnouncement ,
20
- TombstoneAnnouncement ,
21
- UpdateAnnouncement ,
22
- } from '../types/content-announcement' ;
23
- import { isBroadcast , isProfile , isReaction , isReply , isTombstone , isTypedAnnouncement , isUpdate } from '../utils/type-guards' ;
12
+ import { AnnouncementResponse } from '../types/content-announcement' ;
13
+ import { isBroadcast , isProfile , isReaction , isReply , isTombstone , isUpdate } from '../utils/type-guards' ;
24
14
25
15
@Injectable ( )
26
16
@Processor ( QueueConstants . IPFS_QUEUE , {
@@ -76,7 +66,11 @@ export class IPFSContentProcessor extends BaseConsumer {
76
66
}
77
67
}
78
68
79
- private async enqueueAnnouncementResponse ( announcementResponse : AnnouncementResponse , name : string , queue : Queue ) : Promise < void > {
69
+ private async enqueueAnnouncementResponse (
70
+ announcementResponse : AnnouncementResponse ,
71
+ name : string ,
72
+ queue : Queue ,
73
+ ) : Promise < void > {
80
74
if ( ! ( await this . isQueueFull ( queue ) ) ) {
81
75
const jobId = calculateJobId ( announcementResponse ) ;
82
76
await queue . add ( name , announcementResponse , { jobId } ) ;
@@ -98,7 +92,9 @@ export class IPFSContentProcessor extends BaseConsumer {
98
92
if ( isBroadcast ( mapRecord ) ) {
99
93
announcementResponse . announcement = {
100
94
fromId : mapRecord . fromId ,
101
- contentHash : mapRecord . contentHash ,
95
+ contentHash : Buffer . isBuffer ( mapRecord . contentHash )
96
+ ? mapRecord . contentHash . toString ( )
97
+ : mapRecord . contentHash ,
102
98
url : mapRecord . url ,
103
99
announcementType : mapRecord . announcementType ,
104
100
} ;
@@ -108,7 +104,9 @@ export class IPFSContentProcessor extends BaseConsumer {
108
104
announcementResponse . announcement = {
109
105
fromId : mapRecord . fromId ,
110
106
targetAnnouncementType : mapRecord . targetAnnouncementType ,
111
- targetContentHash : mapRecord . targetContentHash ,
107
+ targetContentHash : Buffer . isBuffer ( mapRecord . targetContentHash )
108
+ ? mapRecord . targetContentHash . toString ( )
109
+ : mapRecord . targetContentHash ,
112
110
announcementType : mapRecord . announcementType ,
113
111
} ;
114
112
queue = this . tombstoneQueue ;
@@ -129,7 +127,9 @@ export class IPFSContentProcessor extends BaseConsumer {
129
127
announcementType : mapRecord . announcementType ,
130
128
url : mapRecord . url ,
131
129
inReplyTo : mapRecord . inReplyTo ,
132
- contentHash : mapRecord . contentHash ,
130
+ contentHash : Buffer . isBuffer ( mapRecord . contentHash )
131
+ ? mapRecord . contentHash . toString ( )
132
+ : mapRecord . contentHash ,
133
133
} ;
134
134
queue = this . replyQueue ;
135
135
typeName = 'Reply' ;
@@ -138,7 +138,9 @@ export class IPFSContentProcessor extends BaseConsumer {
138
138
fromId : mapRecord . fromId ,
139
139
announcementType : mapRecord . announcementType ,
140
140
url : mapRecord . url ,
141
- contentHash : mapRecord . contentHash ,
141
+ contentHash : Buffer . isBuffer ( mapRecord . contentHash )
142
+ ? mapRecord . contentHash . toString ( )
143
+ : mapRecord . contentHash ,
142
144
} ;
143
145
queue = this . profileQueue ;
144
146
typeName = 'Profile' ;
@@ -147,9 +149,13 @@ export class IPFSContentProcessor extends BaseConsumer {
147
149
fromId : mapRecord . fromId ,
148
150
announcementType : mapRecord . announcementType ,
149
151
url : mapRecord . url ,
150
- contentHash : mapRecord . contentHash ,
152
+ contentHash : Buffer . isBuffer ( mapRecord . contentHash )
153
+ ? mapRecord . contentHash . toString ( )
154
+ : mapRecord . contentHash ,
151
155
targetAnnouncementType : mapRecord . targetAnnouncementType ,
152
- targetContentHash : mapRecord . targetContentHash ,
156
+ targetContentHash : Buffer . isBuffer ( mapRecord . targetContentHash )
157
+ ? mapRecord . targetContentHash . toString ( )
158
+ : mapRecord . targetContentHash ,
153
159
} ;
154
160
queue = this . updateQueue ;
155
161
typeName = 'Update' ;
0 commit comments