Skip to content

Commit 9f55de6

Browse files
Merge pull request #16326 from suuuuuuminnnnnn/test/kafka-run-options-forwarding
test(microservices): verify kafka run options forwarding
2 parents f2c2a3e + e0c40e4 commit 9f55de6

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

packages/microservices/interfaces/microservice-configuration.interface.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,11 @@ export interface KafkaOptions {
339339
postfixId?: string;
340340
client?: KafkaConfig;
341341
consumer?: ConsumerConfig;
342+
/**
343+
* Options passed to KafkaJS consumer.run().
344+
* Note: `partitionsConsumedConcurrently` (KafkaJS parameter) controls
345+
* concurrent processing at the partition level (not topic level).
346+
*/
342347
run?: Omit<ConsumerRunConfig, 'eachBatch' | 'eachMessage'>;
343348
subscribe?: Omit<ConsumerSubscribeTopics, 'topics'>;
344349
producer?: ProducerConfig;

packages/microservices/test/server/server-kafka.spec.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,39 @@ describe('ServerKafka', () => {
256256
expect(run.called).to.be.true;
257257
expect(connect.called).to.be.true;
258258
});
259+
it('should pass run options with partitionsConsumedConcurrently to consumer.run()', async () => {
260+
untypedServer.logger = new NoopLogger();
261+
untypedServer.options.run = {
262+
partitionsConsumedConcurrently: 5,
263+
};
264+
await server.listen(callback);
265+
await server.bindEvents(untypedServer.consumer);
266+
267+
expect(run.called).to.be.true;
268+
expect(run.firstCall.args[0]).to.include({
269+
partitionsConsumedConcurrently: 5,
270+
});
271+
expect(run.firstCall.args[0]).to.have.property('eachMessage');
272+
});
273+
it('should pass multiple run options to consumer.run()', async () => {
274+
untypedServer.logger = new NoopLogger();
275+
untypedServer.options.run = {
276+
partitionsConsumedConcurrently: 3,
277+
autoCommit: false,
278+
autoCommitInterval: 5000,
279+
};
280+
await server.listen(callback);
281+
await server.bindEvents(untypedServer.consumer);
282+
283+
expect(run.called).to.be.true;
284+
const callArg = run.firstCall.args[0];
285+
expect(callArg).to.include({
286+
partitionsConsumedConcurrently: 3,
287+
autoCommit: false,
288+
autoCommitInterval: 5000,
289+
});
290+
expect(callArg).to.have.property('eachMessage');
291+
});
259292
});
260293

261294
describe('getMessageHandler', () => {

0 commit comments

Comments
 (0)