Skip to content

cert rotation + refactoring #196

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Setup go
uses: actions/setup-go@v4
with:
go-version: '1.22'
go-version: '1.23'
check-latest: true
- run: go version
- name: Run build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Setup go
uses: actions/setup-go@v4
with:
go-version: '1.22'
go-version: '1.23'
check-latest: true
- run: go version
- name: Run build and test
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.21 AS builder
FROM --platform=$BUILDPLATFORM golang:1.23-alpine3.21 AS builder
RUN apk add alpine-sdk ca-certificates

ARG TARGETOS
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.all
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.21 AS builder
FROM --platform=$BUILDPLATFORM golang:1.23-alpine3.21 AS builder
RUN apk add alpine-sdk ca-certificates

ARG TARGETOS
Expand Down
15 changes: 12 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
GOPKGS = $(shell go list ./... | grep -v /vendor/)
BUILD_FLAGS ?=
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
TAG ?= "v0.3.12"
TAG ?= "v0.4.0"
GOOS ?= $(if $(TARGETOS),$(TARGETOS),linux)
GOARCH ?= $(if $(TARGETARCH),$(TARGETARCH),amd64)
GOARM ?= $(TARGETVARIANT)
Expand All @@ -22,6 +22,8 @@ PROTOC_VERSION ?= 22.2
PROTOC_BIN_DIR := .tools
PROTOC := $(PROTOC_BIN_DIR)/protoc

GOLANGCI_LINT = go run github.com/golangci/golangci-lint/cmd/[email protected]

default: build

test.race:
Expand All @@ -33,10 +35,17 @@ test:
fmt:
go fmt $(GOPKGS)

check:
golint $(GOPKGS)
check: lint-code
go vet $(GOPKGS)

.PHONY: lint-code
lint-code:
$(GOLANGCI_LINT) run --timeout 5m

.PHONY: lint-fix
lint-fix:
$(GOLANGCI_LINT) run --fix

.PHONY: build
build: build/$(BINARY)

Expand Down
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ As not every Kafka release adds new messages/versions which are relevant to the

Linux

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.12/kafka-proxy-v0.3.12-linux-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-linux-amd64.tar.gz | tar xz

macOS

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.12/kafka-proxy-v0.3.12-darwin-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.4.0/kafka-proxy-v0.4.0-darwin-amd64.tar.gz | tar xz

2. Move the binary in to your PATH.

