diff --git a/.github/workflows/docker-image.yaml b/.github/workflows/docker-image.yaml
index af60c133..a6623d74 100644
--- a/.github/workflows/docker-image.yaml
+++ b/.github/workflows/docker-image.yaml
@@ -1,62 +1,56 @@
-name: Build Docker image
+name: docker-image-push
+
+# This workflow uses actions that are not certified by GitHub.
+# They are provided by a third-party and are governed by
+# separate terms of service, privacy policy, and support
+# documentation.
 
 on:
   push:
-    tags:
-      - '*'
-    branches:
-      - "master"
-    paths-ignore:
-      - 'charts/**'
+    tags: [ '*' ]
+
+env:
+  # Use docker.io for Docker Hub if empty
+  REGISTRY: ghcr.io
+  # github.repository as <account>/<repo>
+  IMAGE_NAME: adobe/kminion
 
 jobs:
   build:
     runs-on: ubuntu-latest
-
+    permissions:
+      contents: read
+      packages: write
     steps:
-      - uses: actions/checkout@v3
-
-      - name: Set up QEMU
-        uses: docker/setup-qemu-action@v2
-
-      - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v2
+      - name: Checkout repository
+        uses: actions/checkout@v3
+      # Login against a Docker registry except on PR
+      # https://github.com/docker/login-action
+      - name: Log into registry ${{ env.REGISTRY }}
+        if: github.event_name != 'pull_request'
+        uses: docker/login-action@v2
         with:
-          driver-opts: image=moby/buildkit:v0.10.3,network=host
-
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
       - name: Set Release Date
         run: |
           echo "BUILT_AT=$(date --rfc-3339=date)" >> ${GITHUB_ENV}
-
-      - name: Docker meta
-        id: docker_meta
+      # Extract metadata (tags, labels) for Docker
+      # https://github.com/docker/metadata-action
+      - name: Extract Docker metadata
+        id: meta
        uses: docker/metadata-action@v4
         with:
-          # list of Docker images to use as base name for tags
-          images: |
-            vectorized/kminion
-            redpandadata/kminion
-          # generate Docker tags based on the following events/attributes
-          # Semver type is only active on 'push tag' events, hence no enable condition required
-          tags: |
-            type=sha,prefix={{branch}}-,format=short,enable={{is_default_branch}}
-            type=semver,pattern={{raw}}
-
-      - name: Login to DockerHub
-        uses: docker/login-action@v2
-        with:
-          username: ${{ secrets.DOCKERHUB_USERNAME }}
-          password: ${{ secrets.DOCKERHUB_PASSWORD }}
-
-      - name: Build and push
+          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+      - name: Build and push Docker image
         uses: docker/build-push-action@v3
         with:
-          push: true
-          platforms: linux/amd64,linux/arm64
-          tags: ${{ steps.docker_meta.outputs.tags }}
+          context: .
+          push: ${{ github.event_name != 'pull_request' }}
+          tags: ${{ steps.meta.outputs.tags }}
+          labels: ${{ steps.meta.outputs.labels }}
           build-args: |
-            VERSION=${{ fromJSON(steps.docker_meta.outputs.json).labels['org.opencontainers.image.version'] }}
+            VERSION=${{ fromJSON(steps.meta.outputs.json).labels['org.opencontainers.image.version'] }}
             BUILT_AT=${{ env.BUILT_AT }}
             COMMIT=${{ github.sha }}
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
diff --git a/.github/workflows/go-releaser.yaml b/.github/workflows/go-releaser.yaml
deleted file mode 100644
index 795f75a6..00000000
--- a/.github/workflows/go-releaser.yaml
+++ /dev/null
@@ -1,31 +0,0 @@
-name: GoReleaser
-
-on:
-  push:
-    tags:
-      - '*'
-
-jobs:
-  goreleaser:
-    runs-on: ubuntu-latest
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v3
-        with:
-          fetch-depth: 0
-
-      - name: Set up Go
-        uses: actions/setup-go@v3
-        with:
-          go-version-file: 'go.mod'
-
-      - name: Run GoReleaser
-        uses: goreleaser/goreleaser-action@v2
-        if: startsWith(github.ref, 'refs/tags/')
-        with:
-          version: latest
-          args: release --rm-dist
-          workdir: .
-        env:
-          CGO_ENABLED: 0
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
diff --git a/.github/workflows/kics-iac.yml b/.github/workflows/kics-iac.yml
deleted file mode 100644
index 994227cc..00000000
--- a/.github/workflows/kics-iac.yml
+++ /dev/null
@@ -1,29 +0,0 @@
-name: kics scanning
-on:
-  push:
-    branches: master
-jobs:
-  kics:
-    runs-on: ubuntu-latest
-    env:
-      AWS_ACCESS_KEY_ID: ${{ secrets.VULN_REPORTS_AWS_KEY_ID }}
-      AWS_SECRET_ACCESS_KEY: ${{ secrets.VULN_REPORTS_AWS_SECRET_ACCESS_KEY }}
-      VULN_REPORTS_AWS_BUCKET: ${{ secrets.VULN_REPORTS_AWS_BUCKET }}
-      AWS_EC2_METADATA_DISABLED: true
-    steps:
-      - uses: actions/checkout@v2
-      - name: run kics Scan
-        uses: checkmarx/kics-github-action@v1.6.3
-        with:
-          path: .
-          ignore_on_exit: results
-          output_path: res/
-      - name: display kics results
-        run: |
-          cat res/results.json
-      - name: upload scan results
-        run: |
-          set -eu
-          KEY="`date +%Y`/`date +%m`/`date +%d`/${GITHUB_REPOSITORY#*/}_${GITHUB_REF#refs/heads/}_kics_`date +%s`.json"
-          echo "[i] writing to s3 object '$KEY'"
-          aws s3 cp res/results.json s3://$VULN_REPORTS_AWS_BUCKET/$KEY
diff --git a/.github/workflows/snyk-scan.yml b/.github/workflows/snyk-scan.yml
index 3130e8f4..508e5aa0 100644
--- a/.github/workflows/snyk-scan.yml
+++ b/.github/workflows/snyk-scan.yml
@@ -22,5 +22,5 @@ jobs:
       env:
         SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
       run: |
