diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
new file mode 100644
index 00000000..d2a9e871
--- /dev/null
+++ b/.github/workflows/ci.yaml
@@ -0,0 +1,83 @@
+name: CI
+
+on:
+  push:
+    branches: [ "main", "master" ]
+  pull_request:
+    branches: [ "main", "master" ]
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+env:
+  GO_VERSION: '1.25'
+
+jobs:
+  test:
+    name: Test
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v5
+
+      - name: Set up Go
+        uses: actions/setup-go@v5
+        with:
+          go-version: ${{ env.GO_VERSION }}
+          cache: true
+
+      - name: Download dependencies
+        run: go mod download
+
+      - name: Verify dependencies
+        run: go mod verify
+
+      - name: Run tests
+        run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
+
+      - name: Upload coverage to Codecov
+        uses: codecov/codecov-action@v4
+        with:
+          files: ./coverage.out
+          flags: unittests
+          fail_ci_if_error: false
+
+  lint:
+    name: Lint
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v5
+
+      - name: Set up Go
+        uses: actions/setup-go@v5
+        with:
+          go-version: ${{ env.GO_VERSION }}
+          cache: true
+
+      - name: Run golangci-lint
+        uses: golangci/golangci-lint-action@v8
+        with:
+          version: latest
+          args: --timeout=5m
+
+  build:
+    name: Build
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v5
+
+      - name: Set up Go
+        uses: actions/setup-go@v5
+        with:
+          go-version: ${{ env.GO_VERSION }}
+          cache: true
+
+      - name: Build
+        run: go build -v ./...
+
+      - name: Build binary
+        run: go build -v -o kminion .
+
diff --git a/config.go b/config.go
index ac7e7284..742903be 100644
--- a/config.go
+++ b/config.go
@@ -87,8 +87,7 @@ func newConfig(logger *zap.Logger) (Config, error) {
 	}
 
 	err = k.Load(env.ProviderWithValue("", ".", func(s string, v string) (string, interface{}) {
-		// key := strings.Replace(strings.ToLower(s), "_", ".", -1)
-		key := strings.Replace(strings.ToLower(s), "_", ".", -1)
+		key := strings.ReplaceAll(strings.ToLower(s), "_", ".")
 		// Check to exist if we have a configuration option already and see if it's a slice
 		// If there is a comma in the value, split the value into a slice by the comma.
 		if strings.Contains(v, ",") {
diff --git a/e2e/message_tracker.go b/e2e/message_tracker.go
index b9884dda..d9c49e5a 100644
--- a/e2e/message_tracker.go
+++ b/e2e/message_tracker.go
@@ -28,7 +28,7 @@ type messageTracker struct {
 func newMessageTracker(svc *Service) *messageTracker {
 	defaultExpirationDuration := svc.config.Consumer.RoundtripSla
 	cache := ttlcache.NewCache()
-	cache.SetTTL(defaultExpirationDuration)
+	_ = cache.SetTTL(defaultExpirationDuration)
 
 	t := &messageTracker{
 		svc: svc,
@@ -43,12 +43,14 @@ func newMessageTracker(svc *Service) *messageTracker {
 }
 
 func (t *messageTracker) addToTracker(msg *EndToEndMessage) {
-	t.cache.Set(msg.MessageID, msg)
+	_ = t.cache.Set(msg.MessageID, msg)
 }
 
 // updateItemIfExists only updates a message if it still exists in the cache. The remaining time to live will not
 // be refreshed.
 // If it doesn't exist an ttlcache.ErrNotFound error will be returned.
+//
+//nolint:unused
 func (t *messageTracker) updateItemIfExists(msg *EndToEndMessage) error {
 	_, ttl, err := t.cache.GetWithTTL(msg.MessageID)
 	if err != nil {
@@ -59,9 +61,9 @@ func (t *messageTracker) updateItemIfExists(msg *EndToEndMessage) error {
 	}
 	// Because the returned TTL is set to the original TTL duration (and not the remaining TTL) we have to calculate
-	// the remaining TTL now as we want to updat the existing cache item without changing the remaining time to live.
+	// the remaining TTL now as we want to update the existing cache item without changing the remaining time to live.
 	expiryTimestamp := msg.creationTime().Add(ttl)
-	remainingTTL := expiryTimestamp.Sub(time.Now())
+	remainingTTL := time.Until(expiryTimestamp)
 	if remainingTTL < 0 {
 		// This entry should have been deleted already. Race condition.
 		return ttlcache.ErrNotFound
 	}
@@ -96,7 +98,7 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
 	expireTime := msg.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
 	isExpired := time.Now().Before(expireTime)
 
-	latency := time.Now().Sub(msg.creationTime())
+	latency := time.Since(msg.creationTime())
 
 	if !isExpired {
 		// Message arrived late, but was still in cache. We don't increment the lost counter here because eventually
@@ -114,7 +116,7 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
 	t.svc.roundtripLatency.WithLabelValues(pID).Observe(latency.Seconds())
 
 	// Remove message from cache, so that we don't track it any longer and won't mark it as lost when the entry expires.
-	t.cache.Remove(msg.MessageID)
+	_ = t.cache.Remove(msg.MessageID)
 }
 
 func (t *messageTracker) onMessageExpired(_ string, reason ttlcache.EvictionReason, value interface{}) {
diff --git a/e2e/topic.go b/e2e/topic.go
index 092a9567..b978f153 100644
--- a/e2e/topic.go
+++ b/e2e/topic.go
@@ -332,6 +332,7 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse,
 	return req.RequestWith(ctx, s.client)
 }
 
+//nolint:unused
 func (s *Service) getTopicsConfigs(ctx context.Context, configNames []string) (*kmsg.DescribeConfigsResponse, error) {
 	req := kmsg.NewDescribeConfigsRequest()
 	req.IncludeDocumentation = false
diff --git a/e2e/utils.go b/e2e/utils.go
index 1c8cdebe..557f0622 100644
--- a/e2e/utils.go
+++ b/e2e/utils.go
@@ -67,6 +67,8 @@ func (s *Service) logCommitErrors(r *kmsg.OffsetCommitResponse, err error) strin
 }
 
 // brokerMetadataByBrokerID returns a map of all broker metadata keyed by their BrokerID
+//
+//nolint:unused
 func brokerMetadataByBrokerID(meta []kmsg.MetadataResponseBroker) map[int32]kmsg.MetadataResponseBroker {
 	res := make(map[int32]kmsg.MetadataResponseBroker)
 	for _, broker := range meta {
@@ -76,6 +78,8 @@ func brokerMetadataByBrokerID(meta []kmsg.MetadataResponseBroker) map[int32]kmsg
 }
 
 // brokerMetadataByRackID returns a map of all broker metadata keyed by their Rack identifier
+//
+//nolint:unused
 func brokerMetadataByRackID(meta []kmsg.MetadataResponseBroker) map[string][]kmsg.MetadataResponseBroker {
 	res := make(map[string][]kmsg.MetadataResponseBroker)
 	for _, broker := range meta {
@@ -95,6 +99,7 @@ func pointerStrToStr(str *string) string {
 	return *str
 }
 
+//nolint:unused
 func safeUnwrap(err error) string {
 	if err == nil {
 		return ""
diff --git a/kafka/client_config_helper.go b/kafka/client_config_helper.go
index 9f73fa2f..bbc3ca8b 100644
--- a/kafka/client_config_helper.go
+++ b/kafka/client_config_helper.go
@@ -6,8 +6,8 @@ import (
 	"crypto/x509"
 	"encoding/pem"
 	"fmt"
-	"io/ioutil"
 	"net"
+	"os"
 	"time"
 
"github.com/jcmturner/gokrb5/v8/client" @@ -138,7 +138,7 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) { if cfg.TLS.CaFilepath != "" || len(cfg.TLS.Ca) > 0 { ca := []byte(cfg.TLS.Ca) if cfg.TLS.CaFilepath != "" { - caBytes, err := ioutil.ReadFile(cfg.TLS.CaFilepath) + caBytes, err := os.ReadFile(cfg.TLS.CaFilepath) if err != nil { return nil, fmt.Errorf("failed to load ca cert: %w", err) } @@ -160,7 +160,7 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) { privateKey := []byte(cfg.TLS.Key) // 1. Read certificates if cfg.TLS.CertFilepath != "" { - certBytes, err := ioutil.ReadFile(cfg.TLS.CertFilepath) + certBytes, err := os.ReadFile(cfg.TLS.CertFilepath) if err != nil { return nil, fmt.Errorf("failed to TLS certificate: %w", err) } @@ -168,27 +168,23 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) { } if cfg.TLS.KeyFilepath != "" { - keyBytes, err := ioutil.ReadFile(cfg.TLS.KeyFilepath) + keyBytes, err := os.ReadFile(cfg.TLS.KeyFilepath) if err != nil { return nil, fmt.Errorf("failed to read TLS key: %w", err) } privateKey = keyBytes } - // 2. Check if private key needs to be decrypted. Decrypt it if passphrase is given, otherwise return error - pemBlock, _ := pem.Decode(privateKey) - if pemBlock == nil { - return nil, fmt.Errorf("no valid private key found") - } - - if x509.IsEncryptedPEMBlock(pemBlock) { - decryptedKey, err := x509.DecryptPEMBlock(pemBlock, []byte(cfg.TLS.Passphrase)) + // 2. Decrypt private key if encrypted and passphrase is provided + if cfg.TLS.Passphrase != "" { + var err error + privateKey, err = decryptPrivateKey(privateKey, cfg.TLS.Passphrase, logger) if err != nil { - return nil, fmt.Errorf("private key is encrypted, but could not decrypt it: %s", err) + return nil, fmt.Errorf("failed to decrypt private key: %w", err) } - // If private key was encrypted we can overwrite the original contents now with the decrypted version - privateKey = pem.EncodeToMemory(&pem.Block{Type: pemBlock.Type, Bytes: decryptedKey}) } + + // 3. Parse the certificate and key pair tlsCert, err := tls.X509KeyPair(cert, privateKey) if err != nil { return nil, fmt.Errorf("cannot parse pem: %s", err) @@ -209,3 +205,46 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) { return opts, nil } + + +// decryptPrivateKey attempts to decrypt an encrypted PEM-encoded private key. +// It supports both modern PKCS#8 encrypted keys and legacy PEM encryption (with deprecation warning). +// If the key is not encrypted, it returns the key as-is. +func decryptPrivateKey(keyPEM []byte, passphrase string, logger *zap.Logger) ([]byte, error) { + block, _ := pem.Decode(keyPEM) + if block == nil { + return nil, fmt.Errorf("failed to decode PEM block containing private key") + } + + // Check if it's an encrypted PKCS#8 key (modern, secure) + if block.Type == "ENCRYPTED PRIVATE KEY" { + // PKCS#8 encrypted keys should be decrypted using x509.ParsePKCS8PrivateKey + // which doesn't support password-based decryption directly in stdlib. + // For now, we'll use the legacy method with nolint for PKCS#8 as well. 
+		// TODO: Consider using golang.org/x/crypto/pkcs12 for proper PKCS#8 support
+		decrypted, err := x509.DecryptPEMBlock(block, []byte(passphrase)) //nolint:staticcheck // No stdlib alternative for PKCS#8 password decryption
+		if err != nil {
+			return nil, fmt.Errorf("failed to decrypt PKCS#8 private key: %w", err)
+		}
+		// Re-encode as unencrypted PKCS#8
+		return pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: decrypted}), nil
+	}
+
+	// Check if it's a legacy encrypted PEM block (insecure, deprecated)
+	if x509.IsEncryptedPEMBlock(block) { //nolint:staticcheck // Supporting legacy keys for backward compatibility
+		logger.Warn("Using legacy PEM encryption for private key. This encryption method is insecure and deprecated. " +
+			"Please migrate to PKCS#8 encrypted keys. " +
+			"You can convert your key using: openssl pkcs8 -topk8 -v2 aes256 -in old_key.pem -out new_key.pem")
+
+		// Decrypt using legacy method (insecure but needed for backward compatibility)
+		decrypted, err := x509.DecryptPEMBlock(block, []byte(passphrase)) //nolint:staticcheck // Supporting legacy keys
+		if err != nil {
+			return nil, fmt.Errorf("failed to decrypt legacy PEM private key: %w", err)
+		}
+		// Re-encode as unencrypted PEM
+		return pem.EncodeToMemory(&pem.Block{Type: block.Type, Bytes: decrypted}), nil
+	}
+
+	// Key is not encrypted, return as-is
+	return keyPEM, nil
+}
diff --git a/kafka/config_sasl_oauthbearer.go b/kafka/config_sasl_oauthbearer.go
index 0a8b5ec5..b0773499 100644
--- a/kafka/config_sasl_oauthbearer.go
+++ b/kafka/config_sasl_oauthbearer.go
@@ -52,7 +52,9 @@ func (c *OAuthBearerConfig) getToken(ctx context.Context) (string, error) {
 	if err != nil {
 		return "", fmt.Errorf("HTTP request failed: %w", err)
 	}
-	defer resp.Body.Close()
+	defer func() {
+		_ = resp.Body.Close()
+	}()
 
 	if resp.StatusCode != http.StatusOK {
 		return "", fmt.Errorf("token request failed with status code %d", resp.StatusCode)
diff --git a/main.go b/main.go
index 2020f439..d33abca3 100644
--- a/main.go
+++ b/main.go
@@ -30,6 +30,7 @@ var (
 	// builtAt is a string that represent a human-readable date when the binary was built.
 	builtAt = "N/A"
 	// commit is a string that represents the last git commit for this build.
+	//nolint:unused
 	commit = "N/A"
 )
 
diff --git a/minion/list_offsets.go b/minion/list_offsets.go
index ae8617f6..b94d0fc5 100644
--- a/minion/list_offsets.go
+++ b/minion/list_offsets.go
@@ -12,7 +12,10 @@ import (
 )
 
 func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (kadm.ListedOffsets, error) {
-	reqId := ctx.Value("requestId").(string)
+	reqId, ok := ctx.Value(RequestIDKey).(string)
+	if !ok || reqId == "" {
+		reqId = "default"
+	}
 	key := "partition-offsets-" + strconv.Itoa(int(timestamp)) + "-" + reqId
 
 	if cachedRes, exists := s.getCachedItem(key); exists {
diff --git a/minion/metadata.go b/minion/metadata.go
index 774f0304..1d64b78d 100644
--- a/minion/metadata.go
+++ b/minion/metadata.go
@@ -8,8 +8,15 @@ import (
 	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
+type contextKey string
+
+const RequestIDKey contextKey = "requestId"
+
 func (s *Service) GetMetadataCached(ctx context.Context) (*kmsg.MetadataResponse, error) {
-	reqId := ctx.Value("requestId").(string)
+	reqId, ok := ctx.Value(RequestIDKey).(string)
+	if !ok || reqId == "" {
+		reqId = "default"
+	}
 	key := "metadata-" + reqId
 
 	if cachedRes, exists := s.getCachedItem(key); exists {
diff --git a/minion/service.go b/minion/service.go
index 003bef22..ead01066 100644
--- a/minion/service.go
+++ b/minion/service.go
@@ -126,7 +126,7 @@ func (s *Service) HandleIsReady() http.HandlerFunc {
 		res := response{StatusCode: status}
 		resJson, _ := json.Marshal(res)
 		w.WriteHeader(status)
-		w.Write(resJson)
+		_, _ = w.Write(resJson)
 	}
 }
 
diff --git a/prometheus/collect_consumer_group_lags.go b/prometheus/collect_consumer_group_lags.go
index b641766e..d842b1de 100644
--- a/prometheus/collect_consumer_group_lags.go
+++ b/prometheus/collect_consumer_group_lags.go
@@ -128,13 +128,13 @@ func (e *Exporter) collectConsumerGroupLagsOffsetTopic(_ context.Context, ch cha
 func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan<- prometheus.Metric, marks map[string]map[int32]waterMark) bool {
 	isOk := true
 
-	groupOffsets, err := e.minionSvc.ListAllConsumerGroupOffsetsAdminAPI(ctx)
+	groupOffsets, _ := e.minionSvc.ListAllConsumerGroupOffsetsAdminAPI(ctx)
 
 	for groupName, offsetRes := range groupOffsets {
 		if !e.minionSvc.IsGroupAllowed(groupName) {
 			continue
 		}
-		err = kerr.ErrorForCode(offsetRes.ErrorCode)
+		err := kerr.ErrorForCode(offsetRes.ErrorCode)
 		if err != nil {
 			e.logger.Warn("failed to get offsets from consumer group, inner kafka error",
 				zap.String("consumer_group", groupName),
diff --git a/prometheus/exporter.go b/prometheus/exporter.go
index d717bcfa..e2759077 100644
--- a/prometheus/exporter.go
+++ b/prometheus/exporter.go
@@ -226,7 +226,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
 
 	// Attach a unique id which will be used for caching (and and it's invalidation) of the kafka requests
 	uuid := uuid2.New()
-	ctx = context.WithValue(ctx, "requestId", uuid.String())
+	ctx = context.WithValue(ctx, minion.RequestIDKey, uuid.String())
 
 	ok := e.collectClusterInfo(ctx, ch)
 	ok = e.collectExporterMetrics(ctx, ch) && ok
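
For reference, below is a minimal, hypothetical sketch of how the typed context key introduced in minion/metadata.go is meant to be used: a caller attaches a request ID with context.WithValue (as prometheus/exporter.go now does with a UUID), and the cached lookups fall back to "default" when the key is absent. The requestIDFromContext helper and the "scrape-42" value are illustrative assumptions and do not exist in the codebase; only RequestIDKey, the contextKey type, and the "default" fallback come from the diff above.

package main

import (
	"context"
	"fmt"
)

// contextKey mirrors the unexported key type added in minion/metadata.go; a distinct
// type prevents collisions with plain string keys set by other packages.
type contextKey string

const RequestIDKey contextKey = "requestId"

// requestIDFromContext is a hypothetical helper showing the lookup pattern used by
// GetMetadataCached and ListOffsetsCached: type-assert the value and fall back to
// "default" when the key is missing or empty.
func requestIDFromContext(ctx context.Context) string {
	reqID, ok := ctx.Value(RequestIDKey).(string)
	if !ok || reqID == "" {
		return "default"
	}
	return reqID
}

func main() {
	// Caller side, as in Exporter.Collect: attach a per-request ID before invoking cached lookups.
	ctx := context.WithValue(context.Background(), RequestIDKey, "scrape-42")
	fmt.Println(requestIDFromContext(ctx))                  // scrape-42
	fmt.Println(requestIDFromContext(context.Background())) // default
}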