Expand All @@ -70,7 +70,7 @@ Docker images are available on [Docker Hub](https://hub.docker.com/r/grepplabs/k

You can launch a kafka-proxy container for trying it out with

docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.12 \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0 \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand All @@ -89,7 +89,7 @@ Docker images with precompiled plugins located in `/opt/kafka-proxy/bin/` are ta

You can launch a kafka-proxy container with auth-ldap plugin for trying it out with

docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.12-all \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.4.0-all \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand Down Expand Up @@ -140,6 +140,7 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--debug-enable Enable Debug endpoint
--debug-listen-address string Debug listen address (default "0.0.0.0:6060")
--default-listener-ip string Default listener IP (default "0.0.0.0")
--deterministic-listeners Enable deterministic listeners (listener port = min port + broker id).
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
--dynamic-advertised-listener string Advertised address for dynamic listeners. If empty, default-listener-ip is used
--dynamic-listeners-disable Disable dynamic listeners.
Expand Down Expand Up @@ -170,26 +171,30 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--kafka-read-timeout duration How long to wait for a response (default 30s)
--kafka-write-timeout duration How long to wait for a transmit (default 30s)
--log-format string Log format text or json (default "text")
--log-level string Log level debug, info, warning, error, fatal or panic (default "info")
--log-level string Log level trace, debug, info, warning, error, fatal or panic (default "info")
--log-level-fieldname string Log level fieldname for json format (default "@level")
--log-msg-fieldname string Message fieldname for json format (default "@message")
--log-time-fieldname string Time fieldname for json format (default "@timestamp")
--producer-acks-0-disabled Assume fire-and-forget is never sent by the producer. Enabling this parameter will increase performance
--proxy-listener-ca-chain-cert-file string PEM encoded CA's certificate file. If provided, client certificate is required and verified
--proxy-listener-cert-file string PEM encoded file with server certificate
--proxy-listener-cipher-suites strings List of supported cipher suites
--proxy-listener-crl-file string PEM encoded X509 CRLs file
--proxy-listener-curve-preferences strings List of curve preferences
--proxy-listener-keep-alive duration Keep alive period for an active network connection. If zero, keep-alives are disabled (default 1m0s)
--proxy-listener-key-file string PEM encoded file with private key for the server certificate
--proxy-listener-key-password string Password to decrypt rsa private key
--proxy-listener-read-buffer-size int Size of the operating system's receive buffer associated with the connection. If zero, system default is used
--proxy-listener-tls-enable Whether or not to use TLS listener
--proxy-listener-tls-refresh duration Interval for refreshing server TLS certificates. If set to zero, the refresh watch is disabled
--proxy-listener-tls-required-client-subject strings Required client certificate subject common name; example; s:/CN=[value]/C=[state]/C=[DE,PL] or r:/CN=[^val.{2}$]/C=[state]/C=[DE,PL]; check manual for more details
--proxy-listener-write-buffer-size int Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
--proxy-request-buffer-size int Request buffer size pro tcp connection (default 4096)
--proxy-response-buffer-size int Response buffer size pro tcp connection (default 4096)
--sasl-aws-identity-lookup Verify AWS authentication identity
--sasl-aws-profile string AWS profile
--sasl-aws-region string Region for AWS IAM Auth
--sasl-aws-role-arn string AWS Role ARN to assume
--sasl-enable Connect using SASL
--sasl-jaas-config-file string Location of JAAS config file with SASL username and password
--sasl-method string SASL method to use (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS_MSK_IAM (default "PLAIN")
Expand All @@ -207,7 +212,9 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--tls-client-key-password string Password to decrypt rsa private key
--tls-enable Whether or not to use TLS when connecting to the broker
--tls-insecure-skip-verify It controls whether a client verifies the server's certificate chain and host name
--tls-refresh duration Interval for refreshing client TLS certificates. If set to zero, the refresh watch is disabled
--tls-same-client-cert-enable Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)
--tls-system-cert-pool Use system pool for root CAs

### Usage example

Expand Down
35 changes: 31 additions & 4 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package server

import (
"fmt"
"log/slog"

"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/proxy"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
sloglogrus "github.com/samber/slog-logrus/v2"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"

Expand Down Expand Up @@ -101,10 +103,12 @@ func initFlags() {
Server.Flags().DurationVar(&c.Proxy.ListenerKeepAlive, "proxy-listener-keep-alive", 60*time.Second, "Keep alive period for an active network connection. If zero, keep-alives are disabled")

Server.Flags().BoolVar(&c.Proxy.TLS.Enable, "proxy-listener-tls-enable", false, "Whether or not to use TLS listener")
Server.Flags().DurationVar(&c.Proxy.TLS.Refresh, "proxy-listener-tls-refresh", 0*time.Second, "Interval for refreshing server TLS certificates. If set to zero, the refresh watch is disabled")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCertFile, "proxy-listener-cert-file", "", "PEM encoded file with server certificate")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyFile, "proxy-listener-key-file", "", "PEM encoded file with private key for the server certificate")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyPassword, "proxy-listener-key-password", os.Getenv("PROXY_LISTENER_KEY_PASSWORD"), "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Proxy.TLS.CAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCRLFile, "proxy-listener-crl-file", "", "PEM encoded X509 CRLs file")
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCipherSuites, "proxy-listener-cipher-suites", []string{}, "List of supported cipher suites")
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCurvePreferences, "proxy-listener-curve-preferences", []string{}, "List of curve preferences")

Expand Down Expand Up @@ -151,11 +155,13 @@ func initFlags() {

// TLS
Server.Flags().BoolVar(&c.Kafka.TLS.Enable, "tls-enable", false, "Whether or not to use TLS when connecting to the broker")
Server.Flags().DurationVar(&c.Kafka.TLS.Refresh, "tls-refresh", 0*time.Second, "Interval for refreshing client TLS certificates. If set to zero, the refresh watch is disabled")
Server.Flags().BoolVar(&c.Kafka.TLS.InsecureSkipVerify, "tls-insecure-skip-verify", false, "It controls whether a client verifies the server's certificate chain and host name")
Server.Flags().StringVar(&c.Kafka.TLS.ClientCertFile, "tls-client-cert-file", "", "PEM encoded file with client certificate")
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyFile, "tls-client-key-file", "", "PEM encoded file with private key for the client certificate")
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", os.Getenv("TLS_CLIENT_KEY_PASSWORD"), "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")
Server.Flags().BoolVar(&c.Kafka.TLS.SystemCertPool, "tls-system-cert-pool", false, "Use system pool for root CAs")

