Skip to content

Commit 0ced0a3

Browse files
committed
feat: request L2Block txs through fast req/resp
fix tests fix tests fix build
1 parent 8ea8713 commit 0ced0a3

File tree

10 files changed

+219
-62
lines changed

10 files changed

+219
-62
lines changed

yarn-project/p2p/src/config.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ describe('config', () => {
2525
expect(allowList).toEqual(config);
2626
});
2727

28-
it('defaults proposal tx collector type to new', () => {
28+
it('defaults missing txs collector type to new', () => {
2929
const config = getP2PDefaultConfig();
30-
expect(config.txCollectionProposalTxCollectorType).toBe('new');
30+
expect(config.txCollectionMissingTxsCollectorType).toBe('new');
3131
});
3232
});

yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import { type Logger, createLogger } from '@aztec/foundation/log';
44
import { FifoMemoryQueue, type ISemaphore, Semaphore } from '@aztec/foundation/queue';
55
import { sleep } from '@aztec/foundation/sleep';
66
import { DateProvider, executeTimeout } from '@aztec/foundation/timer';
7-
import { type BlockProposal, PeerErrorSeverity } from '@aztec/stdlib/p2p';
7+
import { PeerErrorSeverity } from '@aztec/stdlib/p2p';
88
import { Tx, TxArray, TxHash } from '@aztec/stdlib/tx';
99

1010
import type { PeerId } from '@libp2p/interface';
1111
import { peerIdFromString } from '@libp2p/peer-id';
1212

