Skip to content

Commit ebc07a2

Browse files
author
amuraru
committed
Add unit tests and golangci lint CI check
1 parent cdf025f commit ebc07a2

File tree

11 files changed

+164
-28
lines changed

11 files changed

+164
-28
lines changed

.github/workflows/ci.yaml

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
name: CI
2+
3+
on:
4+
push:
5+
branches: [ 'main', 'master' ]
6+
pull_request:
7+
branches: [ 'main', 'master' ]
8+
9+
concurrency:
10+
group: ${{ github.workflow }}-${{ github.ref }}
11+
cancel-in-progress: true
12+
13+
env:
14+
GO_VERSION: '1.25'
15+
16+
jobs:
17+
test:
18+
name: Test
19+
runs-on: ubuntu-latest
20+
steps:
21+
- name: Checkout code
22+
uses: actions/checkout@v5
23+
24+
- name: Set up Go
25+
uses: actions/setup-go@v5
26+
with:
27+
go-version: ${{ env.GO_VERSION }}
28+
cache: true
29+
30+
- name: Download dependencies
31+
run: go mod download
32+
33+
- name: Verify dependencies
34+
run: go mod verify
35+
36+
- name: Run tests
37+
run: go test -v -race -coverprofile=coverage.out -covermode=atomic ./...
38+
39+
- name: Upload coverage to Codecov
40+
uses: codecov/codecov-action@v4
41+
with:
42+
files: ./coverage.out
43+
flags: unittests
44+
fail_ci_if_error: false
45+
46+
lint:
47+
name: Lint
48+
runs-on: ubuntu-latest
49+
steps:
50+
- name: Checkout code
51+
uses: actions/checkout@v5
52+
53+
- name: Set up Go
54+
uses: actions/setup-go@v5
55+
with:
56+
go-version: ${{ env.GO_VERSION }}
57+
cache: true
58+
59+
- name: Run golangci-lint
60+
uses: golangci/golangci-lint-action@v8
61+
with:
62+
version: latest
63+
args: --timeout=5m
64+
65+
build:
66+
name: Build
67+
runs-on: ubuntu-latest
68+
steps:
69+
- name: Checkout code
70+
uses: actions/checkout@v5
71+
72+
- name: Set up Go
73+
uses: actions/setup-go@v5
74+
with:
75+
go-version: ${{ env.GO_VERSION }}
76+
cache: true
77+
78+
- name: Build
79+
run: go build -v ./...
80+
81+
- name: Build binary
82+
run: go build -v -o kminion .
83+

config.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ func newConfig(logger *zap.Logger) (Config, error) {
8787
}
8888