//Same TLS client cert tls-same-client-cert-enable
Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "tls-same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)")
Expand All @@ -181,6 +187,8 @@ func initFlags() {
// SASL AWS_MSK_IAM
Server.Flags().StringVar(&c.Kafka.SASL.AWSConfig.Region, "sasl-aws-region", "", "Region for AWS IAM Auth")
Server.Flags().StringVar(&c.Kafka.SASL.AWSConfig.Profile, "sasl-aws-profile", "", "AWS profile")
Server.Flags().StringVar(&c.Kafka.SASL.AWSConfig.RoleArn, "sasl-aws-role-arn", "", "AWS Role ARN to assume")
Server.Flags().BoolVar(&c.Kafka.SASL.AWSConfig.IdentityLookup, "sasl-aws-identity-lookup", false, "Verify AWS authentication identity")

// SASL by Proxy plugin
Server.Flags().BoolVar(&c.Kafka.SASL.Plugin.Enable, "sasl-plugin-enable", false, "Use plugin for SASL authentication")
Expand All @@ -202,7 +210,7 @@ func initFlags() {

// Logging
Server.Flags().StringVar(&c.Log.Format, "log-format", "text", "Log format text or json")
Server.Flags().StringVar(&c.Log.Level, "log-level", "info", "Log level debug, info, warning, error, fatal or panic")
Server.Flags().StringVar(&c.Log.Level, "log-level", "info", "Log level trace, debug, info, warning, error, fatal or panic")
Server.Flags().StringVar(&c.Log.LevelFieldName, "log-level-fieldname", "@level", "Log level fieldname for json format")
Server.Flags().StringVar(&c.Log.TimeFiledName, "log-time-fieldname", "@timestamp", "Time fieldname for json format")
Server.Flags().StringVar(&c.Log.MsgFiledName, "log-msg-fieldname", "@message", "Message fieldname for json format")
Expand Down Expand Up @@ -444,7 +452,7 @@ func Run(_ *cobra.Command, _ []string) {
func NewHTTPHandler() http.Handler {
m := http.NewServeMux()
m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(
_, _ = w.Write([]byte(
`<html>
<head>
<title>kafka-proxy service</title>
Expand All @@ -456,7 +464,7 @@ func NewHTTPHandler() http.Handler {
</html>`))
})
m.HandleFunc(c.Http.HealthPath, func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`OK`))
_, _ = w.Write([]byte(`OK`))
})
m.Handle(c.Http.MetricsPath, promhttp.Handler())

Expand All @@ -483,6 +491,25 @@ func SetLogger() {
level = logrus.InfoLevel
}
logrus.SetLevel(level)

slog.SetDefault(slog.New(sloglogrus.Option{Level: toSlogLevel(level), Logger: logrus.StandardLogger()}.NewLogrusHandler()))
}

func toSlogLevel(level logrus.Level) slog.Level {
switch level {
case logrus.TraceLevel:
return slog.LevelDebug
case logrus.DebugLevel:
return slog.LevelDebug
case logrus.InfoLevel:
return slog.LevelInfo
case logrus.WarnLevel:
return slog.LevelWarn
case logrus.ErrorLevel, logrus.PanicLevel:
return slog.LevelError
default:
return slog.LevelInfo
}
}

func NewPluginClient(handshakeConfig plugin.HandshakeConfig, plugins map[string]plugin.Plugin, logLevel string, command string, params []string) *plugin.Client {
Expand Down
5 changes: 4 additions & 1 deletion cmd/plugin-auth-ldap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ func main() {

pluginMeta := &pluginMeta{}
flags := pluginMeta.flagSet()
flags.Parse(os.Args[1:])
if err := flags.Parse(os.Args[1:]); err != nil {
logrus.Errorf("error parsing flags: %v", err)
os.Exit(1)
}

urls, err := pluginMeta.getUrls()
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion cmd/plugin-auth-user/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ func (f *PasswordAuthenticator) flagSet() *flag.FlagSet {
func main() {
passwordAuthenticator := &PasswordAuthenticator{}
flags := passwordAuthenticator.flagSet()
flags.Parse(os.Args[1:])
if err := flags.Parse(os.Args[1:]); err != nil {
logrus.Errorf("error parsing flags: %v", err)
os.Exit(1)
}

if passwordAuthenticator.Password == "" {
passwordAuthenticator.Password = os.Getenv(EnvSaslPassword)
Expand Down
Loading