1313
import { ReqRespSubProtocol } from '.././interface.js';
14-
import { BlockTxsRequest, BlockTxsResponse } from '.././protocols/index.js';
14+
import { BlockTxsRequest, BlockTxsResponse, type BlockTxsSource } from '.././protocols/index.js';
1515
import { ReqRespStatus } from '.././status.js';
1616
import {
1717
DEFAULT_BATCH_TX_REQUESTER_BAD_PEER_THRESHOLD,
@@ -42,7 +42,7 @@ import { BatchRequestTxValidator, type IBatchRequestTxValidator } from './tx_val
4242
* - Is the peer which was unable to send us successful response N times in a row
4343
* */
4444
export class BatchTxRequester {
45-
private readonly blockProposal: BlockProposal;
45+
private readonly blockTxsSource: BlockTxsSource;
4646
private readonly pinnedPeer: PeerId | undefined;
4747
private readonly timeoutMs: number;
4848
private readonly p2pService: BatchTxRequesterLibP2PService;
@@ -61,15 +61,15 @@ export class BatchTxRequester {
6161

6262
constructor(
6363
missingTxs: TxHash[],
64-
blockProposal: BlockProposal,
64+
blockTxsSource: BlockTxsSource,
6565
pinnedPeer: PeerId | undefined,
6666
timeoutMs: number,
6767
p2pService: BatchTxRequesterLibP2PService,
6868
logger?: Logger,
6969
dateProvider?: DateProvider,
7070
opts?: BatchTxRequesterOptions,
7171
) {
72-
this.blockProposal = blockProposal;
72+
this.blockTxsSource = blockTxsSource;
7373
this.pinnedPeer = pinnedPeer;
7474
this.timeoutMs = timeoutMs;
7575
this.p2pService = p2pService;
@@ -205,7 +205,7 @@ export class BatchTxRequester {
205205
return;
206206
}
207207

208-
const request = BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txs);
208+
const request = BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockTxsSource, txs);
209209
if (!request) {
210210
return;
211211
}
@@ -250,7 +250,7 @@ export class BatchTxRequester {
250250
// there is solid chance that peer didn't receive proposal yet, thus we must send full hashes
251251
const includeFullHashesInRequestNotJustIndices = true;
252252
const blockRequest = BlockTxsRequest.fromBlockProposalAndMissingTxs(
253-
this.blockProposal,
253+
this.blockTxsSource,
254254
txs,
255255
includeFullHashesInRequestNotJustIndices,
256256
);
@@ -342,7 +342,7 @@ export class BatchTxRequester {
342342

343343
const makeRequest = (pid: PeerId) => {
344344
const txs = this.txsMetadata.getTxsToRequestFromThePeer(pid);
345-
const blockRequest = BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockProposal, txs);
345+
const blockRequest = BlockTxsRequest.fromBlockProposalAndMissingTxs(this.blockTxsSource, txs);
346346
if (!blockRequest) {
347347
return undefined;
348348
}
@@ -605,7 +605,7 @@ export class BatchTxRequester {
605605
}
606606

607607
private isBlockResponseValid(response: BlockTxsResponse): boolean {
608-
const archiveRootsMatch = this.blockProposal.archive.toString() === response.archiveRoot.toString();
608+
const archiveRootsMatch = this.blockTxsSource.archive.toString() === response.archiveRoot.toString();
609609
const peerHasSomeTxsFromProposal = !response.txIndices.isEmpty();
610610
return archiveRootsMatch && peerHasSomeTxsFromProposal;
611611
}
@@ -624,7 +624,7 @@ export class BatchTxRequester {
624624
private extractHashesPeerHasFromResponse(response: BlockTxsResponse): Array<TxHash> {
625625
const hashes: TxHash[] = [];
626626
const indicesOfHashesPeerHas = new Set(response.txIndices.getTrueIndices());
627-
this.blockProposal.txHashes.forEach((hash, idx) => {
627+
this.blockTxsSource.txHashes.forEach((hash, idx) => {
628628
if (indicesOfHashesPeerHas.has(idx)) {
629629
hashes.push(hash);
630630
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import { BlockNumber } from '@aztec/foundation/branded-types';
2+
import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer';
3+
import { Fr } from '@aztec/foundation/curves/bn254';
4+
import type { BlockProposal } from '@aztec/stdlib/p2p';
5+
import { makeBlockHeader, makeBlockProposal } from '@aztec/stdlib/testing';
6+
import { Tx, TxHash, TxHashArray } from '@aztec/stdlib/tx';
7+
8+
import type { PeerId } from '@libp2p/interface';
9+
import { type MockProxy, mock } from 'jest-mock-extended';
10+
11+
import type { AttestationPool, TxPool } from '../../../../mem_pools/index.js';
12+
import { ReqRespStatus } from '../../status.js';
13+
import { BitVector } from './bitvector.js';
14+
import { reqRespBlockTxsHandler } from './block_txs_handler.js';
15+
import { BlockTxsRequest, BlockTxsResponse } from './block_txs_reqresp.js';
16+
17+
describe('reqRespBlockTxsHandler', () => {
18+
let attestationPool: MockProxy<AttestationPool>;
19+
let txPool: MockProxy<TxPool>;
20+
let peerId: PeerId;
21+
22+
const makeTx = (txHash?: TxHash) => Tx.random({ txHash }) as Tx;
23+
24+
const createBlockProposal = (txHashes: TxHash[]): Promise<BlockProposal> => {
25+
return makeBlockProposal({
26+
signer: Secp256k1Signer.random(),
27+
blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(5) }),
28+
archiveRoot: Fr.random(),
29+
txHashes,
30+
});
31+
};
32+
33+
const callHandler = async (request: BlockTxsRequest) => {
34+
const handler = reqRespBlockTxsHandler(attestationPool, txPool);
35+
const responseBuffer = await handler(peerId, request.toBuffer());
36+
return BlockTxsResponse.fromBuffer(responseBuffer);
37+
};
38+
39+
beforeEach(() => {
40+
attestationPool = mock<AttestationPool>();
41+
txPool = mock<TxPool>();
42+
peerId = mock<PeerId>();
43+
44+
attestationPool.getBlockProposal.mockResolvedValue(undefined);
45+
txPool.getTxsByHash.mockResolvedValue([]);
46+
txPool.hasTxs.mockResolvedValue([]);
47+
txPool.getArchivedTxByHash.mockResolvedValue(undefined);
48+
});
49+
50+
describe('no block proposal, no tx hashes', () => {
51+
it('throws NOT_FOUND', async () => {
52+
const request = new BlockTxsRequest(Fr.random(), new TxHashArray(), BitVector.init(0, []));
53+
const handler = reqRespBlockTxsHandler(attestationPool, txPool);
54+
await expect(handler(peerId, request.toBuffer())).rejects.toMatchObject({
55+
status: ReqRespStatus.NOT_FOUND,
56+
});
57+
});
58+
});
59+
60+
describe('no block proposal, explicit tx hashes', () => {
61+
it('returns txs found in active pool', async () => {
62+
const txHashes = [TxHash.random(), TxHash.random()];
63+
const txs = txHashes.map(h => makeTx(h));
64+
txPool.getTxsByHash.mockResolvedValue(txs);
65+
66+
const request = new BlockTxsRequest(Fr.random(), new TxHashArray(...txHashes), BitVector.init(0, []));
67+
const response = await callHandler(request);
68+
69+
expect(response.txs.length).toBe(2);
70+
expect(response.archiveRoot).toEqual(Fr.zero());
71+
});
72+
73+
it('falls back to archive for txs not in active pool', async () => {
74+
const txHashes = [TxHash.random(), TxHash.random(), TxHash.random()];
75+
const poolTx = makeTx(txHashes[0]);
76+
const archivedTx = makeTx(txHashes[2]);
77+
78+
txPool.getTxsByHash.mockResolvedValue([poolTx, undefined, undefined]);
79+
txPool.getArchivedTxByHash.mockImplementation((hash: TxHash) => {
80+
if (hash.equals(txHashes[2])) {
81+
return Promise.resolve(archivedTx);
82+
}
83+
return Promise.resolve(undefined);
84+
});
85+
86+
const request = new BlockTxsRequest(Fr.random(), new TxHashArray(...txHashes), BitVector.init(0, []));
87+
const response = await callHandler(request);
88+
89+
expect(response.txs.length).toBe(2);
90+
expect(txPool.getArchivedTxByHash).toHaveBeenCalledTimes(2);
91+
});
92+
93+
it('returns empty when txs not in pool or archive', async () => {
94+
const txHashes = [TxHash.random()];
95+
txPool.getTxsByHash.mockResolvedValue([undefined]);
96+
97+
const request = new BlockTxsRequest(Fr.random(), new TxHashArray(...txHashes), BitVector.init(0, []));
98+
const response = await callHandler(request);
99+
100+
expect(response.txs.length).toBe(0);
101+
expect(txPool.getArchivedTxByHash).toHaveBeenCalledTimes(1);
102+
});
103+
});
104+
105+
describe('with block proposal', () => {
106+
it('returns availability bitvector and requested txs', async () => {
107+
const txHashes = [TxHash.random(), TxHash.random(), TxHash.random()];
108+
const proposal = await createBlockProposal(txHashes);
109+
const txs = txHashes.map(h => makeTx(h));
110+
111+
attestationPool.getBlockProposal.mockResolvedValue(proposal);
112+
txPool.hasTxs.mockResolvedValue([true, true, true]);
113+
txPool.getTxsByHash.mockResolvedValue(txs);
114+
115+
const request = new BlockTxsRequest(proposal.archive, new TxHashArray(), BitVector.init(3, [0, 1, 2]));
116+
const response = await callHandler(request);
117+
118+
expect(response.archiveRoot).toEqual(proposal.archive);
119+
expect(response.txs.length).toBe(3);
120+
expect(response.txIndices.getTrueIndices()).toEqual([0, 1, 2]);
121+
});
122+
123+
it('returns partial availability when some txs missing from pool', async () => {
124+
const txHashes = [TxHash.random(), TxHash.random(), TxHash.random()];
125+
const proposal = await createBlockProposal(txHashes);
126+
127+
attestationPool.getBlockProposal.mockResolvedValue(proposal);
128+
txPool.hasTxs.mockResolvedValue([true, false, true]);
129+
txPool.getTxsByHash.mockResolvedValue([makeTx(txHashes[0]), undefined, makeTx(txHashes[2])]);
130+
131+
const request = new BlockTxsRequest(proposal.archive, new TxHashArray(), BitVector.init(3, [0, 2]));
132+
const response = await callHandler(request);
133+
134+
expect(response.txs.length).toBe(2);
135+
expect(response.txIndices.getTrueIndices()).toEqual([0, 2]);
136+
});
137+
138+
it('filters out undefined txs from pool response', async () => {
139+
const txHashes = [TxHash.random(), TxHash.random()];
140+
const proposal = await createBlockProposal(txHashes);
141+
142+
attestationPool.getBlockProposal.mockResolvedValue(proposal);
143+
txPool.hasTxs.mockResolvedValue([true, false]);
144+
txPool.getTxsByHash.mockResolvedValue([makeTx(txHashes[0]), undefined]);
145+
146+
const request = new BlockTxsRequest(proposal.archive, new TxHashArray(), BitVector.init(2, [0, 1]));
147+
const response = await callHandler(request);
148+
149+
expect(response.txs.length).toBe(1);
150+
});
151+
});
152+
});

yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs_handler.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Fr } from '@aztec/foundation/curves/bn254';
2-
import { TxArray } from '@aztec/stdlib/tx';
2+
import { Tx, TxArray, type TxHash } from '@aztec/stdlib/tx';
33

44
import type { PeerId } from '@libp2p/interface';
55

@@ -13,10 +13,18 @@ import { BlockTxsRequest, BlockTxsResponse } from './block_txs_reqresp.js';
1313
/**
1414
* Handler for block txs requests
1515
* @param attestationPool - the attestation pool to check for block proposals
16-
* @param mempools - the mempools containing the tx pool
16+
* @param txPool - the tx pool to fetch transactions from (including archive)
1717
* @returns the BlockTxs request handler
1818
*/
1919
export function reqRespBlockTxsHandler(attestationPool: AttestationPool, txPool: TxPool): ReqRespSubProtocolHandler {
20+
/** For each undefined entry in poolResults, attempts to fetch from the archive. */
21+
async function fillFromArchive(txHashes: TxHash[], poolResults: (Tx | undefined)[]) {
22+
const results = await Promise.all(
23+
poolResults.map((tx, i) => (tx !== undefined ? tx : txPool.getArchivedTxByHash(txHashes[i]))),
24+
);
25+
return results;
26+
}
27+
2028
/**
2129
* Handler for block txs requests
2230
* @param msg - the block txs request message
@@ -41,7 +49,8 @@ export function reqRespBlockTxsHandler(attestationPool: AttestationPool, txPool:
4149
// This is scenario in which we don't have this block proposal the peer is requesting from us
4250
// But peer has sent requested tx hashes, so we can send them the transactions
4351
if (!blockProposal && requestedTxsHashes !== undefined) {
44-
const responseTxs = (await txPool.getTxsByHash(requestedTxsHashes)).filter(tx => !!tx);
52+
const poolResults = await txPool.getTxsByHash(requestedTxsHashes);
53+
const responseTxs = (await fillFromArchive(requestedTxsHashes, poolResults)).filter(tx => !!tx);
4554
const response = new BlockTxsResponse(Fr.zero(), new TxArray(...responseTxs), BitVector.init(0, []));
4655
return response.toBuffer();
4756
}

yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs_reqresp.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
import { Fr } from '@aztec/foundation/curves/bn254';
22
import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize';
3-
import type { BlockProposal } from '@aztec/stdlib/p2p';
4-
import { TxArray, TxHash, TxHashArray } from '@aztec/stdlib/tx';
3+
import { TxArray, type TxHash, TxHashArray } from '@aztec/stdlib/tx';
54

65
import { BitVector } from './bitvector.js';
76

7+
/** Minimal interface for a block source that provides tx hashes and an archive root. */
8+
export interface BlockTxsSource {
9+
txHashes: TxHash[];
10+
archive: Fr;
11+
}
12+
813
/**
914
* Request message for requesting specific transactions from a block
1015
*/
@@ -31,7 +36,7 @@ export class BlockTxsRequest {
3136
* @returns undefined if there were no missingTxHashes matching BlockProposal hashes, otherwise
3237
* returns new BlockTxsRequest*/
3338
static fromBlockProposalAndMissingTxs(
34-
blockProposal: BlockProposal,
39+
blockTxsSource: BlockTxsSource,
3540
missingTxHashes: TxHash[],
3641
includeFullTxHashes = false,
3742
): BlockTxsRequest | undefined {
@@ -41,19 +46,19 @@ export class BlockTxsRequest {
4146

4247
const missingHashesSet = new Set(missingTxHashes.map(t => t.toString()));
4348

44-
// We cannot request txs that are not part of the block proposal
45-
if (!missingHashesSet.isSubsetOf(new Set(blockProposal.txHashes.map(t => t.toString())))) {
49+
// We cannot request txs that are not part of the block
50+
if (!missingHashesSet.isSubsetOf(new Set(blockTxsSource.txHashes.map(t => t.toString())))) {
4651
return undefined;
4752
}
4853

49-
const missingIndices = blockProposal.txHashes
54+
const missingIndices = blockTxsSource.txHashes
5055
.map((hash, idx) => (missingHashesSet.has(hash.toString()) ? idx : -1))
5156
.filter(i => i != -1);
5257

53-
const requestBitVector = BitVector.init(blockProposal.txHashes.length, missingIndices);
58+
const requestBitVector = BitVector.init(blockTxsSource.txHashes.length, missingIndices);
5459
const hashes = includeFullTxHashes ? new TxHashArray(...missingTxHashes) : new TxHashArray();
5560

56-
return new BlockTxsRequest(blockProposal.archive, hashes, requestBitVector);
61+
return new BlockTxsRequest(blockTxsSource.archive, hashes, requestBitVector);
5762
}
5863

5964
/**

yarn-project/p2p/src/services/tx_collection/config.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
} from '@aztec/foundation/config';
77
import { MAX_RPC_TXS_LEN } from '@aztec/stdlib/interfaces/api-limit';
88

9-
export type ProposalTxCollectorType = 'new' | 'old';
9+
export type MissingTxsCollectorType = 'new' | 'old';
1010

1111
export type TxCollectionConfig = {
1212
/** How long to wait before starting reqresp for fast collection */
@@ -29,8 +29,8 @@ export type TxCollectionConfig = {
2929
txCollectionFastMaxParallelRequestsPerNode: number;
3030
/** Maximum number of transactions to request from a node in a single batch */
3131
txCollectionNodeRpcMaxBatchSize: number;
32-
/** Which collector implementation to use for proposal tx collection */
33-
txCollectionProposalTxCollectorType: ProposalTxCollectorType;
32+
/** Which collector implementation to use for missing txs collection */
33+
txCollectionMissingTxsCollectorType: MissingTxsCollectorType;
3434
};
3535

3636
export const txCollectionConfigMappings: ConfigMappingsType<TxCollectionConfig> = {
@@ -90,9 +90,9 @@ export const txCollectionConfigMappings: ConfigMappingsType<TxCollectionConfig>
9090
description: 'Maximum number of transactions to request from a node in a single batch',
9191
...numberConfigHelper(MAX_RPC_TXS_LEN),
9292
},
93-
txCollectionProposalTxCollectorType: {
93+
txCollectionMissingTxsCollectorType: {
9494
env: 'TX_COLLECTION_PROPOSAL_TX_COLLECTOR_TYPE',
95-
description: 'Which collector implementation to use for proposal tx collection (new or old)',
95+
description: 'Which collector implementation to use for missing txs collection (new or old)',
9696
...enumConfigHelper(['new', 'old'] as const, 'new'),
9797
},
9898
};

0 commit comments

Comments
 (0)