Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# CI workflow: runs the test, lint, and build jobs for the Go module.
name: CI

# Triggered by pushes and pull requests that target main or master.
on:
push:
branches: [ "main", "master" ]
pull_request:
branches: [ "main", "master" ]

# Runs are grouped per workflow and ref; a newer run cancels the one still in progress.
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

# Go toolchain version shared by the setup-go step of every job.
env:
GO_VERSION: '1.25'

jobs:
# Downloads and verifies module dependencies, then runs the tests with the race detector and a coverage profile.
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v5

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
cache: true

- name: Download dependencies
run: go mod download

- name: Verify dependencies
run: go mod verify

- name: Run tests
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...

# fail_ci_if_error is false, so a failed coverage upload does not fail the job.
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
files: ./coverage.out
flags: unittests
fail_ci_if_error: false

# Runs golangci-lint (latest version) with a five minute timeout.
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v5

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
cache: true

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v8
with:
version: latest
args: --timeout=5m

# Compiles every package, then builds the kminion binary from the package in the working directory.
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v5

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
cache: true

- name: Build
run: go build -v ./...

- name: Build binary
run: go build -v -o kminion .

3 changes: 1 addition & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func newConfig(logger *zap.Logger) (Config, error) {
}

err = k.Load(env.ProviderWithValue("", ".", func(s string, v string) (string, interface{}) {
// key := strings.Replace(strings.ToLower(s), "_", ".", -1)
key := strings.Replace(strings.ToLower(s), "_", ".", -1)
key := strings.ReplaceAll(strings.ToLower(s), "_", ".")
// Check to exist if we have a configuration option already and see if it's a slice
// If there is a comma in the value, split the value into a slice by the comma.
if strings.Contains(v, ",") {
Expand Down
14 changes: 8 additions & 6 deletions e2e/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type messageTracker struct {
func newMessageTracker(svc *Service) *messageTracker {
defaultExpirationDuration := svc.config.Consumer.RoundtripSla
cache := ttlcache.NewCache()
cache.SetTTL(defaultExpirationDuration)
_ = cache.SetTTL(defaultExpirationDuration)

t := &messageTracker{
svc: svc,
Expand All @@ -43,12 +43,14 @@ func newMessageTracker(svc *Service) *messageTracker {
}

func (t *messageTracker) addToTracker(msg *EndToEndMessage) {
t.cache.Set(msg.MessageID, msg)
_ = t.cache.Set(msg.MessageID, msg)
}

// updateItemIfExists only updates a message if it still exists in the cache. The remaining time to live will not
// be refreshed.
// If it doesn't exist an ttlcache.ErrNotFound error will be returned.
//
//nolint:unused
func (t *messageTracker) updateItemIfExists(msg *EndToEndMessage) error {
_, ttl, err := t.cache.GetWithTTL(msg.MessageID)
if err != nil {
Expand All @@ -59,9 +61,9 @@ func (t *messageTracker) updateItemIfExists(msg *EndToEndMessage) error {
}

// Because the returned TTL is set to the original TTL duration (and not the remaining TTL) we have to calculate
// the remaining TTL now as we want to updat the existing cache item without changing the remaining time to live.
// the remaining TTL now as we want to update the existing cache item without changing the remaining time to live.
expiryTimestamp := msg.creationTime().Add(ttl)
remainingTTL := expiryTimestamp.Sub(time.Now())
remainingTTL := time.Until(expiryTimestamp)
if remainingTTL < 0 {
// This entry should have been deleted already. Race condition.
return ttlcache.ErrNotFound
Expand Down Expand Up @@ -96,7 +98,7 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {

expireTime := msg.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
isExpired := time.Now().Before(expireTime)
latency := time.Now().Sub(msg.creationTime())
latency := time.Since(msg.creationTime())

if !isExpired {
// Message arrived late, but was still in cache. We don't increment the lost counter here because eventually
Expand All @@ -114,7 +116,7 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
t.svc.roundtripLatency.WithLabelValues(pID).Observe(latency.Seconds())

// Remove message from cache, so that we don't track it any longer and won't mark it as lost when the entry expires.
t.cache.Remove(msg.MessageID)
_ = t.cache.Remove(msg.MessageID)
}

func (t *messageTracker) onMessageExpired(_ string, reason ttlcache.EvictionReason, value interface{}) {
Expand Down
1 change: 1 addition & 0 deletions e2e/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse,
return req.RequestWith(ctx, s.client)
}

//nolint:unused
func (s *Service) getTopicsConfigs(ctx context.Context, configNames []string) (*kmsg.DescribeConfigsResponse, error) {
req := kmsg.NewDescribeConfigsRequest()
req.IncludeDocumentation = false
Expand Down
5 changes: 5 additions & 0 deletions e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (s *Service) logCommitErrors(r *kmsg.OffsetCommitResponse, err error) strin
}

// brokerMetadataByBrokerID returns a map of all broker metadata keyed by their BrokerID
//
//nolint:unused
func brokerMetadataByBrokerID(meta []kmsg.MetadataResponseBroker) map[int32]kmsg.MetadataResponseBroker {
res := make(map[int32]kmsg.MetadataResponseBroker)
for _, broker := range meta {
Expand All @@ -76,6 +78,8 @@ func brokerMetadataByBrokerID(meta []kmsg.MetadataResponseBroker) map[int32]kmsg
}

// brokerMetadataByRackID returns a map of all broker metadata keyed by their Rack identifier
//
//nolint:unused
func brokerMetadataByRackID(meta []kmsg.MetadataResponseBroker) map[string][]kmsg.MetadataResponseBroker {
res := make(map[string][]kmsg.MetadataResponseBroker)
for _, broker := range meta {
Expand All @@ -95,6 +99,7 @@ func pointerStrToStr(str *string) string {
return *str
}

//nolint:unused
func safeUnwrap(err error) string {
if err == nil {
return "<nil>"
Expand Down
69 changes: 54 additions & 15 deletions kafka/client_config_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"io/ioutil"
"net"
"os"
"time"

"github.com/jcmturner/gokrb5/v8/client"
Expand Down Expand Up @@ -138,7 +138,7 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
if cfg.TLS.CaFilepath != "" || len(cfg.TLS.Ca) > 0 {
ca := []byte(cfg.TLS.Ca)
if cfg.TLS.CaFilepath != "" {
caBytes, err := ioutil.ReadFile(cfg.TLS.CaFilepath)
caBytes, err := os.ReadFile(cfg.TLS.CaFilepath)
if err != nil {
return nil, fmt.Errorf("failed to load ca cert: %w", err)
}
Expand All @@ -160,35 +160,31 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
privateKey := []byte(cfg.TLS.Key)
// 1. Read certificates
if cfg.TLS.CertFilepath != "" {
certBytes, err := ioutil.ReadFile(cfg.TLS.CertFilepath)
certBytes, err := os.ReadFile(cfg.TLS.CertFilepath)
if err != nil {
return nil, fmt.Errorf("failed to TLS certificate: %w", err)
}
cert = certBytes
}

if cfg.TLS.KeyFilepath != "" {
keyBytes, err := ioutil.ReadFile(cfg.TLS.KeyFilepath)
keyBytes, err := os.ReadFile(cfg.TLS.KeyFilepath)
if err != nil {
return nil, fmt.Errorf("failed to read TLS key: %w", err)
}
privateKey = keyBytes
}

// 2. Check if private key needs to be decrypted. Decrypt it if passphrase is given, otherwise return error
pemBlock, _ := pem.Decode(privateKey)
if pemBlock == nil {
return nil, fmt.Errorf("no valid private key found")
}

if x509.IsEncryptedPEMBlock(pemBlock) {
decryptedKey, err := x509.DecryptPEMBlock(pemBlock, []byte(cfg.TLS.Passphrase))
// 2. Decrypt private key if encrypted and passphrase is provided
if cfg.TLS.Passphrase != "" {
var err error
privateKey, err = decryptPrivateKey(privateKey, cfg.TLS.Passphrase, logger)
if err != nil {
return nil, fmt.Errorf("private key is encrypted, but could not decrypt it: %s", err)
return nil, fmt.Errorf("failed to decrypt private key: %w", err)
}
// If private key was encrypted we can overwrite the original contents now with the decrypted version
privateKey = pem.EncodeToMemory(&pem.Block{Type: pemBlock.Type, Bytes: decryptedKey})
}

// 3. Parse the certificate and key pair
tlsCert, err := tls.X509KeyPair(cert, privateKey)
if err != nil {
return nil, fmt.Errorf("cannot parse pem: %s", err)
Expand All @@ -209,3 +205,46 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {

return opts, nil
}


// decryptPrivateKey returns keyPEM as an unencrypted PEM-encoded private key, using passphrase to
// decrypt it when the key is encrypted.
//
// Behaviour by key format:
//   - Unencrypted PEM: keyPEM is returned as-is.
//   - Legacy encrypted PEM (RFC 1423, carries a "DEK-Info" header): the block is decrypted and
//     re-encoded under its original block type. A warning is logged through logger because this
//     encryption scheme is insecure and deprecated.
//   - PKCS#8 encrypted ("ENCRYPTED PRIVATE KEY" without legacy headers): rejected with an explicit
//     error. x509.DecryptPEMBlock only understands the legacy scheme and the standard library offers
//     no password-based PKCS#8 decryption, so such a key could never be decrypted here.
//
// An error is also returned if keyPEM contains no PEM block or if decryption with passphrase fails.
func decryptPrivateKey(keyPEM []byte, passphrase string, logger *zap.Logger) ([]byte, error) {
	block, _ := pem.Decode(keyPEM)
	if block == nil {
		return nil, fmt.Errorf("failed to decode PEM block containing private key")
	}

	// Legacy encryption is identified by the DEK-Info header, independent of the block type.
	isLegacyEncrypted := x509.IsEncryptedPEMBlock(block) //nolint:staticcheck // Supporting legacy keys for backward compatibility

	// PKCS#8 encrypted keys carry no DEK-Info header, so handing them to x509.DecryptPEMBlock would
	// only produce a confusing "no DEK-Info header in block" error. Fail with a clear message instead.
	if block.Type == "ENCRYPTED PRIVATE KEY" && !isLegacyEncrypted {
		return nil, fmt.Errorf("PKCS#8 encrypted private keys (\"ENCRYPTED PRIVATE KEY\") are not supported, provide an unencrypted private key instead")
	}

	if !isLegacyEncrypted {
		// Key is not encrypted, return as-is
		return keyPEM, nil
	}

	logger.Warn("Using legacy PEM encryption for private key. This encryption method is insecure and deprecated. " +
		"Please provide an unencrypted private key and protect it with file permissions or a secret store instead.")

	// Decrypt using legacy method (insecure but needed for backward compatibility)
	decrypted, err := x509.DecryptPEMBlock(block, []byte(passphrase)) //nolint:staticcheck // Supporting legacy keys
	if err != nil {
		return nil, fmt.Errorf("failed to decrypt legacy PEM private key: %w", err)
	}

	// Re-encode as unencrypted PEM; the encryption headers are intentionally not carried over.
	return pem.EncodeToMemory(&pem.Block{Type: block.Type, Bytes: decrypted}), nil
}
4 changes: 3 additions & 1 deletion kafka/config_sasl_oauthbearer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func (c *OAuthBearerConfig) getToken(ctx context.Context) (string, error) {
if err != nil {
return "", fmt.Errorf("HTTP request failed: %w", err)
}
defer resp.Body.Close()
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("token request failed with status code %d", resp.StatusCode)
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
// builtAt is a string that represent a human-readable date when the binary was built.
builtAt = "N/A"
// commit is a string that represents the last git commit for this build.
//nolint:unused
commit = "N/A"
)

Expand Down
5 changes: 4 additions & 1 deletion minion/list_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
)

func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
reqId := ctx.Value("requestId").(string)
reqId, ok := ctx.Value(RequestIDKey).(string)
if !ok || reqId == "" {
reqId = "default"
}
key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId

if cachedRes, exists := s.getCachedItem(key); exists {
Expand Down
9 changes: 8 additions & 1 deletion minion/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@ import (
"github.com/twmb/franz-go/pkg/kmsg"
)

type contextKey string

const RequestIDKey contextKey = "requestId"

func (s *Service) GetMetadataCached(ctx context.Context) (*kmsg.MetadataResponse, error) {
reqId := ctx.Value("requestId").(string)
reqId, ok := ctx.Value(RequestIDKey).(string)
if !ok || reqId == "" {
reqId = "default"
}
key := "metadata-" + reqId

if cachedRes, exists := s.getCachedItem(key); exists {
Expand Down
2 changes: 1 addition & 1 deletion minion/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *Service) HandleIsReady() http.HandlerFunc {
res := response{StatusCode: status}
resJson, _ := json.Marshal(res)
w.WriteHeader(status)
w.Write(resJson)
_, _ = w.Write(resJson)
}
}

Expand Down
4 changes: 2 additions & 2 deletions prometheus/collect_consumer_group_lags.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ func (e *Exporter) collectConsumerGroupLagsOffsetTopic(_ context.Context, ch cha
func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan<- prometheus.Metric, marks map[string]map[int32]waterMark) bool {
isOk := true

groupOffsets, err := e.minionSvc.ListAllConsumerGroupOffsetsAdminAPI(ctx)
groupOffsets, _ := e.minionSvc.ListAllConsumerGroupOffsetsAdminAPI(ctx)
for groupName, offsetRes := range groupOffsets {
if !e.minionSvc.IsGroupAllowed(groupName) {
continue
}

err = kerr.ErrorForCode(offsetRes.ErrorCode)
err := kerr.ErrorForCode(offsetRes.ErrorCode)
if err != nil {
e.logger.Warn("failed to get offsets from consumer group, inner kafka error",
zap.String("consumer_group", groupName),
Expand Down
2 changes: 1 addition & 1 deletion prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {

// Attach a unique id which will be used for caching (and its invalidation) of the kafka requests
uuid := uuid2.New()
ctx = context.WithValue(ctx, "requestId", uuid.String())
ctx = context.WithValue(ctx, minion.RequestIDKey, uuid.String())

ok := e.collectClusterInfo(ctx, ch)
ok = e.collectExporterMetrics(ctx, ch) && ok
Expand Down