Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions cmd/topicmappr/commands/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/DataDog/kafka-kit/v4/kafkazk"
"github.com/DataDog/kafka-kit/v4/kafkaadmin"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -93,6 +94,30 @@ func initZooKeeper(zkAddr, kafkaPrefix, metricsPrefix string) (kafkazk.Handler,
return zk, nil
}

func newKafkaAdminClient(cmd *cobra.Command) (kafkaadmin.KafkaAdmin, error) {
cfg := kafkaadmin.Config{
BootstrapServers: cmd.Parent().Flag("kafka-addr").Value.String(),
}

if flag := cmd.Parent().Flag("kafka-ssl-ca-location"); flag.Changed {
cfg.SSLCALocation = flag.Value.String()
}
if flag := cmd.Parent().Flag("kafka-security-protocol"); flag.Changed {
cfg.SecurityProtocol = flag.Value.String()
}
if flag := cmd.Parent().Flag("kafka-sasl-mechanism"); flag.Changed {
cfg.SASLMechanism = flag.Value.String()
}
if flag := cmd.Parent().Flag("kafka-sasl-username"); flag.Changed {
cfg.SASLUsername = flag.Value.String()
}
if flag := cmd.Parent().Flag("kafka-sasl-password"); flag.Changed {
cfg.SASLPassword = flag.Value.String()
}

return kafkaadmin.NewClient(cfg)
}

// containsRegex takes a topic name reference and returns whether or not
// it should be interpreted as regex.
func containsRegex(t string) bool {
Expand Down
4 changes: 1 addition & 3 deletions cmd/topicmappr/commands/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"os"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -57,8 +56,7 @@ func rebalance(cmd *cobra.Command, _ []string) {
defer zk.Close()

// Init kafkaadmin client.
bs := cmd.Parent().Flag("kafka-addr").Value.String()
ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
ka, err := newKafkaAdminClient(cmd)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
4 changes: 1 addition & 3 deletions cmd/topicmappr/commands/rebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"regexp"
"strings"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"
"github.com/DataDog/kafka-kit/v4/kafkazk"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -153,8 +152,7 @@ func rebuild(cmd *cobra.Command, _ []string) {
}

// Init kafkaadmin client.
bs := cmd.Parent().Flag("kafka-addr").Value.String()
ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
ka, err := newKafkaAdminClient(cmd)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
7 changes: 7 additions & 0 deletions cmd/topicmappr/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,15 @@ func Execute() {

func init() {
rootCmd.PersistentFlags().String("kafka-addr", "localhost:9092", "Kafka bootstrap address")
rootCmd.PersistentFlags().String("kafka-ssl-ca-location", "/etc/kafka/config/ca.crt", "Kafka ssl ca location")
rootCmd.PersistentFlags().String("kafka-security-protocol", "SASL_SSL", "Kafka security protocol")
rootCmd.PersistentFlags().String("kafka-sasl-mechanism", "PLAIN", "Kafka sasl mechanism")
rootCmd.PersistentFlags().String("kafka-sasl-username", "", "Kafka sasl username")
rootCmd.PersistentFlags().String("kafka-sasl-password", "", "Kafka sasl password")

rootCmd.PersistentFlags().String("zk-addr", "localhost:2181", "ZooKeeper connect string")
rootCmd.PersistentFlags().String("zk-prefix", "", "ZooKeeper prefix (if Kafka is configured with a chroot path prefix)")
rootCmd.PersistentFlags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics")

rootCmd.PersistentFlags().Bool("ignore-warns", false, "Produce a map even if warnings are encountered")
}
5 changes: 1 addition & 4 deletions cmd/topicmappr/commands/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"os"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"

"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -55,8 +53,7 @@ func scale(cmd *cobra.Command, _ []string) {
defer zk.Close()

// Init kafkaadmin client.
bs := cmd.Parent().Flag("kafka-addr").Value.String()
ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
ka, err := newKafkaAdminClient(cmd)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -248,6 +249,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down