Skip to content

Commit c11b924

Browse files
authored
feat: batch publish (#480)
* feat: batch publish * chore: add tests * refactor: batchPublish option instead of batch
1 parent eb53ea9 commit c11b924

File tree

5 files changed

+130
-64
lines changed

5 files changed

+130
-64
lines changed

Diff for: src/index.ts

+44-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { CustomEvent, TypedEventEmitter, StrictSign, StrictNoSign, TopicValidatorResult } from '@libp2p/interface'
22
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
3+
import { encode } from 'it-length-prefixed'
34
import { pipe } from 'it-pipe'
45
import { pushable } from 'it-pushable'
56
import * as constants from './constants.js'
@@ -91,6 +92,8 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
9192
fallbackToFloodsub: boolean
9293
/** if self-published messages should be sent to all peers */
9394
floodPublish: boolean
95+
/** serialize message once and send to all peers without control messages */
96+
batchPublish: boolean
9497
/** whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted nodes. */
9598
doPX: boolean
9699
/** peers with which we will maintain direct connections */
@@ -393,6 +396,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
393396
const opts = {
394397
fallbackToFloodsub: true,
395398
floodPublish: true,
399+
batchPublish: false,
396400
doPX: false,
397401
directPeers: [],
398402
D: constants.GossipsubD,
@@ -2091,14 +2095,20 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
20912095
// If the message is anonymous or has a random author add it to the published message ids cache.
20922096
this.publishedMessageIds.put(msgIdStr)
20932097

2094-
// Send to set of peers aggregated from direct, mesh, fanout
2095-
for (const id of tosend) {
2096-
// sendRpc may mutate RPC message on piggyback, create a new message for each peer
2097-
const sent = this.sendRpc(id, { messages: [rawMsg] })
2098-
2099-
// did not actually send the message
2100-
if (!sent) {
2101-
tosend.delete(id)
2098+
const batchPublish = opts?.batchPublish ?? this.opts.batchPublish
2099+
const rpc = { messages: [rawMsg] }
2100+
if (batchPublish) {
2101+
this.sendRpcInBatch(tosend, rpc)
2102+
} else {
2103+
// Send to set of peers aggregated from direct, mesh, fanout
2104+
for (const id of tosend) {
2105+
// sendRpc may mutate RPC message on piggyback, create a new message for each peer
2106+
const sent = this.sendRpc(id, rpc)
2107+
2108+
// did not actually send the message
2109+
if (!sent) {
2110+
tosend.delete(id)
2111+
}
21022112
}
21032113
}
21042114

@@ -2133,6 +2143,32 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
21332143
}
21342144
}
21352145

2146+
/**
2147+
* Send the same data in batch to tosend list without considering cached control messages
2148+
* This is not only faster but also avoid allocating memory for each peer
2149+
* see https://github.com/ChainSafe/js-libp2p-gossipsub/issues/344
2150+
*/
2151+
private sendRpcInBatch (tosend: Set<PeerIdStr>, rpc: IRPC): void {
2152+
const rpcBytes = RPC.encode(rpc).finish()
2153+
const prefixedData = encode.single(rpcBytes)
2154+
for (const id of tosend) {
2155+
const outboundStream = this.streamsOutbound.get(id)
2156+
if (outboundStream == null) {
2157+
this.log(`Cannot send RPC to ${id} as there is no open stream to it available`)
2158+
tosend.delete(id)
2159+
continue
2160+
}
2161+
try {
2162+
outboundStream.pushPrefixed(prefixedData)
2163+
} catch (e) {
2164+
tosend.delete(id)
2165+
this.log.error(`Cannot send rpc to ${id}`, e)
2166+
}
2167+
2168+
this.metrics?.onRpcSent(rpc, rpcBytes.length)
2169+
}
2170+
}
2171+
21362172
/**
21372173
* This function should be called when `asyncValidation` is `true` after
21382174
* the message got validated by the caller. Messages are stored in the `mcache` and

Diff for: src/stream.ts

+17
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ interface InboundStreamOpts {
1717

1818
export class OutboundStream {
1919
private readonly pushable: Pushable<Uint8Array>
20+
private readonly lpPushable: Pushable<Uint8ArrayList>
2021
private readonly closeController: AbortController
2122
private readonly maxBufferSize: number
2223

2324
constructor (private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts) {
2425
this.pushable = pushable({ objectMode: false })
26+
this.lpPushable = pushable({ objectMode: false })
2527
this.closeController = new AbortController()
2628
this.maxBufferSize = opts.maxBufferSize ?? Infinity
2729

@@ -30,6 +32,10 @@ export class OutboundStream {
3032
(source) => encode(source),
3133
this.rawStream
3234
).catch(errCallback)
35+
36+
pipe(abortableSource(this.lpPushable, this.closeController.signal, { returnOnAbort: true }), this.rawStream).catch(
37+
errCallback
38+
)
3339
}
3440

3541
get protocol (): string {
@@ -46,10 +52,21 @@ export class OutboundStream {
4652
this.pushable.push(data)
4753
}
4854

55+
/**
56+
* Same to push() but this is prefixed data so no need to encode length prefixed again
57+
*/
58+
pushPrefixed (data: Uint8ArrayList): void {
59+
if (this.lpPushable.readableLength > this.maxBufferSize) {
60+
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
61+
}
62+
this.lpPushable.push(data)
63+
}
64+
4965
async close (): Promise<void> {
5066
this.closeController.abort()
5167
// similar to pushable.end() but clear the internal buffer
5268
await this.pushable.return()
69+
await this.lpPushable.return()
5370
await this.rawStream.close()
5471
}
5572
}

Diff for: src/types.ts

+2
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ export enum SignaturePolicy {
7373
export interface PublishOpts {
7474
allowPublishToZeroPeers?: boolean
7575
ignoreDuplicatePublishError?: boolean
76+
/** serialize message once and send to all peers without control messages */
77+
batchPublish?: boolean
7678
}
7779

7880
export enum PublishConfigType {

Diff for: test/e2e/go-gossipsub.spec.ts

+38-32
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
8484
number: 20,
8585
init: {
8686
floodPublish: false,
87+
batchPublish: true,
8788
scoreParams: {
8889
IPColocationFactorThreshold: 20,
8990
behaviourPenaltyWeight: 0
@@ -112,42 +113,47 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
112113
await Promise.all(sendRecv)
113114
})
114115

115-
it('test dense gossipsub', async function () {
116-
// Create 20 gossipsub nodes
117-
// Subscribe to the topic, all nodes
118-
// Densely connect the nodes
119-
// Publish 100 messages, each from a random node
120-
// Assert that subscribed nodes receive the message
121-
psubs = await createComponentsArray({
122-
number: 20,
123-
init: {
124-
floodPublish: false,
125-
scoreParams: {
126-
IPColocationFactorThreshold: 20,
127-
behaviourPenaltyWeight: 0
116+
const batchOpts = [true, false]
117+
for (const batchPublish of batchOpts) {
118+
// eslint-disable-next-line no-loop-func
119+
it(`test dense gossipsub batchPublish=${batchPublish}`, async function () {
120+
// Create 20 gossipsub nodes
121+
// Subscribe to the topic, all nodes
122+
// Densely connect the nodes
123+
// Publish 100 messages, each from a random node
124+
// Assert that subscribed nodes receive the message
125+
psubs = await createComponentsArray({
126+
number: 20,
127+
init: {
128+
floodPublish: false,
129+
batchPublish,
130+
scoreParams: {
131+
IPColocationFactorThreshold: 20,
132+
behaviourPenaltyWeight: 0
133+
}
128134
}
129-
}
130-
})
131-
const topic = 'foobar'
132-
psubs.forEach((ps) => { ps.pubsub.subscribe(topic) })
135+
})
136+
const topic = 'foobar'
137+
psubs.forEach((ps) => { ps.pubsub.subscribe(topic) })
133138

134-
await denseConnect(psubs)
139+
await denseConnect(psubs)
135140

136-
// wait for heartbeats to build mesh
137-
await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))
141+
// wait for heartbeats to build mesh
142+
await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))
138143

139-
const sendRecv = []
140-
for (let i = 0; i < 100; i++) {
141-
const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`)
142-
const owner = Math.floor(Math.random() * psubs.length)
143-
const results = Promise.all(
144-
psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
145-
)
146-
sendRecv.push(psubs[owner].pubsub.publish(topic, msg))
147-
sendRecv.push(results)
148-
}
149-
await Promise.all(sendRecv)
150-
})
144+
const sendRecv = []
145+
for (let i = 0; i < 100; i++) {
146+
const msg = uint8ArrayFromString(`${i} its not a flooooood ${i}`)
147+
const owner = Math.floor(Math.random() * psubs.length)
148+
const results = Promise.all(
149+
psubs.filter((psub, j) => j !== owner).map(checkReceivedMessage(topic, msg, owner, i))
150+
)
151+
sendRecv.push(psubs[owner].pubsub.publish(topic, msg))
152+
sendRecv.push(results)
153+
}
154+
await Promise.all(sendRecv)
155+
})
156+
}
151157

152158
it('test gossipsub fanout', async function () {
153159
// Create 20 gossipsub nodes

Diff for: test/floodsub.spec.ts

+29-24
Original file line numberDiff line numberDiff line change
@@ -180,33 +180,38 @@ describe('gossipsub fallbacks to floodsub', () => {
180180
mockNetwork.reset()
181181
})
182182

183-
it('Publish to a topic - nodeGs', async () => {
184-
const promise = pEvent<'message', CustomEvent<Message>>(nodeFs.pubsub, 'message')
185-
const data = uint8ArrayFromString('hey')
186-
187-
await nodeGs.pubsub.publish(topic, data)
188-
189-
const evt = await promise
190-
if (evt.detail.type !== 'signed') {
191-
throw new Error('unexpected message type')
192-
}
193-
expect(evt.detail.data).to.equalBytes(data)
194-
expect(evt.detail.from.toString()).to.be.eql(nodeGs.components.peerId.toString())
195-
})
183+
const batchPublishOpts = [true, false]
184+
for (const batchPublish of batchPublishOpts) {
185+
// eslint-disable-next-line no-loop-func
186+
it(`Publish to a topic - nodeGs - batchPublish: ${batchPublish}`, async () => {
187+
const promise = pEvent<'message', CustomEvent<Message>>(nodeFs.pubsub, 'message')
188+
const data = uint8ArrayFromString('hey')
189+
190+
await nodeGs.pubsub.publish(topic, data, { batchPublish })
191+
192+
const evt = await promise
193+
if (evt.detail.type !== 'signed') {
194+
throw new Error('unexpected message type')
195+
}
196+
expect(evt.detail.data).to.equalBytes(data)
197+
expect(evt.detail.from.toString()).to.be.eql(nodeGs.components.peerId.toString())
198+
})
196199

197-
it('Publish to a topic - nodeFs', async () => {
198-
const promise = pEvent<'message', CustomEvent<Message>>(nodeGs.pubsub, 'message')
199-
const data = uint8ArrayFromString('banana')
200+
// eslint-disable-next-line no-loop-func
201+
it(`Publish to a topic - nodeFs - batchPublish: ${batchPublish}`, async () => {
202+
const promise = pEvent<'message', CustomEvent<Message>>(nodeGs.pubsub, 'message')
203+
const data = uint8ArrayFromString('banana')
200204

201-
await nodeFs.pubsub.publish(topic, data)
205+
await nodeFs.pubsub.publish(topic, data, { batchPublish })
202206

203-
const evt = await promise
204-
if (evt.detail.type !== 'signed') {
205-
throw new Error('unexpected message type')
206-
}
207-
expect(evt.detail.data).to.equalBytes(data)
208-
expect(evt.detail.from.toString()).to.be.eql(nodeFs.components.peerId.toString())
209-
})
207+
const evt = await promise
208+
if (evt.detail.type !== 'signed') {
209+
throw new Error('unexpected message type')
210+
}
211+
expect(evt.detail.data).to.equalBytes(data)
212+
expect(evt.detail.from.toString()).to.be.eql(nodeFs.components.peerId.toString())
213+
})
214+
}
210215
})
211216

212217
describe('publish after unsubscribe', () => {

0 commit comments

Comments
 (0)