Skip to content

feat(microservices): add topic-consumers option to server-kafka#16555

Open
suuuuuuminnnnnn wants to merge 5 commits intonestjs:masterfrom
suuuuuuminnnnnn:feat/kafka-topic-consumers
Open

feat(microservices): add topic-consumers option to server-kafka#16555
suuuuuuminnnnnn wants to merge 5 commits intonestjs:masterfrom
suuuuuuminnnnnn:feat/kafka-topic-consumers

Conversation

@suuuuuuminnnnnn
Copy link
Copy Markdown
Contributor

PR Checklist

Please check if your PR fulfills the following requirements:

PR Type

What kind of change does this PR introduce?

  • Bugfix
  • Feature
  • Code style update (formatting, local variables)
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • CI related changes
  • Other... Please describe:

What is the current behavior?

ServerKafka uses a single KafkaJS consumer subscribed to all registered topics, so a slow handler on one topic can effectively block message processing on other topics (topic-level backpressure). Partition-level concurrency (partitionsConsumedConcurrently) does not address cross-topic blocking.

Issue Number: #14590

What is the new behavior?

Adds an opt-in topicConsumers: boolean option for Kafka microservices.

When topicConsumers: true:

  • Creates a separate consumer per registered topic and subscribes each consumer to exactly one topic
  • Uses {groupId}-{topic} as the consumer group id (independent offset tracking per topic)
  • Disconnects all per-topic consumers in parallel on close()
  • Logs a deduped warning when the composed group id appears invalid (whitespace or >255 chars)
  • Avoids mutating options.run when building per-topic run() options

Default remains false, preserving the existing single-consumer behavior.

Does this PR introduce a breaking change?

  • Yes
  • No

Note: enabling topicConsumers: true creates new consumer groups ({groupId}-{topic}), so migrating an existing deployment may require offset management.

Other information

Tests

npm test -- packages/microservices/test/server/server-kafka.spec.ts

All tests pass, including new unit tests for topicConsumers mode.

@coveralls
Copy link
Copy Markdown

coveralls commented Mar 12, 2026

Coverage Report for CI Build 0

Coverage decreased (-0.06%) to 89.815%

Details

  • Coverage decreased (-0.06%) from the base build.
  • Patch coverage: 4 uncovered changes across 1 file (46 of 50 lines covered, 92.0%).
  • 109 coverage regressions across 14 files.

Uncovered Changes

File Changed Covered %
packages/microservices/server/server-kafka.ts 50 46 92.0%

Coverage Regressions

109 previously-covered lines in 14 files lost coverage.

Top 10 Files by Coverage Loss Lines Losing Coverage Coverage
packages/microservices/server/server-redis.ts 21 76.15%
packages/core/injector/injector.ts 19 80.87%
packages/microservices/client/client-redis.ts 16 79.84%
packages/microservices/server/server-mqtt.ts 16 83.04%
packages/core/helpers/external-context-creator.ts 10 86.11%
packages/core/scanner.ts 10 89.76%
packages/core/injector/internal-core-module/internal-core-module-factory.ts 4 81.82%
packages/microservices/server/server-grpc.ts 4 85.39%
packages/core/injector/module.ts 3 85.23%
packages/microservices/server/server-kafka.ts 2 86.21%

Coverage Stats

Coverage Status
Relevant Lines: 8336
Covered Lines: 7487
Line Coverage: 89.82%
Relevant Branches: 3577
Covered Branches: 2871
Branch Coverage: 80.26%
Branches in Coverage %: No
Coverage Strength: 19.0 hits per line

💛 - Coveralls

@kamilmysliwiec
Copy link
Copy Markdown
Member

Any chance to add an integration test for this, too?

@suuuuuuminnnnnn
Copy link
Copy Markdown
Contributor Author

sure — i can add an integration test as well. i’ll put together an e2e test that exercises the option end-to-end and ensures it’s propagated correctly, then update the pr.

@suuuuuuminnnnnn suuuuuuminnnnnn force-pushed the feat/kafka-topic-consumers branch from 998f293 to bade813 Compare April 15, 2026 22:55
@Qodo-Free-For-OSS
Copy link
Copy Markdown

Hi, The new kafka-topic-consumers.spec.ts is permanently skipped (describe.skip), so it won’t validate the new topicConsumers feature in CI and may silently rot.

Severity: informational | Category: reliability

How to fix: Gate test via env flag

Agent prompt to fix - you can give this to your LLM of choice:

Issue description

The new Kafka integration test is always skipped.

Fix Focus Areas

  • integration/microservices/e2e/kafka-topic-consumers.spec.ts[11-16]
    • Replace describe.skip with conditional skipping based on an env flag (e.g., process.env.KAFKA_E2E === '1').
  • .circleci/config.yml[133-167]
    • Optionally enable the flag and start Kafka for the integration_tests job if you want CI coverage.

Found by Qodo code review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants