Skip to content

Releases: ricardo-ch/go-kafka

v4.0.2-beta

21 May 13:11

Choose a tag to compare

v4.0.2-beta Pre-release
Pre-release

What's Changed

  • tweak logging

Full Changelog: v4.0.1...v4.0.2-beta

v3.6.0

21 Apr 09:41
22f1b50

Choose a tag to compare

What's Changed

  • feat: add opentelemetry tracing on v3 by @vcrenca in #61

Full Changelog: v3.5.0...v3.6.0

v3.6.0-alpha

20 Apr 12:39
22f1b50

Choose a tag to compare

v3.6.0-alpha Pre-release
Pre-release

What's Changed

  • feat: add opentelemetry tracing on v3 by @vcrenca in #61

Full Changelog: v3.5.0...v3.6.0-alpha

v4.0.1

07 Apr 15:14
a977399

Choose a tag to compare

What's Changed

  • Bump google.golang.org/grpc from 1.77.0 to 1.79.3 in /examples/consumer-tracing by @dependabot[bot] in #60
  • Bump go.opentelemetry.io/otel/trace from 1.42.0 to 1.43.0 in the other-dependencies group across 1 directory by @dependabot[bot] in #59

Full Changelog: v4.0.0...v4.0.1

v4.0.0

07 Apr 13:43
0561e79

Choose a tag to compare

🚨 Breaking Changes (go-kafka v4.0.0)

This major release introduces significant changes aligned with modern Go practices, observability standards, and reliability improvements.

Runtime & Dependencies

  • Minimum Go version is now 1.26+ (required for log/slog)
  • Module path will move to github.com/ricardo-ch/go-kafka/v4

Producer API

  • Produce(msg)Produce(ctx context.Context, msg)
    Context is now required for tracing and middleware support

Logging

  • Removed custom logging system (Logger, ErrorLogger, SetLogger, etc.)
  • The library now uses slog.Default() directly
    → configure logging via slog.SetDefault() in your application

Tracing

  • Replaced OpenTracing with OpenTelemetry
  • Trace propagation now uses W3C Trace Context (traceparent, tracestate)
  • Requires updating tracing setup and Kafka header propagation

Error Handling

  • ErrEventUnretriable and ErrEventOmitted are now interface-based
  • Requires migration to:
    • kafka.NewUnretriableError(err)
    • kafka.NewOmittedError(err)
    • or custom types implementing interfaces

Retry / Deadletter Behavior

  • Forwarding is now guaranteed with retry (no more fire-and-forget)
  • New config: ForwardMaxBackoffDuration (default: 30s)

Backoff Behavior

  • ExponentialBackoffFunc is now nil by default and lazily initialized
  • MaxBackoffDuration default increased from 1 min → 10 min

Metrics & Logging Format

  • Default log field format changed to camelCase
    → use kafka.LogFieldFormatSnakeCase for backward compatibility

Removed

  • Ptr[T any](v T) helper (now standard in Go 1.26)

⚠️ Migration Required

This release requires updates in:

  • Producer calls (add context)
  • Logging configuration (slog)
  • Tracing setup (OpenTelemetry)
  • Error handling patterns

Refer to the migration guide for detailed examples.

👉 For the full list of changes, see: #56

v4.0.0-beta.1

03 Apr 14:48

Choose a tag to compare

v4.0.0-beta.1 Pre-release
Pre-release

Full Changelog: v4.0.0-alpha.2...v4.0.0-beta.1

Bug fix

v4.0.0-alpha.2

01 Apr 15:39

Choose a tag to compare

Full Changelog: v4.0.0-alpha.1...v4.0.0-alpha.2

Add new metrics kafka_consumer_record_dropped_total Number of messages dropped because no retry or deadletter topic was configured

v4.0.0-alpha.1

01 Apr 14:06

Choose a tag to compare

What's Changed

New Contributors

Full Changelog: v4.0.0-alpha...v4.0.0-alpha.1

v3.5.0

27 Mar 15:22
f97ef89

Choose a tag to compare

What's Changed

New Contributors

Full Changelog: v3.4.0...v3.5.0

Go kafka v4 alpha

25 Mar 08:04

Choose a tag to compare

Full Changelog: v3.4.0...v4.0.0-alpha

go-kafka v4.0.0

Summary

Major release introducing modern Go patterns, improved observability, guaranteed delivery for retry/deadletter forwarding, and better error handling flexibility.

Breaking Changes

  • Minimum Go version: Go 1.26+ required (for log/slog support)
  • Module path: github.com/ricardo-ch/go-kafka/v3 (to be updated to /v4 before release)
  • Producer API: Produce(msg)Produce(ctx context.Context, msg) — context is now required for trace propagation and middleware compatibility
  • Error handling: ErrEventUnretriable and ErrEventOmitted are now interface-based (see migration guide below)
  • Logging: Removed custom Logger/ErrorLogger and all wrapper functions (SetLogger, SetLogLevel, LowercaseLevelAttr). The library now calls slog.Default() directly — configure via slog.SetDefault() in your application
  • Tracing: Replaced OpenTracing (github.com/opentracing/opentracing-go, github.com/ricardo-ch/go-tracing) with OpenTelemetry (go.opentelemetry.io/otel). Propagation format is now W3C Trace Context (traceparent, tracestate)
  • Exponential backoff: ExponentialBackoffFunc is now nil by default and evaluated lazily using current DurationBeforeRetry/MaxBackoffDuration via sarama.NewExponentialBackoff
  • MaxBackoffDuration: Default changed from 1 minute to 10 minutes
  • Retry/deadletter forwarding: Now retries on producer failure with exponential backoff instead of fire-and-forget. Controlled by the new ForwardMaxBackoffDuration variable (default: 30s)
  • Removed: func Ptr[T any](v T) (now standard in Go 1.26)

New Features

Structured Logging with slog

The library uses slog.Default() directly with no wrapper functions. Configure logging in your application:

slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelInfo,
})))

Context-aware Logging (WithLogContextStorer)

When processing a message, the library creates a *slog.Logger enriched with Kafka metadata (topic, partition, offset, key, consumer_group) as a structured "kafka" group. This enriched logger is used internally for all message-processing logs, ensuring consistent metadata in every log line.

You can opt in to receive this logger in your handler's context via WithLogContextStorer:

type LogContextStorer func(ctx context.Context, logger *slog.Logger) context.Context

The library:

  1. Builds a *slog.Logger via slog.With("kafka", kafkaMessageInfo{...}) containing the message metadata
  2. Stores it internally for its own logs (loggerFromContext)
  3. Calls your LogContextStorer function so your handler can retrieve it from context

Example setup:

listener, err := kafka.NewListener(appName, handlers,
    kafka.WithLogContextStorer(myToContext),
)

Example handler:

func myHandler(ctx context.Context, msg *sarama.ConsumerMessage) error {
    myFromContext(ctx).Info("processing order", "order_id", orderID)
    return nil
}

Output (JSON):

{"level":"INFO","msg":"processing order","kafka":{"topic":"orders","consumer_group":"my-group","partition":0,"offset":42},"order_id":"abc-123"}

kafkaMessageInfo implements slog.LogValuer for structured nested output under the "kafka" key. The LogContextStorer function type is agnostic — provide your own ToContext/FromContext helpers or use a library like slogr.

Error Type Classification and Stack Traces

Each error is now logged with an error_type field ("retriable", "unretriable", or "omitted"). Retriable errors additionally include a full stack trace via runtime/debug.Stack() for debugging.

Interface-based Error Handling

Errors are now detected via interfaces, allowing custom error types:

// Option 1: Use wrapper functions (recommended)
return kafka.NewUnretriableError(errors.New("validation failed"))
return kafka.NewOmittedError(errors.New("duplicate message"))

// Option 2: Implement interfaces on custom types
type ValidationError struct { Field, Message string }
func (e ValidationError) Error() string       { return e.Message }
func (e ValidationError) IsUnretriable() bool { return true }

New sentinel errors for topic collision detection:

  • ErrRetryTopicCollision — returned when a handler's retry topic matches its consumed topic
  • ErrDeadletterTopicCollision — returned when a handler's deadletter topic matches its consumed topic

Guaranteed Retry/Deadletter Forwarding

Message forwarding to retry/deadletter topics is now guaranteed: on producer failure, forwardWithRetry retries with exponential backoff (capped by ForwardMaxBackoffDuration, default: 30s) until the message is published or the context is cancelled. Messages are no longer silently lost on transient producer errors.

Native Sarama Exponential Backoff

Replaced custom exponential backoff with sarama.NewExponentialBackoff (KIP-580):

  • Includes jitter to avoid thundering herd
  • MaxBackoffDuration (default: 10 minutes) caps both fixed and exponential backoff
  • ExponentialBackoffFunc evaluated lazily — nil by default, initialized on first use
  • Per-handler BackoffFunc option available

Priority order for backoff calculation:

  1. Handler's BackoffFunc (per-handler)
  2. Global ExponentialBackoffFunc (if set)
  3. Lazy sarama.NewExponentialBackoff(DurationBeforeRetry, MaxBackoffDuration)

Observability Modernization (Tracing)

  • Replaced OpenTracing with OpenTelemetry, the current industry standard
  • Standard helpers to inject and extract trace context from Kafka headers using OTel propagators
  • Retry/deadletter trace propagation: forwarded messages now carry the current processing span's trace context (not the original producer's), giving a complete trace: producer → consumer (failure) → retry/deadletter

Bug Fixes

  • Partition instrumentation: Fixed string(msg.Partition)strconv.FormatInt that produced incorrect Unicode characters instead of numeric partition IDs
  • Consumer error channel: Fixed goroutine leak — error channel goroutine now exits on Close() via dedicated done channel (select on <-done), instead of blocking indefinitely
  • Retry/deadletter headers: Forwarded messages now correctly preserve original application headers while injecting the current trace context
  • Graceful shutdown: Fixed blocking during retry backoff when context is cancelled — uses time.NewTimer + select on ctx.Done() instead of leaking time.After
  • Tracing nil message: Added nil check for msg in extractCarrierFromMessage to prevent panics
  • Linter fixes: fmt.Sprintfstrconv.Itoa (perfsprint), wrapped static errors instead of dynamic errors (err113)

Improvements

  • Logging: No wrapper functions — library uses slog.Default() directly. Internal logs use enriched loggerFromContext(ctx) for consistent Kafka metadata. Client configures via slog.SetDefault() and optionally WithLogContextStorer for per-message context in handlers
  • Panic recovery: Added debug.Stack() for full stack traces in error logs, isolated into safeProcess
  • Client initialization: Replaced custom Mutex with sync.Once for thread-safe, idiomatic singleton pattern. Added resetClient() for test isolation
  • Metrics initialization: Replaced double-checked locking with sync.Once for all Prometheus metric registrations (consumer and producer)
  • Retry loop: Rewrote handleMessageWithRetry from recursive to iterative, eliminating stack overflow risk with InfiniteRetries. Extracted retryDuration function for backoff calculation
  • Producer context: Produce(ctx, msg) now accepts context, enabling trace propagation and context-aware logging through the producer middleware chain
  • Code structure: Refactored onNewMessage into smaller functions (enrichContext, startMessageSpan, processMessage) for readability. Extracted startMessageSpan to tracing.go

Migration Guide

Error Handling

Before (v3):

return fmt.Errorf("error: %w", kafka.ErrEventUnretriable)

After (v4):

// Recommended
return kafka.NewUnretriableError(err)

// Or implement the interface
type MyError struct{}
func (e MyError) Error() string       { return "my error" }
func (e MyError) IsUnretriable() bool { return true }

Logging

Before (v3):

kafka.Logger = myLogger
kafka.ErrorLogger = myErrorLogger

After (v4):

slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
    Level: slog.LevelInfo,
})))

// Optionally, enrich handler context with Kafka metadata
listener, _ := kafka.NewListener(appName, handlers,
    kafka.WithLogContextStorer(myToContext),
)

Producer

Before (v3):

producer.Produce(msg)

After (v4):

producer.Produce(ctx, msg)

Tracing

Before (v3):

import "github.com/ricardo-ch/go-tracing"
// OpenTracing-based tracing

After (v4):

// Configure OpenTelemetry in your application
listener, _ := kafka.NewListener(appName, handlers,
    kafka.WithTracing(kafka.DefaultTracing),
)

// Propagate trace context when producing
headers := kafka.GetKafkaHeadersFromContext(ctx)

Exponential Backoff

Before (v3):

kafka.ExponentialBackoffFunc = sarama.NewExponentialBackoff(...)

After (v4):

// No need to set ExponentialBackoffFunc — it is evaluated lazily using
// DurationBeforeRetry and MaxBackoffDuration. Just configure those:
kafka.DurationBeforeRetry = 2 * time.Second
kafka.MaxBackoffDuration = 10 * time.Minute

// Or ...
Read more