Skip to content

Commit 9dc36f6

Browse files
authored
test: Add E2E integration and load tests for EventStream pipeline (#1115)
* test: Add E2E integration and load tests for EventStream pipeline Full-stack integration tests covering the Kafka -> EventSource -> FanOut -> Router -> Handler -> WebSocket client pipeline: - Kafka testcontainer + miniredis full-stack E2E test - InProcessFanOut-based E2E delivery test - Cross-tenant isolation verification (zero leakage) - Multi-tenant concurrent event delivery - Channel-scoped subscription filtering - 100 concurrent WebSocket connections load test (50k events/sec) - 5000 events/sec sustained throughput test - Go benchmark for connection concurrency * fix: Use cancel-only context in benchmark to prevent timeout with large b.N * refactor: Address CodeRabbit review feedback - Export ConsumerGroupID constant to prevent test drift between integration_test.go and kafka_source.go - Replace tautological const assertion with runtime registry check --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent da6f35e commit 9dc36f6

3 files changed

Lines changed: 1326 additions & 5 deletions

File tree

services/gateway/eventstream/adapters/kafka_source.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ const (
2525
headerCorrelationID = "correlation_id"
2626
headerCausationID = "causation_id"
2727

28-
consumerGroupID = "ops-console-events"
28+
// ConsumerGroupID is the Kafka consumer group used by KafkaEventSource.
29+
ConsumerGroupID = "ops-console-events"
2930
)
3031

3132
// Sentinel errors returned by KafkaEventSource constructors.
@@ -90,7 +91,7 @@ func NewKafkaEventSource(
9091

9192
opts := []kgo.Opt{
9293
kgo.SeedBrokers(brokers...),
93-
kgo.ConsumerGroup(consumerGroupID),
94+
kgo.ConsumerGroup(ConsumerGroupID),
9495
kgo.ConsumeTopics(topics...),
9596
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
9697
kgo.BlockRebalanceOnPoll(),

services/gateway/eventstream/adapters/kafka_source_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func TestKafkaEventSource_Integration(t *testing.T) {
349349
AtMost(10 * time.Second).
350350
PollInterval(200 * time.Millisecond).
351351
UntilNoError(func() error {
352-
return consumerGroupActive(ctx, brokerAddr, consumerGroupID)
352+
return consumerGroupActive(ctx, brokerAddr, ConsumerGroupID)
353353
}); err != nil {
354354
t.Fatalf("consumer group did not become active: %v", err)
355355
}
@@ -409,7 +409,7 @@ func TestKafkaEventSource_Integration(t *testing.T) {
409409
AtMost(5 * time.Second).
410410
PollInterval(100 * time.Millisecond).
411411
UntilNoError(func() error {
412-
return consumerGroupActive(ctx, brokerAddr, consumerGroupID)
412+
return consumerGroupActive(ctx, brokerAddr, ConsumerGroupID)
413413
}); err != nil {
414414
t.Logf("consumer group may not be fully active: %v (continuing)", err)
415415
}
@@ -464,7 +464,7 @@ func TestKafkaEventSource_Integration(t *testing.T) {
464464
AtMost(15 * time.Second).
465465
PollInterval(200 * time.Millisecond).
466466
UntilNoError(func() error {
467-
return consumerGroupActive(ctx3, brokerAddr, consumerGroupID)
467+
return consumerGroupActive(ctx3, brokerAddr, ConsumerGroupID)
468468
}); err != nil {
469469
t.Fatalf("consumer group did not become active: %v", err)
470470
}

0 commit comments

Comments
 (0)