8989
err = k.Load(env.ProviderWithValue("", ".", func(s string, v string) (string, interface{}) {
90-
// key := strings.Replace(strings.ToLower(s), "_", ".", -1)
91-
key := strings.Replace(strings.ToLower(s), "_", ".", -1)
90+
key := strings.ReplaceAll(strings.ToLower(s), "_", ".")
9291
// Check to exist if we have a configuration option already and see if it's a slice
9392
// If there is a comma in the value, split the value into a slice by the comma.
9493
if strings.Contains(v, ",") {

e2e/message_tracker.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type messageTracker struct {
2828
func newMessageTracker(svc *Service) *messageTracker {
2929
defaultExpirationDuration := svc.config.Consumer.RoundtripSla
3030
cache := ttlcache.NewCache()
31-
cache.SetTTL(defaultExpirationDuration)
31+
_ = cache.SetTTL(defaultExpirationDuration)
3232

3333
t := &messageTracker{
3434
svc: svc,
@@ -43,12 +43,14 @@ func newMessageTracker(svc *Service) *messageTracker {
4343
}
4444

4545
func (t *messageTracker) addToTracker(msg *EndToEndMessage) {
46-
t.cache.Set(msg.MessageID, msg)
46+
_ = t.cache.Set(msg.MessageID, msg)
4747
}
4848

4949
// updateItemIfExists only updates a message if it still exists in the cache. The remaining time to live will not
5050
// be refreshed.
5151
// If it doesn't exist an ttlcache.ErrNotFound error will be returned.
52+
//
53+
//nolint:unused
5254
func (t *messageTracker) updateItemIfExists(msg *EndToEndMessage) error {
5355
_, ttl, err := t.cache.GetWithTTL(msg.MessageID)
5456
if err != nil {
@@ -59,9 +61,9 @@ func (t *messageTracker) updateItemIfExists(msg *EndToEndMessage) error {
5961
}
6062

6163
// Because the returned TTL is set to the original TTL duration (and not the remaining TTL) we have to calculate
62-
// the remaining TTL now as we want to updat the existing cache item without changing the remaining time to live.
64+
// the remaining TTL now as we want to update the existing cache item without changing the remaining time to live.
6365
expiryTimestamp := msg.creationTime().Add(ttl)
64-
remainingTTL := expiryTimestamp.Sub(time.Now())
66+
remainingTTL := time.Until(expiryTimestamp)
6567
if remainingTTL < 0 {
6668
// This entry should have been deleted already. Race condition.
6769
return ttlcache.ErrNotFound
@@ -96,7 +98,7 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
9698

9799
expireTime := msg.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
98100
isExpired := time.Now().Before(expireTime)
99-
latency := time.Now().Sub(msg.creationTime())
101+
latency := time.Since(msg.creationTime())
100102

101103
if !isExpired {
102104
// Message arrived late, but was still in cache. We don't increment the lost counter here because eventually
@@ -114,7 +116,7 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
114116
t.svc.roundtripLatency.WithLabelValues(pID).Observe(latency.Seconds())
115117

116118
// Remove message from cache, so that we don't track it any longer and won't mark it as lost when the entry expires.
117-
t.cache.Remove(msg.MessageID)
119+
_ = t.cache.Remove(msg.MessageID)
118120
}
119121

120122
func (t *messageTracker) onMessageExpired(_ string, reason ttlcache.EvictionReason, value interface{}) {

e2e/topic.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse,
332332
return req.RequestWith(ctx, s.client)
333333
}
334334

335+
//nolint:unused
335336
func (s *Service) getTopicsConfigs(ctx context.Context, configNames []string) (*kmsg.DescribeConfigsResponse, error) {
336337
req := kmsg.NewDescribeConfigsRequest()
337338
req.IncludeDocumentation = false

e2e/utils.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ func (s *Service) logCommitErrors(r *kmsg.OffsetCommitResponse, err error) strin
6767
}
6868

6969
// brokerMetadataByBrokerID returns a map of all broker metadata keyed by their BrokerID
70+
//
71+
//nolint:unused
7072
func brokerMetadataByBrokerID(meta []kmsg.MetadataResponseBroker) map[int32]kmsg.MetadataResponseBroker {
7173
res := make(map[int32]kmsg.MetadataResponseBroker)
7274
for _, broker := range meta {
@@ -76,6 +78,8 @@ func brokerMetadataByBrokerID(meta []kmsg.MetadataResponseBroker) map[int32]kmsg
7678
}
7779

7880
// brokerMetadataByRackID returns a map of all broker metadata keyed by their Rack identifier
81+
//
82+
//nolint:unused
7983
func brokerMetadataByRackID(meta []kmsg.MetadataResponseBroker) map[string][]kmsg.MetadataResponseBroker {
8084
res := make(map[string][]kmsg.MetadataResponseBroker)
8185
for _, broker := range meta {
@@ -95,6 +99,7 @@ func pointerStrToStr(str *string) string {
9599
return *str
96100
}
97101

102+
//nolint:unused
98103
func safeUnwrap(err error) string {
99104
if err == nil {
100105
return "<nil>"

kafka/client_config_helper.go

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"crypto/x509"
77
"encoding/pem"
88
"fmt"
9-
"io/ioutil"
109
"net"
10+
"os"
1111
"time"
1212

1313
"github.com/jcmturner/gokrb5/v8/client"
@@ -138,7 +138,7 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
138138
if cfg.TLS.CaFilepath != "" || len(cfg.TLS.Ca) > 0 {
139139
ca := []byte(cfg.TLS.Ca)
140140
if cfg.TLS.CaFilepath != "" {
141-
caBytes, err := ioutil.ReadFile(cfg.TLS.CaFilepath)
141+
caBytes, err := os.ReadFile(cfg.TLS.CaFilepath)
142142
if err != nil {
143143
return nil, fmt.Errorf("failed to load ca cert: %w", err)
144144
}
@@ -160,35 +160,31 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
160160
privateKey := []byte(cfg.TLS.Key)
161161
// 1. Read certificates
162162
if cfg.TLS.CertFilepath != "" {
163-
certBytes, err := ioutil.ReadFile(cfg.TLS.CertFilepath)
163+
certBytes, err := os.ReadFile(cfg.TLS.CertFilepath)
164164
if err != nil {
165165
return nil, fmt.Errorf("failed to TLS certificate: %w", err)
166166
}
167167
cert = certBytes
168168
}
169169

170170
if cfg.TLS.KeyFilepath != "" {
171-
keyBytes, err := ioutil.ReadFile(cfg.TLS.KeyFilepath)
171+
keyBytes, err := os.ReadFile(cfg.TLS.KeyFilepath)
172172
if err != nil {
173173
return nil, fmt.Errorf("failed to read TLS key: %w", err)
174174
}
175175
privateKey = keyBytes
176176
}
177177

178-
// 2. Check if private key needs to be decrypted. Decrypt it if passphrase is given, otherwise return error
179-
pemBlock, _ := pem.Decode(privateKey)
180-
if pemBlock == nil {
181-
return nil, fmt.Errorf("no valid private key found")
182-
}
183-
184-
if x509.IsEncryptedPEMBlock(pemBlock) {
185-
decryptedKey, err := x509.DecryptPEMBlock(pemBlock, []byte(cfg.TLS.Passphrase))
178+
// 2. Decrypt private key if encrypted and passphrase is provided
179+
if cfg.TLS.Passphrase != "" {
180+
var err error
181+
privateKey, err = decryptPrivateKey(privateKey, cfg.TLS.Passphrase, logger)
186182
if err != nil {
187-
return nil, fmt.Errorf("private key is encrypted, but could not decrypt it: %s", err)
183+
return nil, fmt.Errorf("failed to decrypt private key: %w", err)
188184
}
189-
// If private key was encrypted we can overwrite the original contents now with the decrypted version
190-
privateKey = pem.EncodeToMemory(&pem.Block{Type: pemBlock.Type, Bytes: decryptedKey})
191185
}
186+
187+
// 3. Parse the certificate and key pair
192188
tlsCert, err := tls.X509KeyPair(cert, privateKey)
193189
if err != nil {
194190
return nil, fmt.Errorf("cannot parse pem: %s", err)
@@ -209,3 +205,46 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
209205

210206
return opts, nil
211207
}
208+
209+
210+
// decryptPrivateKey attempts to decrypt an encrypted PEM-encoded private key.
211+
// It supports both modern PKCS#8 encrypted keys and legacy PEM encryption (with deprecation warning).
212+
// If the key is not encrypted, it returns the key as-is.
213+
func decryptPrivateKey(keyPEM []byte, passphrase string, logger *zap.Logger) ([]byte, error) {
214+
block, _ := pem.Decode(keyPEM)
215+
if block == nil {
216+
return nil, fmt.Errorf("failed to decode PEM block containing private key")
217+
}
218+
219+
// Check if it's an encrypted PKCS#8 key (modern, secure)
220+
if block.Type == "ENCRYPTED PRIVATE KEY" {
221+
// PKCS#8 encrypted keys should be decrypted using x509.ParsePKCS8PrivateKey
222+
// which doesn't support password-based decryption directly in stdlib.
223+
// For now, we'll use the legacy method with nolint for PKCS#8 as well.
224+
// TODO: Consider using golang.org/x/crypto/pkcs12 for proper PKCS#8 support
225+
decrypted, err := x509.DecryptPEMBlock(block, []byte(passphrase)) //nolint:staticcheck // No stdlib alternative for PKCS#8 password decryption
226+
if err != nil {
227+
return nil, fmt.Errorf("failed to decrypt PKCS#8 private key: %w", err)
228+
}
229+
// Re-encode as unencrypted PKCS#8
230+
return pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: decrypted}), nil
231+
}
232+
233+
// Check if it's a legacy encrypted PEM block (insecure, deprecated)
234+
if x509.IsEncryptedPEMBlock(block) { //nolint:staticcheck // Supporting legacy keys for backward compatibility
235+
logger.Warn("Using legacy PEM encryption for private key. This encryption method is insecure and deprecated. " +
236+
"Please migrate to PKCS#8 encrypted keys. " +
237+
"You can convert your key using: openssl pkcs8 -topk8 -v2 aes256 -in old_key.pem -out new_key.pem")
238+
239+
// Decrypt using legacy method (insecure but needed for backward compatibility)
240+
decrypted, err := x509.DecryptPEMBlock(block, []byte(passphrase)) //nolint:staticcheck // Supporting legacy keys
241+
if err != nil {
242+
return nil, fmt.Errorf("failed to decrypt legacy PEM private key: %w", err)
243+
}
244+
// Re-encode as unencrypted PEM
245+
return pem.EncodeToMemory(&pem.Block{Type: block.Type, Bytes: decrypted}), nil
246+
}
247+
248+
// Key is not encrypted, return as-is
249+
return keyPEM, nil
250+
}

kafka/config_sasl_oauthbearer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ func (c *OAuthBearerConfig) getToken(ctx context.Context) (string, error) {
5252
if err != nil {
5353
return "", fmt.Errorf("HTTP request failed: %w", err)
5454
}
55-
defer resp.Body.Close()
55+
defer func() {
56+
_ = resp.Body.Close()
57+
}()
5658

5759
if resp.StatusCode != http.StatusOK {
5860
return "", fmt.Errorf("token request failed with status code %d", resp.StatusCode)

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var (
3030
// builtAt is a string that represent a human-readable date when the binary was built.
3131
builtAt = "N/A"
3232
// commit is a string that represents the last git commit for this build.
33+
//nolint:unused
3334
commit = "N/A"
3435
)
3536

minion/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (s *Service) HandleIsReady() http.HandlerFunc {
126126
res := response{StatusCode: status}
127127
resJson, _ := json.Marshal(res)
128128
w.WriteHeader(status)
129-
w.Write(resJson)
129+
_, _ = w.Write(resJson)
130130
}
131131
}
132132

prometheus/collect_consumer_group_lags.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,13 @@ func (e *Exporter) collectConsumerGroupLagsOffsetTopic(_ context.Context, ch cha
128128
func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan<- prometheus.Metric, marks map[string]map[int32]waterMark) bool {
129129
isOk := true
130130

131-
groupOffsets, err := e.minionSvc.ListAllConsumerGroupOffsetsAdminAPI(ctx)
131+
groupOffsets, _ := e.minionSvc.ListAllConsumerGroupOffsetsAdminAPI(ctx)
132132
for groupName, offsetRes := range groupOffsets {
133133
if !e.minionSvc.IsGroupAllowed(groupName) {
134134
continue
135135
}
136136

137-
err = kerr.ErrorForCode(offsetRes.ErrorCode)
137+
err := kerr.ErrorForCode(offsetRes.ErrorCode)
138138
if err != nil {
139139
e.logger.Warn("failed to get offsets from consumer group, inner kafka error",
140140
zap.String("consumer_group", groupName),

0 commit comments

Comments
 (0)