-        snyk monitor --project-name=kminion --remote-repo=redpanda-data/kminion --target-reference=${GITHUB_REF#refs/heads/} .
-      shell: bash
\ No newline at end of file
+        snyk monitor --project-name=kminion --remote-repo=adobe/kminion --target-reference=${GITHUB_REF#refs/heads/} .
+      shell: bash
diff --git a/.goreleaser.yml b/.goreleaser.yml
deleted file mode 100644
index 40bf771a..00000000
--- a/.goreleaser.yml
+++ /dev/null
@@ -1,38 +0,0 @@
-release:
-  name_template: '{{.Version}} / {{time "2006-01-02"}}'
-  prerelease: auto
-  mode: append
-  footer: |
-    ## Docker Image
-    Use the following command to pull this release's Docker image:
-    ```sh
-    docker pull redpandadata/kminion:{{ .Tag }}
-    ```
-changelog:
-  skip: false
-  use: github
-  filters:
-    # Commit messages matching the regexp listed here will be removed from the changelog
-    exclude:
-      - '^docs:'
-      - '^test:'
-      - '^npm:'
-      - '^go.mod:'
-      - '^.github:'
-      - 'Merge branch'
-
-builds:
-  - id: kminion
-    binary: kminion
-    goos:
-      - darwin
-      - linux
-      - windows
-    goarch:
-      - amd64
-      - arm64
-    ldflags:
-      - -s -w -X main.version={{.Version}} -X main.builtAt={{.Date}} -X main.commit={{.Commit}}
-
-checksum:
-  name_template: 'checksums.txt'
diff --git a/Dockerfile b/Dockerfile
index 5679e494..a93d20eb 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -27,12 +27,9 @@ RUN CGO_ENABLED=0 go build \
 ############################################################
 # Runtime Image
 ############################################################
-FROM alpine:3.17
+FROM alpine:3
 COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
 COPY --from=builder /app/bin/kminion /app/kminion
-RUN addgroup -S redpanda \
-    && adduser -S redpanda -G redpanda \
-    && chmod o+rx /app/kminion
-USER redpanda
+RUN chmod -R +x /app/kminion
 
 ENTRYPOINT ["/app/kminion"]
diff --git a/charts/kminion/templates/_helpers.tpl b/charts/kminion/templates/_helpers.tpl
index 702059d5..0690934a 100644
--- a/charts/kminion/templates/_helpers.tpl
+++ b/charts/kminion/templates/_helpers.tpl
@@ -39,7 +39,6 @@ helm.sh/chart: {{ include "kminion.chart" . }}
 {{- if .Chart.AppVersion }}
 app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
 {{- end }}
-app.kubernetes.io/managed-by: {{ .Release.Service }}
 {{- if .Values.customLabels}}
 {{ toYaml .Values.customLabels }}
 {{- end}}
diff --git a/charts/kminion/templates/configmap.yaml b/charts/kminion/templates/configmap.yaml
index dbece53b..6341a2a4 100644
--- a/charts/kminion/templates/configmap.yaml
+++ b/charts/kminion/templates/configmap.yaml
@@ -3,6 +3,9 @@ kind: ConfigMap
 metadata:
   name: {{include "kminion.fullname" .}}
   namespace: {{ .Release.Namespace | quote }}
+  annotations:
+    strategy.spinnaker.io/versioned: "false"
+    strategy.spinnaker.io/replace: "true"
   labels:
     {{- include "kminion.labels" . | nindent 4}}
 data:
diff --git a/charts/kminion/values.yaml b/charts/kminion/values.yaml
index 21240e00..b15cd1d9 100644
--- a/charts/kminion/values.yaml
+++ b/charts/kminion/values.yaml
@@ -5,7 +5,7 @@
 replicaCount: 1
 
 image:
-  repository: redpandadata/kminion
+  repository: ghcr.io/adobe/kminion
   pullPolicy: IfNotPresent
   # Overrides the image tag whose default is the chart appVersion.
tag: "" @@ -28,14 +28,14 @@ podAnnotations: {} # prometheus.io/port: "8080" # prometheus.io/path: "/metrics" -podSecurityContext: - runAsUser: 99 - fsGroup: 99 +podSecurityContext: {} + # runAsUser: 99 + # fsGroup: 99 ## See `kubectl explain poddisruptionbudget.spec` for more ## ref: https://kubernetes.io/docs/tasks/run-application/configure-pdb/ -podDisruptionBudget: - maxUnavailable: 1 +podDisruptionBudget: {} + # maxUnavailable: 1 # minAvailable: 1 securityContext: {} diff --git a/docker-compose.yml b/docker-compose.yml index aae9b322..6c149b73 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,5 @@ --- -version: '2.1' +version: '3.9' services: @@ -12,6 +12,8 @@ services: ZOOKEEPER_TICK_TIME: 2000 container_name: zookeeper hostname: zookeeper + networks: + - kminion kafka: image: confluentinc/cp-kafka:latest @@ -30,6 +32,10 @@ services: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + links: + - zookeeper + networks: + - kminion kafka-minion: build: @@ -44,4 +50,38 @@ services: - 8080:8080 environment: KAFKA_BROKERS: kafka:29092 - restart: unless-stopped \ No newline at end of file + restart: unless-stopped + links: + - kafka + networks: + - kminion + + grafana: + image: grafana/grafana-oss + ports: + - '3000:3000' + volumes: + - "/tmp/grafana:/var/lib/grafana" + container_name: grafana + hostname: grafana + networks: + - kminion + + prometheus: + image: prom/prometheus + ports: + - '9090:9090' + configs: + - source: prometheus + target: /etc/prometheus/prometheus.yml + container_name: prometheus + hostname: prometheus + networks: + - kminion +configs: + prometheus: + file: example/sample_prometheus.yml + + +networks: + kminion: diff --git a/e2e/config.go b/e2e/config.go index 9c544193..e644e17b 100644 --- a/e2e/config.go +++ b/e2e/config.go @@ -6,16 +6,18 @@ import ( ) type Config struct { - Enabled bool `koanf:"enabled"` - TopicManagement EndToEndTopicConfig `koanf:"topicManagement"` - ProbeInterval time.Duration `koanf:"probeInterval"` - Producer EndToEndProducerConfig `koanf:"producer"` - Consumer EndToEndConsumerConfig `koanf:"consumer"` + Enabled bool `koanf:"enabled"` + TopicManagement EndToEndTopicConfig `koanf:"topicManagement"` + ProbeInterval time.Duration `koanf:"probeInterval"` + ReconnectInterval time.Duration `koanf:"reconnectInterval"` + Producer EndToEndProducerConfig `koanf:"producer"` + Consumer EndToEndConsumerConfig `koanf:"consumer"` } func (c *Config) SetDefaults() { c.Enabled = false c.ProbeInterval = 100 * time.Millisecond + c.ReconnectInterval = 0 * time.Second c.TopicManagement.SetDefaults() c.Producer.SetDefaults() c.Consumer.SetDefaults() diff --git a/e2e/consumer.go b/e2e/consumer.go index 865ca8ad..75222f66 100644 --- a/e2e/consumer.go +++ b/e2e/consumer.go @@ -13,23 +13,32 @@ import ( func (s *Service) startConsumeMessages(ctx context.Context, initializedCh chan<- bool) { client := s.client + logger := s.logger.Named("consumer") - s.logger.Info("Starting to consume end-to-end topic", + logger.Info("Starting to consume end-to-end topic", zap.String("topic_name", s.config.TopicManagement.Name), zap.String("group_id", s.groupId)) isInitialized := false for { + if ctx.Err() != nil { + break + } fetches := client.PollFetches(ctx) if !isInitialized { isInitialized = true initializedCh <- true } + if fetches == nil { + break + } + + logger.Debug("fetching messages", zap.Any("fetches", fetches)) // Log all errors and continue afterwards as we might get errors and 
still have some fetch results errors := fetches.Errors() for _, err := range errors { - s.logger.Error("kafka fetch error", + logger.Error("kafka fetch error", zap.String("topic", err.Topic), zap.Int32("partition", err.Partition), zap.Error(err.Err)) @@ -37,6 +46,9 @@ func (s *Service) startConsumeMessages(ctx context.Context, initializedCh chan<- fetches.EachRecord(s.processMessage) } + + client.LeaveGroup() + logger.Info("Consumer thread exited") } func (s *Service) commitOffsets(ctx context.Context) { @@ -75,6 +87,8 @@ func (s *Service) commitOffsets(ctx context.Context) { // - checks if it is from us, or from another kminion process running somewhere else // - hands it off to the service, which then reports metrics on it func (s *Service) processMessage(record *kgo.Record) { + logger := s.logger.Named("consumer") + if record.Value == nil { // Init messages have nil values - we want to skip these. They are only used to make sure a consumer is ready. return @@ -82,7 +96,7 @@ func (s *Service) processMessage(record *kgo.Record) { var msg EndToEndMessage if jerr := json.Unmarshal(record.Value, &msg); jerr != nil { - s.logger.Error("failed to unmarshal message value", zap.Error(jerr)) + logger.Error("failed to unmarshal message value", zap.Error(jerr)) return // maybe older version } diff --git a/e2e/producer.go b/e2e/producer.go index 6e13f330..9a097754 100644 --- a/e2e/producer.go +++ b/e2e/producer.go @@ -22,6 +22,8 @@ func (s *Service) produceMessagesToAllPartitions(ctx context.Context) { // it will add it to the message tracker. If producing fails a message will be logged and the respective metrics // will be incremented. func (s *Service) produceMessage(ctx context.Context, partition int) { + logger := s.logger.Named("producer") + topicName := s.config.TopicManagement.Name record, msg := createEndToEndRecord(s.minionID, topicName, partition) @@ -34,6 +36,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) { pID := strconv.Itoa(partition) s.messagesProducedInFlight.WithLabelValues(pID).Inc() s.messageTracker.addToTracker(msg) + logger.Debug("producing message", zap.Any("record", record)) s.client.Produce(childCtx, record, func(r *kgo.Record, err error) { defer cancel() ackDuration := time.Since(startTime) @@ -61,7 +64,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) { msg.state = EndToEndMessageStateProducedSuccessfully msg.produceLatency = ackDuration.Seconds() - // TODO: Enable again as soon as https://github.com/ReneKroon/ttlcache/issues/60 is fixed + // TODO: Enable again as soon as https://github.com/jellydatora/ttlcache/issues/60 is fixed // Because we cannot update cache items in an atomic fashion we currently can't use this method // as this would cause a race condition which ends up in records being reported as lost/expired. 
             // s.messageTracker.updateItemIfExists(msg)
diff --git a/e2e/service.go b/e2e/service.go
index 6f530705..79082f61 100644
--- a/e2e/service.go
+++ b/e2e/service.go
@@ -18,8 +18,10 @@ type Service struct {
     config Config
     logger *zap.Logger
 
-    kafkaSvc *kafka.Service // creates kafka client for us
-    client   *kgo.Client
+    kafkaSvc    *kafka.Service // creates kafka client for us
+    client      *kgo.Client
+    adminClient *kgo.Client
+    kgoOpts     []kgo.Opt
 
     // Service
     minionID string // unique identifier, reported in metrics, in case multiple instances run at the same time
@@ -57,6 +59,7 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
         kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
     }
     kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(3*time.Second))
+    kgoOpts = append(kgoOpts, kgo.ClientID("kminion"))
 
     // Consumer configs
     kgoOpts = append(kgoOpts,
@@ -64,7 +67,9 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
         kgo.ConsumeTopics(cfg.TopicManagement.Name),
         kgo.Balancers(kgo.CooperativeStickyBalancer()),
         kgo.DisableAutoCommit(),
-        kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
+        kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
+        kgo.InstanceID(groupID),
+    )
 
     // Prepare hooks
     hooks := newEndToEndClientHooks(logger)
@@ -73,29 +78,17 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
     // We use the manual partitioner so that the records' partition id will be used as target partition
     kgoOpts = append(kgoOpts, kgo.RecordPartitioner(kgo.ManualPartitioner()))
 
-    // Create kafka service and check if client can successfully connect to Kafka cluster
-    logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata",
-        zap.String("seed_brokers", strings.Join(kafkaSvc.Brokers(), ",")))
-    client, err := kafkaSvc.CreateAndTestClient(ctx, logger, kgoOpts)
-    if err != nil {
-        return nil, fmt.Errorf("failed to create kafka client for e2e: %w", err)
-    }
-    logger.Info("successfully connected to kafka cluster")
-
     svc := &Service{
         config:      cfg,
         logger:      logger.Named("e2e"),
         kafkaSvc:    kafkaSvc,
-        client:      client,
+        kgoOpts:     kgoOpts,
         minionID:    minionID,
         groupId:     groupID,
         clientHooks: hooks,
     }
 
-    svc.groupTracker = newGroupTracker(cfg, logger, client, groupID)
-    svc.messageTracker = newMessageTracker(svc)
-
     makeCounterVec := func(name string, labelNames []string, help string) *prometheus.CounterVec {
         cv := prometheus.NewCounterVec(prometheus.CounterOpts{
             Subsystem: "end_to_end",
@@ -145,23 +138,85 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
     return svc, nil
 }
 
-// Start starts the service (wow)
-func (s *Service) Start(ctx context.Context) error {
+func (s *Service) initKafka(ctx context.Context) error {
+    // Create kafka service and check if client can successfully connect to Kafka cluster
+    s.logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata",
+        zap.String("seed_brokers", strings.Join(s.kafkaSvc.Brokers(), ",")))
+    client, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, s.kgoOpts)
+    if err != nil {
+        return fmt.Errorf("failed to create kafka client for e2e: %w", err)
+    }
+    s.logger.Info("successfully connected to kafka cluster")
+
+    s.client = client
+    s.groupTracker = newGroupTracker(s.config, s.logger, client, s.groupId)
+    s.messageTracker = newMessageTracker(s)
+
+    return nil
+}
+
+func (s *Service) initReconcile(ctx context.Context) error {
+    s.logger.Info("Starting reconcile")
+    // Create the KafkaAdmin client used for topic partition and leader reconciliation
+    adminClient, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, []kgo.Opt{})
+    if err != nil {
+        return fmt.Errorf("failed to create kafka client for e2e: %w", err)
+    }
+
+    s.adminClient = adminClient
+
     // Ensure topic exists and is configured correctly
     if err := s.validateManagementTopic(ctx); err != nil {
         return fmt.Errorf("could not validate end-to-end topic: %w", err)
     }
 
-    // Get up-to-date metadata and inform our custom partitioner about the partition count
-    topicMetadata, err := s.getTopicMetadata(ctx)
-    if err != nil {
-        return fmt.Errorf("could not get topic metadata after validation: %w", err)
+    // start topic creation/partition/leader reconciliation loop
+    go s.startReconciliation(ctx)
+
+    return nil
+}
+
+// Start starts the service (wow)
+func (s *Service) Start(ctx context.Context) error {
+    if err := s.initReconcile(ctx); err != nil {
+        return err
     }
-    partitions := len(topicMetadata.Topics[0].Partitions)
-    s.partitionCount = partitions
+    if s.config.ReconnectInterval > 0*time.Second {
+        go s.reconnectLoop(ctx)
+    } else {
+        if err := s.run(ctx); err != nil {
+            return err
+        }
 
-    // finally start everything else (producing, consuming, continuous validation, consumer group tracking)
-    go s.startReconciliation(ctx)
+    }
+    return nil
+}
+
+// Stop stops the service
+func (s *Service) Stop() {
+    s.logger.Info("Stopping e2e service")
+    s.client.Close()
+}
+
+func (s *Service) reconnectLoop(pctx context.Context) {
+    for {
+        ctx, _ := context.WithTimeout(pctx, s.config.ReconnectInterval)
+        s.run(ctx)
+        select {
+        case <-ctx.Done():
+            s.Stop()
+            fmt.Println("Restarting e2e service")
+        case <-pctx.Done():
+            s.Stop()
+            return
+        }
+    }
+}
+
+func (s *Service) run(ctx context.Context) error {
+    if err := s.initKafka(ctx); err != nil {
+        return err
+    }
 
     // Start consumer and wait until we've received a response for the first poll which would indicate that the
     // consumer is ready. Only if the consumer is ready we want to start the producer to ensure that we will not
diff --git a/e2e/topic.go b/e2e/topic.go
index d54291a4..0e06f31e 100644
--- a/e2e/topic.go
+++ b/e2e/topic.go
@@ -13,14 +13,18 @@ import (
 // Check our end-to-end test topic and adapt accordingly if something does not match our expectations.
 // - does it exist?
+// - does configuration allow topic management?
+//
 // - is it configured correctly?
-// - does it have enough partitions?
-// - is the replicationFactor correct?
+//   - does it have enough partitions?
+//   - is the replicationFactor correct?
+//
 // - are assignments good?
-// - is each broker leading at least one partition?
-// - are replicas distributed correctly?
+//   - is each broker leading at least one partition?
+//   - are replicas distributed correctly?
 func (s *Service) validateManagementTopic(ctx context.Context) error {
-    s.logger.Debug("validating end-to-end topic...")
+    logger := s.logger.Named("ManagementTopic")
+    logger.Info("validating end-to-end topic...")
 
     meta, err := s.getTopicMetadata(ctx)
     if err != nil {
@@ -35,6 +39,10 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
     case kerr.UnknownTopicOrPartition:
         // UnknownTopicOrPartition (Error code 3) means that the topic does not exist.
         // When the topic doesn't exist, continue to create it further down in the code.
+        if !s.config.TopicManagement.Enabled {
+            return fmt.Errorf("the configured end to end topic does not exist. The topic will not be created " +
+                "because topic management is disabled")
+        }
         topicExists = false
     default:
         // If the topic (possibly) exists, but there's an error, then this should result in a fail
@@ -43,17 +51,16 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
 
     // Create topic if it doesn't exist
     if !topicExists {
-        if !s.config.TopicManagement.Enabled {
-            return fmt.Errorf("the configured end to end topic does not exist. The topic will not be created " +
-                "because topic management is disabled")
-        }
-
         if err = s.createManagementTopic(ctx, meta); err != nil {
             return err
         }
+        meta, err = s.getTopicMetadata(ctx)
+        if err != nil {
+            return fmt.Errorf("could not get topic metadata after validation: %w", err)
+        }
     }
 
-    alterReq, createReq, err := s.calculatePartitionReassignments(meta)
+    alterReq, createReq, pleReq, err := s.calculatePartitionReassignments(meta)
     if err != nil {
         return fmt.Errorf("failed to calculate partition reassignments: %w", err)
     }
@@ -68,6 +75,35 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
         return fmt.Errorf("failed to create partitions: %w", err)
     }
 
+    err = s.executeLeaderElection(ctx, pleReq)
+    if err != nil {
+        return fmt.Errorf("failed to elect partitions: %w", err)
+    }
+
+    logger.Info("end-to-end topic is valid.")
+
+    return nil
+}
+
+func (s *Service) executeLeaderElection(ctx context.Context, req *kmsg.ElectLeadersRequest) error {
+    if req == nil {
+        return nil
+    }
+
+    res, err := req.RequestWith(ctx, s.adminClient)
+    if err != nil {
+        return err
+    }
+
+    for _, topic := range res.Topics {
+        for _, partition := range topic.Partitions {
+            typedErr := kerr.TypedErrorForCode(partition.ErrorCode)
+            if typedErr != nil {
+                return fmt.Errorf("inner Kafka error: %w", typedErr)
+            }
+        }
+    }
+
     return nil
 }
 
@@ -76,7 +112,7 @@ func (s *Service) executeCreatePartitions(ctx context.Context, req *kmsg.CreateP
         return nil
     }
 
-    res, err := req.RequestWith(ctx, s.client)
+    res, err := req.RequestWith(ctx, s.adminClient)
     if err != nil {
         return err
     }
@@ -84,7 +120,7 @@ func (s *Service) executeCreatePartitions(ctx context.Context, req *kmsg.CreateP
     for _, topic := range res.Topics {
         typedErr := kerr.TypedErrorForCode(topic.ErrorCode)
         if typedErr != nil {
-            return fmt.Errorf("inner Kafka error: %w", err)
+            return fmt.Errorf("inner Kafka error: %w", typedErr)
         }
     }
 
@@ -96,20 +132,20 @@ func (s *Service) executeAlterPartitionAssignments(ctx context.Context, req *kms
         return nil
     }
 
-    res, err := req.RequestWith(ctx, s.client)
+    res, err := req.RequestWith(ctx, s.adminClient)
     if err != nil {
         return err
     }
 
     typedErr := kerr.TypedErrorForCode(res.ErrorCode)
     if typedErr != nil {
-        return fmt.Errorf("inner Kafka error: %w", err)
+        return fmt.Errorf("inner Kafka error: %w", typedErr)
     }
     for _, topic := range res.Topics {
         for _, partition := range topic.Partitions {
             typedErr = kerr.TypedErrorForCode(partition.ErrorCode)
             if typedErr != nil {
-                return fmt.Errorf("inner Kafka partition error on partition '%v': %w", partition.Partition, err)
+                return fmt.Errorf("inner Kafka partition error on partition '%v': %w", partition.Partition, typedErr)
             }
         }
     }
@@ -117,19 +153,28 @@ func (s *Service) executeAlterPartitionAssignments(ctx context.Context, req *kms
     return nil
 }
 
-func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (*kmsg.AlterPartitionAssignmentsRequest, *kmsg.CreatePartitionsRequest, error) {
+func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (*kmsg.AlterPartitionAssignmentsRequest, *kmsg.CreatePartitionsRequest, *kmsg.ElectLeadersRequest, error) {
     brokerByID := brokerMetadataByBrokerID(meta.Brokers)
     topicMeta := meta.Topics[0]
     desiredReplicationFactor := s.config.TopicManagement.ReplicationFactor
+    desiredPartitionsPerBroker := s.config.TopicManagement.PartitionsPerBroker
     if desiredReplicationFactor > len(brokerByID) {
-        return nil, nil, fmt.Errorf("the desired replication factor of '%v' is larger than the available brokers "+
+        return nil, nil, nil, fmt.Errorf("the desired replication factor of '%v' is larger than the available brokers "+
             "('%v' brokers)", desiredReplicationFactor, len(brokerByID))
     }
 
+    partitionLeaderElections := []int32{}
+    for _, partition := range topicMeta.Partitions {
+        if partition.Replicas[0] != partition.Leader {
+            partitionLeaderElections = append(partitionLeaderElections, partition.Partition)
+        }
+    }
+
     // We want to ensure that each brokerID leads at least one partition permanently. Hence let's iterate over brokers.
     preferredLeaderPartitionsBrokerID := make(map[int32][]kmsg.MetadataResponseTopicPartition)
     for _, broker := range brokerByID {
+        preferredLeaderPartitionsBrokerID[broker.NodeID] = []kmsg.MetadataResponseTopicPartition{}
         for _, partition := range topicMeta.Partitions {
             // PreferredLeader = BrokerID of the brokerID that is the desired leader. Regardless who the current leader is
             preferredLeader := partition.Replicas[0]
@@ -139,11 +184,11 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (
         }
     }
 
-    // Partitions that use the same brokerID more than once as preferred leader can be reassigned to other brokers
+    // Partitions that use the same brokerID more than desiredPartitionsPerBroker as preferred leader can be reassigned to other brokers
     // We collect them to avoid creating new partitions when not needed.
     reassignablePartitions := make([]kmsg.MetadataResponseTopicPartition, 0)
     for _, partitions := range preferredLeaderPartitionsBrokerID {
-        if len(partitions) > 1 {
+        if len(partitions) > desiredPartitionsPerBroker {
             reassignablePartitions = append(reassignablePartitions, partitions[1:]...)
             continue
         }
@@ -155,7 +200,6 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (
     partitionCount := len(topicMeta.Partitions)
     partitionReassignments := make([]kmsg.AlterPartitionAssignmentsRequestTopicPartition, 0)
     createPartitionAssignments := make([]kmsg.CreatePartitionsRequestTopicAssignment, 0)
-
     for brokerID, partitions := range preferredLeaderPartitionsBrokerID {
         // Add replicas if number of replicas is smaller than desiredReplicationFactor
         for _, partition := range partitions {
@@ -168,7 +212,7 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (
         }
 
         // TODO: Consider more than one partition per broker config
-        if len(partitions) != 0 {
+        if len(partitions) >= desiredPartitionsPerBroker {
             continue
         }
 
@@ -183,11 +227,13 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (
             reassignablePartitions = reassignablePartitions[1:]
         }
 
-        // Create a new partition for this broker
-        partitionCount++
-        assignmentReq := kmsg.NewCreatePartitionsRequestTopicAssignment()
-        assignmentReq.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID])
-        createPartitionAssignments = append(createPartitionAssignments, assignmentReq)
+        // Create new partitions for this broker
+        for i := 0; i < desiredPartitionsPerBroker-len(partitions); i++ {
+            partitionCount++
+            assignmentReq := kmsg.NewCreatePartitionsRequestTopicAssignment()
+            assignmentReq.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID])
+            createPartitionAssignments = append(createPartitionAssignments, assignmentReq)
+        }
     }
 
     var reassignmentReq *kmsg.AlterPartitionAssignmentsRequest
@@ -195,7 +241,7 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (
         s.logger.Info("e2e probe topic has to be modified due to missing replicas or wrong preferred leader assignments",
             zap.Int("partition_count", len(topicMeta.Partitions)),
             zap.Int("broker_count", len(meta.Brokers)),
-            zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker),
+            zap.Int("config_partitions_per_broker", desiredPartitionsPerBroker),
             zap.Int("config_replication_factor", s.config.TopicManagement.ReplicationFactor),
             zap.Int("partitions_to_reassign", len(partitionReassignments)),
         )
@@ -215,6 +261,7 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (
             zap.Int("broker_count", len(meta.Brokers)),
             zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker),
             zap.Int("partitions_to_add", len(createPartitionAssignments)),
+            zap.Any("partitions", createPartitionAssignments),
         )
         r := kmsg.NewCreatePartitionsRequest()
         createPartitionsTopicReq := kmsg.NewCreatePartitionsRequestTopic()
@@ -225,7 +272,24 @@ func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (
         createReq = &r
     }
 
-    return reassignmentReq, createReq, nil
+    var pleReq *kmsg.ElectLeadersRequest
+    if len(partitionLeaderElections) > 0 {
+        s.logger.Info("e2e probe topic leaders are not preferred",
+            zap.Int("partitions_to_elect", len(partitionLeaderElections)),
+        )
+        r := kmsg.NewElectLeadersRequest()
+        electLeadersRequestTopic := kmsg.NewElectLeadersRequestTopic()
+        electLeadersRequestTopic.Topic = s.config.TopicManagement.Name
+        electLeadersRequestTopic.Partitions = partitionLeaderElections
+        r.Topics = []kmsg.ElectLeadersRequestTopic{electLeadersRequestTopic}
+        r.ElectionType = 0 // preferred
+        pleReq = &r
+    }
+
+    // update partition count for e2e test
+    s.partitionCount = partitionCount
+
+    return reassignmentReq, createReq, pleReq, nil
 }
 
 // calculateAppropriateReplicas returns the best possible brokerIDs that shall be used as replicas.
@@ -299,7 +363,7 @@ func (s *Service) createManagementTopic(ctx context.Context, allMeta *kmsg.Metad
     req := kmsg.NewCreateTopicsRequest()
     req.Topics = []kmsg.CreateTopicsRequestTopic{topic}
 
-    res, err := req.RequestWith(ctx, s.client)
+    res, err := req.RequestWith(ctx, s.adminClient)
     if err != nil {
         return fmt.Errorf("failed to create e2e topic: %w", err)
     }
@@ -320,7 +384,7 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse,
     req := kmsg.NewMetadataRequest()
     req.Topics = []kmsg.MetadataRequestTopic{topicReq}
 
-    return req.RequestWith(ctx, s.client)
+    return req.RequestWith(ctx, s.adminClient)
 }
 
 func (s *Service) getTopicsConfigs(ctx context.Context, configNames []string) (*kmsg.DescribeConfigsResponse, error) {
@@ -335,7 +399,7 @@ func (s *Service) getTopicsConfigs(ctx context.Context, configNames []string) (*
         },
     }
 
-    return req.RequestWith(ctx, s.client)
+    return req.RequestWith(ctx, s.adminClient)
 }
 
 func createTopicConfig(cfgTopic EndToEndTopicConfig) []kmsg.CreateTopicsRequestTopicConfig {
diff --git a/go.mod b/go.mod
index cd0d5298..b767e07d 100644
--- a/go.mod
+++ b/go.mod
@@ -16,6 +16,7 @@ require (
     github.com/twmb/franz-go/pkg/kmsg v1.5.0
     github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0
     go.uber.org/atomic v1.11.0
+    go.uber.org/automaxprocs v1.5.2
     go.uber.org/zap v1.24.0
     golang.org/x/sync v0.2.0
 )
diff --git a/go.sum b/go.sum
index a1e19709..321f579d 100644
--- a/go.sum
+++ b/go.sum
@@ -240,6 +240,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
 github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
+github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
@@ -306,6 +307,8 @@ go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
+go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
+go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
 go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
diff --git a/main.go b/main.go
index 2020f439..520a532b 100644
--- a/main.go
+++ b/main.go
@@ -17,6 +17,7 @@ import (
     "github.com/cloudhut/kminion/v2/prometheus"
     promclient "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promhttp"
+    "go.uber.org/automaxprocs/maxprocs"
     "go.uber.org/zap"
 )
 
@@ -43,14 +44,16 @@ func main() {
     if err != nil {
         startupLogger.Fatal("failed to parse config", zap.Error(err))
     }
-
     logger := logging.NewLogger(cfg.Logger, cfg.Exporter.Namespace).Named("main")
-    if err != nil {
-        startupLogger.Fatal("failed to create new logger", zap.Error(err))
-    }
-
     logger.Info("started kminion", zap.String("version", version), zap.String("built_at", builtAt))
 
+    // set GOMAXPROCS automatically
+    l := func(format string, a ...interface{}) {
+        logger.Info(fmt.Sprintf(format, a...))
+    }
+    if _, err = maxprocs.Set(maxprocs.Logger(l)); err != nil {
+        logger.Fatal("failed to set GOMAXPROCS automatically", zap.Error(err))
+    }
 
     // Setup context that stops when the application receives an interrupt signal
     ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
     defer stop()
diff --git a/minion/config_consumer_group.go b/minion/config_consumer_group.go
index 01880c6e..478d5af1 100644
--- a/minion/config_consumer_group.go
+++ b/minion/config_consumer_group.go
@@ -31,6 +31,11 @@ type ConsumerGroupConfig struct {
     // IgnoredGroups are regex strings of group ids that shall be ignored/skipped when exporting metrics. Ignored groups
     // take precedence over allowed groups.
     IgnoredGroupIDs []string `koanf:"ignoredGroups"`
+
+    // Monitor consumer group states. An empty list means all consumer groups are monitored regardless of their state.
+    // Allowed values are: Dead, Empty, Stable, PreparingRebalance, CompletingRebalance
+    // Source: https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+    AllowedConsumerGroupStates []string `koanf:"allowedConsumerGroupStates"`
 }
 
 func (c *ConsumerGroupConfig) SetDefaults() {
@@ -76,3 +81,13 @@ func (c *ConsumerGroupConfig) Validate() error {
 
     return nil
 }
+
+// Returns a map of allowed group states for faster lookup
+func (c *ConsumerGroupConfig) GetAllowedConsumerGroupStates() map[string]string {
+    // create a map for faster lookup
+    groupStatesMap := make(map[string]string, len(c.AllowedConsumerGroupStates))
+    for _, state := range c.AllowedConsumerGroupStates {
+        groupStatesMap[state] = state
+    }
+    return groupStatesMap
+}
diff --git a/minion/consumer_group_offsets.go b/minion/consumer_group_offsets.go
index a7c23503..5ce99325 100644
--- a/minion/consumer_group_offsets.go
+++ b/minion/consumer_group_offsets.go
@@ -13,7 +13,7 @@ import (
 // ListAllConsumerGroupOffsetsInternal returns a map from the in memory storage. The map value is the offset commit
 // value and is grouped by group id, topic, partition id as keys of the nested maps.
 func (s *Service) ListAllConsumerGroupOffsetsInternal() map[string]map[string]map[int32]OffsetCommit {
-    return s.storage.getGroupOffsets()
+    return s.storage.getGroupOffsets(s.IsGroupAllowed)
 }
 
 // ListAllConsumerGroupOffsetsAdminAPI return all consumer group offsets using Kafka's Admin API.
@@ -22,8 +22,9 @@ func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[
     if err != nil {
         return nil, fmt.Errorf("failed to list groupsRes: %w", err)
     }
-    groupIDs := make([]string, len(groupsRes.Groups))
-    for i, group := range groupsRes.Groups {
+    groupIDs := make([]string, len(groupsRes.AllowedGroups.Groups))
+
+    for i, group := range groupsRes.AllowedGroups.Groups {
         groupIDs[i] = group.Group
     }
 
diff --git a/minion/describe_consumer_groups.go b/minion/describe_consumer_groups.go
index 1e48ae98..ee6e2b91 100644
--- a/minion/describe_consumer_groups.go
+++ b/minion/describe_consumer_groups.go
@@ -16,27 +16,43 @@ type DescribeConsumerGroupsResponse struct {
     Groups *kmsg.DescribeGroupsResponse
 }
 
-func (s *Service) listConsumerGroupsCached(ctx context.Context) (*kmsg.ListGroupsResponse, error) {
-    reqId := ctx.Value("requestId").(string)
-    key := "list-consumer-groups-" + reqId
+type GroupsInfo struct {
+    AllowedGroups  *kmsg.ListGroupsResponse
+    AllGroupsCount int
+}
+
+func (s *Service) listConsumerGroupsCached(ctx context.Context) (*GroupsInfo, error) {
+    keyAllowedGroups := "list-consumer-groups"
 
-    if cachedRes, exists := s.getCachedItem(key); exists {
-        return cachedRes.(*kmsg.ListGroupsResponse), nil
+    if cachedRes, exists := s.getCachedItem(keyAllowedGroups); exists {
+        return cachedRes.(*GroupsInfo), nil
     }
 
-    res, err, _ := s.requestGroup.Do(key, func() (interface{}, error) {
+    groups, err, _ := s.requestGroup.Do(keyAllowedGroups, func() (interface{}, error) {
         res, err := s.listConsumerGroups(ctx)
         if err != nil {
             return nil, err
         }
-        s.setCachedItem(key, res, 120*time.Second)
+        allowedGroups := make([]kmsg.ListGroupsResponseGroup, 0)
 
-        return res, nil
+        for i := range res.Groups {
+            if s.IsGroupAllowed(res.Groups[i].Group, res.Groups[i].GroupState) {
+                allowedGroups = append(allowedGroups, res.Groups[i])
+            }
+        }
+        res.Groups = allowedGroups
+        groups := &GroupsInfo{
+            AllGroupsCount: len(res.Groups),
+            AllowedGroups:  res,
+        }
+        s.setCachedItem(keyAllowedGroups, groups, 120*time.Second)
+
+        return groups, nil
     })
     if err != nil {
         return nil, err
     }
 
-    return res.(*kmsg.ListGroupsResponse), nil
+    return groups.(*GroupsInfo), nil
 }
 
 func (s *Service) listConsumerGroups(ctx context.Context) (*kmsg.ListGroupsResponse, error) {
@@ -53,14 +69,14 @@ func (s *Service) listConsumerGroups(ctx context.Context) (*kmsg.ListGroupsRespo
     return res, nil
 }
 
-func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsumerGroupsResponse, error) {
+func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsumerGroupsResponse, int, error) {
     listRes, err := s.listConsumerGroupsCached(ctx)
     if err != nil {
-        return nil, err
+        return nil, -1, err
     }
 
-    groupIDs := make([]string, len(listRes.Groups))
-    for i, group := range listRes.Groups {
+    groupIDs := make([]string, len(listRes.AllowedGroups.Groups))
+    for i, group := range listRes.AllowedGroups.Groups {
         groupIDs[i] = group.Group
     }
 
@@ -84,6 +100,5 @@ func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsume
             Groups:         res,
         })
     }
-
-    return describedGroups, nil
+    return describedGroups, listRes.AllGroupsCount, nil
 }
diff --git a/minion/storage.go b/minion/storage.go
index c7b5ecc1..10e5c558 100644
--- a/minion/storage.go
+++ b/minion/storage.go
@@ -108,7 +108,7 @@ func (s *Storage) getNumberOfConsumedRecords() float64 {
     return s.consumedRecords.Load()
 }
 
-func (s *Storage) getGroupOffsets() map[string]map[string]map[int32]OffsetCommit {
+func (s *Storage) getGroupOffsets(isAllowed func(groupName string, groupState string) bool) map[string]map[string]map[int32]OffsetCommit {
     // Offsets by group, topic, partition
     offsetsByGroup := make(map[string]map[string]map[int32]OffsetCommit)
 
@@ -121,6 +121,10 @@ func (s *Storage) getGroupOffsets() map[string]map[string]map[int32]OffsetCommit
     for _, offset := range offsets {
         val := offset.(OffsetCommit)
 
+        if !isAllowed(val.Key.Group, "") {
+            continue
+        }
+
         // Initialize inner maps as necessary
         if _, exists := offsetsByGroup[val.Key.Group]; !exists {
             offsetsByGroup[val.Key.Group] = make(map[string]map[int32]OffsetCommit)
diff --git a/minion/utils.go b/minion/utils.go
index 3049b8ff..9b260017 100644
--- a/minion/utils.go
+++ b/minion/utils.go
@@ -6,7 +6,7 @@ import (
     "strings"
 )
 
-func (s *Service) IsGroupAllowed(groupName string) bool {
+func (s *Service) IsGroupAllowed(groupName string, groupState string) bool {
     isAllowed := false
     for _, regex := range s.AllowedGroupIDsExpr {
         if regex.MatchString(groupName) {
@@ -21,6 +21,15 @@ func (s *Service) IsGroupAllowed(groupName string) bool {
             break
         }
     }
+
+    if isAllowed && groupState != "" {
+        groupStatesMap := s.Cfg.ConsumerGroups.GetAllowedConsumerGroupStates()
+        if len(groupStatesMap) > 0 {
+            if _, ok := groupStatesMap[groupState]; !ok {
+                isAllowed = false
+            }
+        }
+    }
     return isAllowed
 }
 
diff --git a/prometheus/collect_broker_info.go b/prometheus/collect_broker_info.go
index 6aa0c701..1a9248d9 100644
--- a/prometheus/collect_broker_info.go
+++ b/prometheus/collect_broker_info.go
@@ -2,9 +2,10 @@ package prometheus
 
 import (
     "context"
+    "strconv"
+
     "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
-    "strconv"
 )
 
 func (e *Exporter) collectBrokerInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {
diff --git a/prometheus/collect_cluster_info.go b/prometheus/collect_cluster_info.go
index 10da5fd0..c9565f62 100644
--- a/prometheus/collect_cluster_info.go
+++ b/prometheus/collect_cluster_info.go
@@ -2,9 +2,10 @@ package prometheus
 
 import (
     "context"
+    "strconv"
+
     "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
-    "strconv"
 )
 
 func (e *Exporter) collectClusterInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {
diff --git a/prometheus/collect_consumer_group_lags.go b/prometheus/collect_consumer_group_lags.go
index b61fd073..5e0c71e3 100644
--- a/prometheus/collect_consumer_group_lags.go
+++ b/prometheus/collect_consumer_group_lags.go
@@ -2,13 +2,14 @@ package prometheus
 
 import (
     "context"
+    "math"
+    "strconv"
+
     "github.com/cloudhut/kminion/v2/minion"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/twmb/franz-go/pkg/kerr"
     "github.com/twmb/franz-go/pkg/kmsg"
     "go.uber.org/zap"
-    "math"
-    "strconv"
 )
 
 type waterMark struct {
@@ -19,6 +20,9 @@
 }
 
 func (e *Exporter) collectConsumerGroupLags(ctx context.Context, ch chan<- prometheus.Metric) bool {
+    if !e.minionSvc.Cfg.ConsumerGroups.Enabled {
+        return true
+    }
     // Low Watermarks (at the moment they are not needed at all, they could be used to calculate the lag on partitions
     // that don't have any active offsets)
     lowWaterMarks, err := e.minionSvc.ListOffsetsCached(ctx, -2)
@@ -46,9 +50,6 @@ func (e *Exporter) collectConsumerGroupLags(ctx context.Context, ch chan<- prome
 func (e *Exporter) collectConsumerGroupLagsOffsetTopic(_ context.Context, ch chan<- prometheus.Metric, marks map[string]map[int32]waterMark) bool {
     offsets := e.minionSvc.ListAllConsumerGroupOffsetsInternal()
     for groupName, group := range offsets {
-        if !e.minionSvc.IsGroupAllowed(groupName) {
-            continue
-        }
 
         offsetCommits := 0
         for topicName, topic := range group {
@@ -124,9 +125,6 @@ func (e *Exporter) collectConsumerGroupLagsAdminAPI(ctx context.Context, ch chan
     groupOffsets, err := e.minionSvc.ListAllConsumerGroupOffsetsAdminAPI(ctx)
 
     for groupName, offsetRes := range groupOffsets {
-        if !e.minionSvc.IsGroupAllowed(groupName) {
-            continue
-        }
 
         err = kerr.ErrorForCode(offsetRes.ErrorCode)
         if err != nil {
diff --git a/prometheus/collect_consumer_groups.go b/prometheus/collect_consumer_groups.go
index 438dae6e..994dcf30 100644
--- a/prometheus/collect_consumer_groups.go
+++ b/prometheus/collect_consumer_groups.go
@@ -15,7 +15,7 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
     if !e.minionSvc.Cfg.ConsumerGroups.Enabled {
         return true
     }
-    groups, err := e.minionSvc.DescribeConsumerGroups(ctx)
+    groups, allGroups, err := e.minionSvc.DescribeConsumerGroups(ctx)
     if err != nil {
         e.logger.Error("failed to collect consumer groups, because Kafka request failed", zap.Error(err))
         return false
@@ -23,6 +23,7 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
 
     // The list of groups may be incomplete due to group coordinators that might fail to respond. We do log an error
     // message in that case (in the kafka request method) and groups will not be included in this list.
+    emptyGroups := 0
     for _, grp := range groups {
         coordinator := grp.BrokerMetadata.NodeID
         for _, group := range grp.Groups.Groups {
@@ -34,31 +35,35 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
                 )
                 continue
             }
-            if !e.minionSvc.IsGroupAllowed(group.Group) {
-                continue
-            }
             state := 0
             if group.State == "Stable" {
                 state = 1
             }
-            ch <- prometheus.MustNewConstMetric(
-                e.consumerGroupInfo,
-                prometheus.GaugeValue,
-                float64(state),
-                group.Group,
-                group.Protocol,
-                group.ProtocolType,
-                group.State,
-                strconv.FormatInt(int64(coordinator), 10),
-            )
+            if group.State != "Empty" {
+                // don't report on empty groups
+                ch <- prometheus.MustNewConstMetric(
+                    e.consumerGroupInfo,
+                    prometheus.GaugeValue,
+                    float64(state),
+                    group.Group,
+                    group.Protocol,
+                    group.ProtocolType,
+                    group.State,
+                    strconv.FormatInt(int64(coordinator), 10),
+                )
+            } else {
+                emptyGroups++
+            }
 
             // total number of members in consumer groups
-            ch <- prometheus.MustNewConstMetric(
-                e.consumerGroupMembers,
-                prometheus.GaugeValue,
-                float64(len(group.Members)),
-                group.Group,
-            )
+            if len(group.Members) > 0 {
+                ch <- prometheus.MustNewConstMetric(
+                    e.consumerGroupMembers,
+                    prometheus.GaugeValue,
+                    float64(len(group.Members)),
+                    group.Group,
+                )
+            }
 
             // iterate all members and build two maps:
             // - {topic -> number-of-consumers}
@@ -108,7 +113,7 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
             }
 
             // number of members with no assignment in a stable consumer group
-            if membersWithEmptyAssignment > 0 {
+            if membersWithEmptyAssignment > 0 && group.State == "Stable" {
                 ch <- prometheus.MustNewConstMetric(
                     e.consumerGroupMembersEmpty,
                     prometheus.GaugeValue,
@@ -138,6 +143,16 @@ func (e *Exporter) collectConsumerGroups(ctx context.Context, ch chan<- promethe
             }
         }
     }
+    ch <- prometheus.MustNewConstMetric(
+        e.consumerGroupInfoAllGroups,
+        prometheus.GaugeValue,
+        float64(allGroups),
+    )
+    ch <- prometheus.MustNewConstMetric(
+        e.consumerGroupInfoEmptyGroups,
+        prometheus.GaugeValue,
+        float64(emptyGroups),
+    )
 
     return true
 }
diff --git a/prometheus/collect_exporter_metrics.go b/prometheus/collect_exporter_metrics.go
index 30ecd43d..4daa15fb 100644
--- a/prometheus/collect_exporter_metrics.go
+++ b/prometheus/collect_exporter_metrics.go
@@ -2,6 +2,7 @@ package prometheus
 
 import (
     "context"
+
     "github.com/prometheus/client_golang/prometheus"
 )
 
diff --git a/prometheus/collect_log_dirs.go b/prometheus/collect_log_dirs.go
index 7794f9c1..37e3e737 100644
--- a/prometheus/collect_log_dirs.go
+++ b/prometheus/collect_log_dirs.go
@@ -2,11 +2,12 @@ package prometheus
 
 import (
     "context"
+    "strconv"
+
     "github.com/prometheus/client_golang/prometheus"
     "github.com/twmb/franz-go/pkg/kerr"
     "github.com/twmb/franz-go/pkg/kgo"
     "go.uber.org/zap"
-    "strconv"
 )
 
 func (e *Exporter) collectLogDirs(ctx context.Context, ch chan<- prometheus.Metric) bool {
diff --git a/prometheus/collect_topic_info.go b/prometheus/collect_topic_info.go
index 7474ec16..9ece4c43 100644
--- a/prometheus/collect_topic_info.go
+++ b/prometheus/collect_topic_info.go
@@ -79,6 +79,38 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
             float64(1),
             labelsValues...,
         )
+        ch <- prometheus.MustNewConstMetric(
+            e.topicInfoPartitionsCount,
+            prometheus.GaugeValue,
+            float64(partitionCount),
+            *topic.Topic,
+        )
+        ch <- prometheus.MustNewConstMetric(
+            e.topicInfoReplicationFactor,
+            prometheus.GaugeValue,
+            float64(replicationFactor),
+            *topic.Topic,
+        )
+        if parameter, exists := configsByTopic[*topic.Topic]["min.insync.replicas"]; exists {
+            if value, err := strconv.ParseFloat(parameter, 64); err == nil {
+                ch <- prometheus.MustNewConstMetric(
+                    e.topicInfoMinInsyncReplicas,
+                    prometheus.GaugeValue,
+                    value,
+                    *topic.Topic,
+                )
+            }
+        }
+        if parameter, exists := configsByTopic[*topic.Topic]["retention.ms"]; exists {
+            if value, err := strconv.ParseFloat(parameter, 64); err == nil {
+                ch <- prometheus.MustNewConstMetric(
+                    e.topicInfoRetentionMs,
+                    prometheus.GaugeValue,
+                    value,
+                    *topic.Topic,
+                )
+            }
+        }
     }
     return isOk
 }
diff --git a/prometheus/collect_topic_partition_offsets.go b/prometheus/collect_topic_partition_offsets.go
index 993ab72e..6c27fcfa 100644
--- a/prometheus/collect_topic_partition_offsets.go
+++ b/prometheus/collect_topic_partition_offsets.go
@@ -2,11 +2,12 @@ package prometheus
 
 import (
     "context"
+    "strconv"
+
     "github.com/cloudhut/kminion/v2/minion"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/twmb/franz-go/pkg/kerr"
     "go.uber.org/zap"
-    "strconv"
 )
 
 func (e *Exporter) collectTopicPartitionOffsets(ctx context.Context, ch chan<- p
@@ -25,6 +26,15 @@
         return false
     }
 
+    // Highest Timestamp Offsets
+    // NB: this requires Kafka Brokers 3.0+ (see https://issues.apache.org/jira/browse/KAFKA-12541)
+    // In older versions this is returning the timestamp of the low watermarks (earliest offset)
+    maxTimestampOffsets, err := e.minionSvc.ListOffsetsCached(ctx, -3)
+    if err != nil {
+        e.logger.Error("failed to fetch offsets for max timestamp", zap.Error(err))
+        return false
+    }
+
     // Process Low Watermarks
     for _, topic := range lowWaterMarks.Topics {
         if !e.minionSvc.IsTopicAllowed(topic.Topic) {
@@ -101,5 +111,47 @@
         }
     }
 
+    // Process Max Timestamps
+    for _, topic := range maxTimestampOffsets.Topics {
+        if !e.minionSvc.IsTopicAllowed(topic.Topic) {
+            continue
+        }
+        topicMaxTimestamp := int64(0)
+        hasErrors := false
+        for _, partition := range topic.Partitions {
+            err := kerr.ErrorForCode(partition.ErrorCode)
+            if err != nil {
+                hasErrors = true
+                isOk = false
+                continue
+            }
+            if topicMaxTimestamp < partition.Timestamp {
+                topicMaxTimestamp = partition.Timestamp
+            }
+            // Let's end here if partition metrics shall not be exposed
+            if e.minionSvc.Cfg.Topics.Granularity == minion.TopicGranularityTopic {
+                continue
+            }
+            if partition.Timestamp > 0 {
+                ch <- prometheus.MustNewConstMetric(
+                    e.partitionMaxTimestamp,
+                    prometheus.GaugeValue,
+                    float64(partition.Timestamp),
+                    topic.Topic,
+                    strconv.Itoa(int(partition.Partition)),
+                )
+            }
+        }
+        // We only want to report the max of all partition max timestamps if we receive results from all partitions
+        // and the topic is not empty
+        if !hasErrors && topicMaxTimestamp > 0 {
+            ch <- prometheus.MustNewConstMetric(
+                e.topicMaxTimestamp,
+                prometheus.GaugeValue,
+                float64(topicMaxTimestamp),
+                topic.Topic,
+            )
+        }
+    }
     return isOk
 }
diff --git a/prometheus/exporter.go b/prometheus/exporter.go
index d717bcfa..38f4de2c 100644
--- a/prometheus/exporter.go
+++ b/prometheus/exporter.go
@@ -32,14 +32,22 @@ type Exporter struct {
     topicLogDirSize *prometheus.Desc
 
     // Topic / Partition
-    topicInfo              *prometheus.Desc
-    topicHighWaterMarkSum  *prometheus.Desc
-    partitionHighWaterMark *prometheus.Desc
-    topicLowWaterMarkSum   *prometheus.Desc
-    partitionLowWaterMark  *prometheus.Desc
+    topicInfo                  *prometheus.Desc
+    topicInfoPartitionsCount   *prometheus.Desc
+    topicInfoReplicationFactor *prometheus.Desc
+    topicInfoMinInsyncReplicas *prometheus.Desc
+    topicInfoRetentionMs       *prometheus.Desc
+    topicHighWaterMarkSum      *prometheus.Desc
+    partitionHighWaterMark     *prometheus.Desc
+    topicLowWaterMarkSum       *prometheus.Desc
+    partitionLowWaterMark      *prometheus.Desc
+    topicMaxTimestamp          *prometheus.Desc
+    partitionMaxTimestamp      *prometheus.Desc
 
     // Consumer Groups
     consumerGroupInfo *prometheus.Desc
+    consumerGroupInfoAllGroups *prometheus.Desc
+    consumerGroupInfoEmptyGroups *prometheus.Desc
     consumerGroupMembers *prometheus.Desc
     consumerGroupMembersEmpty *prometheus.Desc
     consumerGroupTopicMembers *prometheus.Desc
@@ -114,6 +122,30 @@ func (e *Exporter) InitializeMetrics() {
         labels,
         nil,
     )
+    e.topicInfoPartitionsCount = prometheus.NewDesc(
+        prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_partitions_count"),
+        "Partitions configuration for a given topic",
+        []string{"topic_name"},
+        nil,
+    )
+    e.topicInfoReplicationFactor = prometheus.NewDesc(
+        prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_replication_factor"),
+        "Replication factor configuration for a given topic",
+        []string{"topic_name"},
+        nil,
+    )
+    e.topicInfoMinInsyncReplicas = prometheus.NewDesc(
+        prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_min_insync_replicas"),
+        "Min in-sync replicas configuration for a given topic",
+        []string{"topic_name"},
+        nil,
+    )
+    e.topicInfoRetentionMs = prometheus.NewDesc(
+        prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info_retention_ms"),
+        "Retention time for a given topic",
+        []string{"topic_name"},
+        nil,
+    )
     // Partition Low Water Mark
     e.partitionLowWaterMark = prometheus.NewDesc(
         prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_low_water_mark"),
@@ -142,6 +174,20 @@ func (e *Exporter) InitializeMetrics() {
         []string{"topic_name"},
         nil,
     )
+    // Partition Max Timestamp
+    e.partitionMaxTimestamp = prometheus.NewDesc(
+        prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_partition_max_timestamp"),
+        "Partition Max Timestamp",
+        []string{"topic_name", "partition_id"},
+        nil,
+    )
+    // Topic Max Timestamp
+    e.topicMaxTimestamp = prometheus.NewDesc(
+        prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_max_timestamp"),
+        "Topic Max Timestamp",
+        []string{"topic_name"},
+        nil,
+    )
 
     // Consumer Group Metrics
     // Group Info
@@ -151,6 +197,20 @@ func (e *Exporter) InitializeMetrics() {
         []string{"group_id", "protocol", "protocol_type", "state", "coordinator_id"},
         nil,
     )
+    // Group Info - All Groups
+    e.consumerGroupInfoAllGroups = prometheus.NewDesc(
+        prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_info_count"),
+        "Consumer Group info metrics. It will report the number of all groups in cluster.",
+        nil,
+        nil,
+    )
+    // Group Info - Empty Groups
+    e.consumerGroupInfoEmptyGroups = prometheus.NewDesc(
+        prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_info_empty_groups"),
+        "Consumer Group info metrics. It will report the number of empty groups overall in cluster.",
+        nil,
+        nil,
+    )
     // Group Members
     e.consumerGroupMembers = prometheus.NewDesc(
         prometheus.BuildFQName(e.cfg.Namespace, "kafka", "consumer_group_members"),
@@ -221,7 +281,7 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
 }
 
 func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
-    ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second*180)
     defer cancel()
 
     // Attach a unique id which will be used for caching (and and it's invalidation) of the kafka requests