
Update dependencies & migrate to new jetstream module#113

Open
Jarema wants to merge 6 commits into main from update-deps

Conversation

@Jarema
Member

@Jarema Jarema commented Mar 25, 2026

Signed-off-by: Tomasz Pietrek tomasz@synadia.com

@Jarema Jarema requested review from bruth and philpennock March 25, 2026 12:34
@Jarema Jarema force-pushed the update-deps branch 2 times, most recently from 9662f8f to 7162efd Compare March 26, 2026 10:05
@Jarema Jarema changed the title Update dependencies Update dependencies & migrate to new jetstream module Mar 26, 2026
@codecov

codecov Bot commented Mar 26, 2026

Codecov Report

❌ Patch coverage is 63.33333% with 22 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.06%. Comparing base (310c39f) to head (5fe9982).
⚠️ Report is 5 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| server/core/connector.go | 56.86% | 16 Missing and 6 partials ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #113      +/-   ##
==========================================
- Coverage   56.43%   56.06%   -0.38%     
==========================================
  Files          33       33              
  Lines        2151     2185      +34     
==========================================
+ Hits         1214     1225      +11     
- Misses        808      825      +17     
- Partials      129      135       +6     


Jarema added 3 commits March 26, 2026 17:36
Root cause: The two failing tests pre-created a JetStream consumer with OptStartSeq: 2 /
DeliverByStartSequencePolicy, then published "one" at sequence 1 — expecting it to be delivered. In
nats-server v2.10, seq 1 was incorrectly delivered despite OptStartSeq: 2. In v2.12, the server correctly
skips it. The WaitForRequests(1) silently timed out after 5s, then only 2 messages (instead of 3) made
it to Kafka.

Fix: Removed the OptStartSeq: 2 / DeliverByStartSequencePolicy / StartAtSequence: 2 config from both
tests since they were testing durable subscriber persistence across restarts, not sequence-start
positioning (that's already tested by TestJetStreamQueueStartAtPosition). Switched to DeliverAllPolicy so
"one" at seq 1 is properly delivered.

Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
@Jarema
Member Author

Jarema commented Mar 26, 2026

@claude review this PR.

@synadia-claude-reviewer

synadia-claude-reviewer Bot commented Mar 26, 2026

Claude finished @Jarema's task in 2m 3s


PR Review: Update dependencies & migrate to new jetstream module

  • Gather context and understand the PR
  • Read changed files
  • Analyze changes for correctness, quality, and issues
  • Post review feedback

This PR migrates from the legacy nats.JetStreamContext API to the new github.com/nats-io/nats.go/jetstream package, switching from push-based consumers to pull-based consumers. Overall the migration is solid, but there are a few issues worth addressing.

Bug: defer ackCancel() inside a message callback (connector.go:440)

cc, err := consumer.Consume(func(msg jetstream.Msg) {
    ...
    if ackSyncEnabled {
        ackCtx, ackCancel := context.WithTimeout(...)
        defer ackCancel()   // ← deferred to when this anonymous func returns
        if err := msg.DoubleAck(ackCtx); err != nil { ... }
    }
    ...
})

At first glance this looks fine: the defer runs when the callback returns, so each invocation cancels its own context. That holds however the Consume library schedules the callback, because a defer is bound to the function invocation rather than to a goroutine. The weakness is elsewhere: the context and its timer stay alive for the rest of the callback even though DoubleAck has already returned, and the same pattern turns into a real leak if this block is ever moved into a loop. A direct ackCancel() call immediately after DoubleAck (replacing the defer) would release the timer sooner and be more explicit.

Breaking change: QueueName silently dropped for JetStream connectors

Previously, conn.config.QueueName was passed to QueueSubscribe for push consumers. The new signature drops it entirely:

// Old
func (conn *BridgeConnector) subscribeToJetStream(subject string, queueName string) (*nats.Subscription, error)

// New  
func (conn *BridgeConnector) subscribeToJetStream(subject string) (jetstream.ConsumeContext, error)

Any existing deployment that sets QueueName in a JetStreamToKafka connector config will have that setting silently ignored. At minimum this should be surfaced to users (documentation plus a warning log when QueueName is set). Alternatively, the new pull consumer approach could implement shared consumers to preserve the old semantics.

Breaking change: EnableFlowControl and HeartbeatInterval silently ignored

conf.JetStreamConfig still exposes EnableFlowControl bool and HeartbeatInterval int, but connectToJetStream() no longer applies them anywhere. These were push-consumer features and don't map 1:1 to pull consumers, but existing users with these settings in their config will get no indication they're being ignored. A deprecation warning in the logs would be appropriate.
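One way the warnings suggested in this section and the previous one could be produced is sketched below. It is an illustration under assumptions, not code from the PR: `legacySettings` is a hypothetical flattened struct (in the bridge, QueueName lives on the connector config and the other two fields on conf.JetStreamConfig), and `ignoredSettingWarnings` is a made-up helper name. The returned strings would be handed to whatever warning-level method the bridge logger offers.

```go
package main

import "fmt"

// legacySettings is a hypothetical, flattened view of the three settings the
// review says are no longer applied after the move to pull consumers.
type legacySettings struct {
	QueueName         string
	EnableFlowControl bool
	HeartbeatInterval int
}

// ignoredSettingWarnings returns one message per setting that is configured
// but would have no effect, so startup code can log each of them.
func ignoredSettingWarnings(s legacySettings) []string {
	var warnings []string
	if s.QueueName != "" {
		warnings = append(warnings,
			fmt.Sprintf("QueueName %q is ignored for JetStream connectors", s.QueueName))
	}
	if s.EnableFlowControl {
		warnings = append(warnings,
			"EnableFlowControl is a push-consumer option and is ignored")
	}
	if s.HeartbeatInterval != 0 {
		warnings = append(warnings,
			fmt.Sprintf("HeartbeatInterval %d is a push-consumer option and is ignored", s.HeartbeatInterval))
	}
	return warnings
}

func main() {
	cfg := legacySettings{QueueName: "workers", HeartbeatInterval: 5000}
	for _, w := range ignoredSettingWarnings(cfg) {
		fmt.Println("WARN:", w)
	}
}
```

Returning the messages instead of logging inside the helper keeps it independent of the logger and easy to unit test.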

Missing msg.Nak() on Kafka write failure (connector.go:431-433)

if err != nil {
    conn.stats.AddMessageIn(l)
    conn.bridge.Logger().Errorf("connector publish failure, ...")
    // message is not acked or nacked
}

When writing to Kafka fails, the message is left unacknowledged. It will eventually be redelivered after the ack timeout, which is correct but slow. An explicit msg.Nak() would request redelivery immediately, and msg.NakWithDelay(...) would request it after a chosen delay, which is the better fit if Kafka needs time to recover. This was a pre-existing issue, but the migration to pull consumers makes it more relevant since delivery behavior is now fully controlled client-side.

Stats tracking asymmetry (connector.go:432)

conn.stats.AddMessageIn(l) is only called on the error path. On the success path, conn.stats.AddRequest(l, l, time.Since(start)) is called instead. This was present before and is likely intentional, but worth confirming the stats semantics are preserved correctly across the migration.

Test change: consumer pre-created with wrong policy (jetstream2kafka_test.go:463-469)

In TestJetStreamQueueDurableSubscriber, the test creates a consumer upfront:

_, err = tbs.JS.CreateOrUpdateConsumer(context.Background(), stream, jetstream.ConsumerConfig{
    Durable:       durable,
    AckPolicy:     jetstream.AckExplicitPolicy,
    DeliverPolicy: jetstream.DeliverAllPolicy,  // ← was DeliverByStartSequencePolicy
})

The old test used DeliverByStartSequencePolicy with OptStartSeq: 2 to verify the durable consumer resumed at the right position. The removal of StartAtSequence: 2 from the connector config and DeliverByStartSequencePolicy from the pre-created consumer simplifies the test but weakens it — it no longer tests that durable consumers resume from the correct offset. The same simplification is applied to the SASL variant.
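To make the difference between the two delivery policies concrete, here is a toy model of which stream sequences a freshly created consumer receives. It is not the nats-server implementation, and `deliverable` is a made-up function: a start sequence of 0 stands in for DeliverAllPolicy, and any other value stands in for DeliverByStartSequencePolicy with that OptStartSeq.

```go
package main

import "fmt"

// deliverable lists the sequences in 1..lastSeq that a new consumer would be
// sent. startSeq == 0 models "deliver all"; startSeq > 0 models "deliver by
// start sequence", which skips everything before startSeq.
func deliverable(lastSeq, startSeq uint64) []uint64 {
	first := uint64(1)
	if startSeq > first {
		first = startSeq
	}
	var out []uint64
	for s := first; s <= lastSeq; s++ {
		out = append(out, s)
	}
	return out
}

func main() {
	fmt.Println(deliverable(3, 0)) // prints [1 2 3]
	fmt.Println(deliverable(3, 2)) // prints [2 3]
}
```

With three published messages and a start sequence of 2, only two are delivered, which matches the commit message's account of 2 messages (instead of 3) reaching Kafka under nats-server v2.12.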

Positive changes

  • The ConsumeErrHandler addition is a good improvement — push consumers silently dropped errors.
  • Auto-discovery of stream name via StreamNameBySubject is more ergonomic than requiring explicit stream binding.
  • Consistent use of context.WithTimeout throughout replaces the old per-call MaxWait option.
  • Test fix in kafka2jetstream_test.go: switching from tbs.JS.Subscribe to tbs.NC.Subscribe is correct — a plain NATS subscription is simpler and sufficient since JetStream publishes also flow to core NATS subscribers.

Member

@philpennock philpennock left a comment


I don't see any dependency updates in here, which is what the PR title says should be here?

The code changes look fine.

Jarema added 3 commits March 31, 2026 17:39
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
Signed-off-by: Tomasz Pietrek <tomasz@synadia.com>
@Jarema Jarema requested a review from philpennock April 1, 2026 10:41
@Jarema
Member Author

Jarema commented Apr 1, 2026

@philpennock I found a few issues and addressed them.
Changes were big enough to require a new review.


@piotrpio piotrpio left a comment


LGTM!

Member

@philpennock philpennock left a comment


Imagine a minor grumbling noise about using %s instead of %q for arbitrary subjects logging, combined with "I guess it's consistent with the rest of the code-base, bleh". :)
