From e3f15e8fbb6ff629cd3c186f02851f3aee0d6eb7 Mon Sep 17 00:00:00 2001 From: joerinehart Date: Wed, 21 May 2025 10:24:57 -0400 Subject: [PATCH 1/5] DVRL-487 - zero-config Iceberg quickstart --- .../cmd/producer/producer.go | 2 +- .../iceberg-quickstart/config/bufstream.yaml | 11 +- bufstream/iceberg-quickstart/go.mod | 8 +- bufstream/iceberg-quickstart/go.sum | 26 +-- bufstream/iceberg-quickstart/pkg/app/app.go | 186 +++++++++++++----- .../iceberg-quickstart/pkg/kafka/kafka.go | 7 +- 6 files changed, 171 insertions(+), 69 deletions(-) diff --git a/bufstream/iceberg-quickstart/cmd/producer/producer.go b/bufstream/iceberg-quickstart/cmd/producer/producer.go index b0cd78a..61d4efe 100644 --- a/bufstream/iceberg-quickstart/cmd/producer/producer.go +++ b/bufstream/iceberg-quickstart/cmd/producer/producer.go @@ -31,7 +31,7 @@ import ( func main() { // See the app package for the boilerplate we use to set up the producer and // consumer, including bound flags. - app.Main(run) + app.MainAutoCreateTopic(run) } func run(ctx context.Context, config app.Config) error { diff --git a/bufstream/iceberg-quickstart/config/bufstream.yaml b/bufstream/iceberg-quickstart/config/bufstream.yaml index 5a2b7eb..a736dcd 100644 --- a/bufstream/iceberg-quickstart/config/bufstream.yaml +++ b/bufstream/iceberg-quickstart/config/bufstream.yaml @@ -43,8 +43,9 @@ storage: string: admin secret_access_key: string: password -#iceberg: -# catalogs: -# - name: local-rest-catalog -# rest: -# url: http://iceberg-rest:8181 + +iceberg: + catalogs: + - name: local-rest-catalog + rest: + url: http://iceberg-rest:8181 diff --git a/bufstream/iceberg-quickstart/go.mod b/bufstream/iceberg-quickstart/go.mod index a0abbdf..0318b16 100644 --- a/bufstream/iceberg-quickstart/go.mod +++ b/bufstream/iceberg-quickstart/go.mod @@ -9,7 +9,8 @@ require ( github.com/confluentinc/confluent-kafka-go/v2 v2.6.0 github.com/google/uuid v1.6.0 github.com/spf13/pflag v1.0.5 - github.com/twmb/franz-go v1.18.0 + github.com/twmb/franz-go v1.18.1 + github.com/twmb/franz-go/pkg/kadm v1.16.0 google.golang.org/protobuf v1.36.6 ) @@ -17,9 +18,10 @@ require ( github.com/bufbuild/protocompile v0.14.1 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/jhump/protoreflect v1.17.0 // indirect - github.com/klauspost/compress v1.17.10 // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect + golang.org/x/crypto v0.32.0 // indirect golang.org/x/sync v0.8.0 // indirect google.golang.org/genproto v0.0.0-20240924160255-9d4c2d233b61 // indirect ) diff --git a/bufstream/iceberg-quickstart/go.sum b/bufstream/iceberg-quickstart/go.sum index 29fdb90..015a0e3 100644 --- a/bufstream/iceberg-quickstart/go.sum +++ b/bufstream/iceberg-quickstart/go.sum @@ -111,16 +111,16 @@ github.com/hashicorp/vault/api v1.15.0 h1:O24FYQCWwhwKnF7CuSqP30S51rTV7vz1iACXE/ github.com/hashicorp/vault/api v1.15.0/go.mod h1:+5YTO09JGn0u+b6ySD/LLVf8WkJCPLAL2Vkmrn2+CM8= github.com/jhump/protoreflect v1.17.0 h1:qOEr613fac2lOuTgWN4tPAtLL7fUSbuJL5X5XumQh94= github.com/jhump/protoreflect v1.17.0/go.mod h1:h9+vUUL38jiBzck8ck+6G/aeMX8Z4QUY/NiJPwPNi+8= -github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= -github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -141,8 +141,10 @@ github.com/tink-crypto/tink-go-hcvault/v2 v2.1.0 h1:REG5YX2omhgPmiIT7GLqmzWFnIks github.com/tink-crypto/tink-go-hcvault/v2 v2.1.0/go.mod h1:OJLS+EYJo/BTViJj7EBG5deKLeQfYwVNW8HMS1qHAAo= github.com/tink-crypto/tink-go/v2 v2.1.0 h1:QXFBguwMwTIaU17EgZpEJWsUSc60b1BAGTzBIoMdmok= github.com/tink-crypto/tink-go/v2 v2.1.0/go.mod h1:y1TnYFt1i2eZVfx4OGc+C+EMp4CoKWAw2VSEuoicHHI= -github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= -github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= +github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc= +github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M= +github.com/twmb/franz-go/pkg/kadm v1.16.0 h1:STMs1t5lYR5mR974PSiwNzE5TvsosByTp+rKXLOhAjE= +github.com/twmb/franz-go/pkg/kadm v1.16.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= github.com/xiatechs/jsonata-go v1.8.5 h1:m1NaokPKD6LPaTPRl674EQz5mpkJvM3ymjdReDEP6/A= @@ -157,8 +159,8 @@ go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGX go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o= golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= @@ -167,10 +169,10 @@ golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI= golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= google.golang.org/api v0.169.0 h1:QwWPy71FgMWqJN/l6jVlFHUa29a7dcUy02I8o799nPY= diff --git a/bufstream/iceberg-quickstart/pkg/app/app.go b/bufstream/iceberg-quickstart/pkg/app/app.go index e1d2d65..4e5300d 100644 --- a/bufstream/iceberg-quickstart/pkg/app/app.go +++ b/bufstream/iceberg-quickstart/pkg/app/app.go @@ -20,13 +20,26 @@ package app import ( "context" - "github.com/bufbuild/buf-examples/bufstream/iceberg-quickstart/pkg/csr" - "github.com/bufbuild/buf-examples/bufstream/iceberg-quickstart/pkg/kafka" + "errors" + "fmt" "log/slog" "os" "os/signal" + "strings" + "github.com/bufbuild/buf-examples/bufstream/iceberg-quickstart/pkg/csr" + "github.com/bufbuild/buf-examples/bufstream/iceberg-quickstart/pkg/kafka" "github.com/spf13/pflag" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" +) + +const ( + defaultKafkaClientID = "bufstream-demo" +) + +var ( + defaultKafkaBootstrapServers = []string{"localhost:9092"} ) // Config contains all application configuration needed by the producer and consumer. @@ -35,94 +48,124 @@ type Config struct { CSR csr.Config } -// Main is used by the producer and consumer within their main functions. +// Main is used by the consumer's main function. // // It sets up logging, interrupt handling, and binds and parses all flags. Afterwards, it calls -// do to invoke the application logic. -func Main(do func(context.Context, Config) error) { +// action to invoke the application logic. +func Main(action func(context.Context, Config) error) { + doMain(false, action) +} + +// MainAutoCreateTopic is used by the producer's main function. It is just like [Main] except +// that it will also create the topic if necessary. The producer defines the topic and provides +// the data, so the consumer should not be the one auto-creating it. +// +// Note that in a real production workload, neither producer nor consumer applications should +// ever create topics. This should be considered an infrastructure concern, and the topic +// should be provisioned with correct configuration before a producer ever tries to send +// messages to it. If the topic does not exist, this should be a failure in the producer +// since it means a likely misconfiguration. +// +// This demo workload creates the topic, despite it not being a typical good practice, just +// for simplicity, so there are fewer steps to get the demo running. +func MainAutoCreateTopic(action func(context.Context, Config) error) { + doMain(true, action) +} + +func doMain(autoCreateTopic bool, action func(context.Context, Config) error) { // Set up slog. We use the global logger throughout this demo. // Set up slog. We use the global logger throughout this demo. slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))) // Cancel the context on interrupt, i.e. ctrl+c for our purposes. ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() - if err := run(ctx, do); err != nil { + if err := run(ctx, autoCreateTopic, action); err != nil { slog.Error("program error", "error", err) os.Exit(1) } } -func run(ctx context.Context, do func(context.Context, Config) error) error { - config, err := parseConfig() +func run(ctx context.Context, autoCreateTopic bool, action func(context.Context, Config) error) error { + config, err := parseConfig(autoCreateTopic) if err != nil { return err } - return do(ctx, config) + if autoCreateTopic { + if err := maybeCreateTopic(ctx, config.Kafka); err != nil { + return err + } + } + return action(ctx, config) } -func parseConfig() (Config, error) { +func parseConfig(canCreateTopic bool) (Config, error) { flagSet := pflag.NewFlagSet(os.Args[0], pflag.ContinueOnError) - config := Config{ - Kafka: kafka.Config{ - BootstrapServers: []string{"0.0.0.0:9092"}, - RootCAPath: "", - Topic: "email-updated", - Group: "email-verifier", - ClientID: "bufstream-demo", - }, - CSR: csr.Config{ - URL: "", - Username: "", - Password: "", - }, - } - - flagSet.StringArrayVarP( + config := Config{} + flagSet.StringArrayVar( &config.Kafka.BootstrapServers, "bootstrap", - "b", - config.Kafka.BootstrapServers, + defaultKafkaBootstrapServers, "The Bufstream bootstrap server addresses.", ) - flagSet.StringVarP( + flagSet.StringVar( + &config.Kafka.ClientID, + "client-id", + defaultKafkaClientID, + "The Kafka client ID.", + ) + flagSet.StringVar( &config.Kafka.Topic, "topic", - "t", - config.Kafka.Topic, + "", "The Kafka topic name to use.", ) - flagSet.StringVarP( + if canCreateTopic { + flagSet.BoolVar( + &config.Kafka.RecreateTopic, + "recreate-topic", + false, + "If true, the topic will be recreated even if it already exists.", + ) + flagSet.IntVar( + &config.Kafka.TopicPartitions, + "topic-partitions", + 1, + "The number of partitions to use when creating the topic.", + ) + flagSet.StringSliceVar( + &config.Kafka.TopicConfig, + "topic-config", + nil, + "Topic config parameters to use when creating the topic.", + ) + } + flagSet.StringVar( &config.Kafka.Group, "group", - "g", - config.Kafka.Group, + "", "The Kafka consumer group ID.", ) - flagSet.StringVarP( + flagSet.StringVar( &config.CSR.URL, "csr-url", - "c", - config.CSR.URL, + "", "The Confluent Schema Registry URL.", ) - flagSet.StringVarP( + flagSet.StringVar( &config.CSR.Username, "csr-user", - "u", - config.CSR.Username, + "", "The Confluent Schema Registry username, if authentication is needed.", ) - flagSet.StringVarP( + flagSet.StringVar( &config.CSR.Password, "csr-pass", - "p", - config.CSR.Password, + "", "The Confluent Schema Registry password/token, if authentication is needed.", ) - flagSet.StringVarP( + flagSet.StringVar( &config.Kafka.RootCAPath, "tls-root-ca-path", "", - config.Kafka.RootCAPath, "A path to root CA certificate for kafka TLS.", ) if err := flagSet.Parse(os.Args[1:]); err != nil { @@ -130,3 +173,56 @@ func parseConfig() (Config, error) { } return config, nil } + +func maybeCreateTopic(ctx context.Context, config kafka.Config) error { + client, err := kafka.NewKafkaClient(config, false) + if err != nil { + return err + } + defer client.Close() + + admClient := kadm.NewClient(client) + if config.RecreateTopic { + resp, err := admClient.DeleteTopic(ctx, config.Topic) + if err == nil { + err = resp.Err + } + if !isUnknownTopic(err) { + return err // something went wrong + } + } else { + resp, err := admClient.DescribeTopicConfigs(ctx, config.Topic) + if err == nil { + if len(resp) != 1 { + return fmt.Errorf("expected 1 topic config, got %d", len(resp)) + } + err = resp[0].Err + } + if err == nil { + return nil // topic exists; nothing to create + } + if !isUnknownTopic(err) { + return err // something went wrong + } + // Else, topic does not exist, so we fall through to create it. + } + configs := make(map[string]*string, len(config.TopicConfig)) + for _, conf := range config.TopicConfig { + k, v, _ := strings.Cut(conf, "=") + if v == "" { + configs[k] = nil + } else { + configs[k] = &v + } + } + resp, err := admClient.CreateTopic(ctx, int32(config.TopicPartitions), 1, configs, config.Topic) + if err == nil { + err = resp.Err + } + return err +} + +func isUnknownTopic(err error) bool { + var kError *kerr.Error + return errors.As(err, &kError) && kError.Code == kerr.UnknownTopicOrPartition.Code +} diff --git a/bufstream/iceberg-quickstart/pkg/kafka/kafka.go b/bufstream/iceberg-quickstart/pkg/kafka/kafka.go index f29a6e0..a204ad6 100644 --- a/bufstream/iceberg-quickstart/pkg/kafka/kafka.go +++ b/bufstream/iceberg-quickstart/pkg/kafka/kafka.go @@ -32,12 +32,14 @@ import ( // franz-go in production code, we'd recommend using the functional options directly. type Config struct { // BootstrapServers are the bootstrap servers to call. - // BootstrapServers []string RootCAPath string Group string - Topic string ClientID string + Topic string + RecreateTopic bool + TopicConfig []string + TopicPartitions int } // NewKafkaClient returns a new franz-go Kafka Client for the given Config. @@ -45,7 +47,6 @@ func NewKafkaClient(config Config, consumer bool) (*kgo.Client, error) { opts := []kgo.Opt{ kgo.SeedBrokers(config.BootstrapServers...), kgo.ClientID(config.ClientID), - kgo.AllowAutoTopicCreation(), } if consumer { From e507e379360b909c1086821ca98c3f35e2783e15 Mon Sep 17 00:00:00 2001 From: joerinehart Date: Wed, 21 May 2025 10:36:56 -0400 Subject: [PATCH 2/5] DVRL-487 - checkpoint before testing against docs update --- .../iceberg-quickstart/docker-compose.yml | 31 ------------------- bufstream/iceberg-quickstart/pkg/app/app.go | 2 ++ 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/bufstream/iceberg-quickstart/docker-compose.yml b/bufstream/iceberg-quickstart/docker-compose.yml index 25aaece..0c7f912 100644 --- a/bufstream/iceberg-quickstart/docker-compose.yml +++ b/bufstream/iceberg-quickstart/docker-compose.yml @@ -91,37 +91,6 @@ services: "--config", "/bufstream.yaml", ] - # A GUI for Bufstream's Kafka broker. See https://akhq.io - # - # Browse to http://localhost:8282 on your machine. - akhq: - image: tchiotludo/akhq:0.25.0 - container_name: akhq - stop_signal: SIGKILL - networks: - iceberg_net: - depends_on: - - bufstream - healthcheck: - test: nc -z akhq 8080 || exit -1 - start_period: 15s - interval: 5s - timeout: 10s - retries: 10 - ports: - - "8282:8080" - environment: - AKHQ_CONFIGURATION: | - akhq: - connections: - bufstream-local: - properties: - bootstrap.servers: "bufstream:9092" - client.id: "akhq;broker_count=1;host_override=bufstream" - schema-registry: - url: "https://demo.buf.dev/integrations/confluent/bufstream-demo" - type: "confluent" - # Spark, relying on MinIO and the REST Iceberg catalog. spark-iceberg: image: tabulario/spark-iceberg diff --git a/bufstream/iceberg-quickstart/pkg/app/app.go b/bufstream/iceberg-quickstart/pkg/app/app.go index 4e5300d..b11e8d9 100644 --- a/bufstream/iceberg-quickstart/pkg/app/app.go +++ b/bufstream/iceberg-quickstart/pkg/app/app.go @@ -206,6 +206,7 @@ func maybeCreateTopic(ctx context.Context, config kafka.Config) error { } // Else, topic does not exist, so we fall through to create it. } + slog.Info("Creating topic", "name", config.Topic) configs := make(map[string]*string, len(config.TopicConfig)) for _, conf := range config.TopicConfig { k, v, _ := strings.Cut(conf, "=") @@ -214,6 +215,7 @@ func maybeCreateTopic(ctx context.Context, config kafka.Config) error { } else { configs[k] = &v } + slog.Info("Configuring topic", "name", config.Topic, "parameter", k, "value", v) } resp, err := admClient.CreateTopic(ctx, int32(config.TopicPartitions), 1, configs, config.Topic) if err == nil { From d7de95d4c13d8fafcf43b93301efb00a923f9d6f Mon Sep 17 00:00:00 2001 From: joerinehart Date: Wed, 21 May 2025 10:57:04 -0400 Subject: [PATCH 3/5] DVRL-487 - improving log output --- bufstream/iceberg-quickstart/pkg/app/app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bufstream/iceberg-quickstart/pkg/app/app.go b/bufstream/iceberg-quickstart/pkg/app/app.go index b11e8d9..2a517fc 100644 --- a/bufstream/iceberg-quickstart/pkg/app/app.go +++ b/bufstream/iceberg-quickstart/pkg/app/app.go @@ -215,7 +215,7 @@ func maybeCreateTopic(ctx context.Context, config kafka.Config) error { } else { configs[k] = &v } - slog.Info("Configuring topic", "name", config.Topic, "parameter", k, "value", v) + slog.Info("Configuring topic", "topic", config.Topic, "parameter", k, "value", v) } resp, err := admClient.CreateTopic(ctx, int32(config.TopicPartitions), 1, configs, config.Topic) if err == nil { From 03937bff1bbc0ac2b86eaf2b0305fb6eb1e75027 Mon Sep 17 00:00:00 2001 From: joerinehart Date: Wed, 21 May 2025 11:14:47 -0400 Subject: [PATCH 4/5] DVRL-487 - remove extra linefeeds --- bufstream/iceberg-quickstart/config/bufstream.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/bufstream/iceberg-quickstart/config/bufstream.yaml b/bufstream/iceberg-quickstart/config/bufstream.yaml index a736dcd..c2c96aa 100644 --- a/bufstream/iceberg-quickstart/config/bufstream.yaml +++ b/bufstream/iceberg-quickstart/config/bufstream.yaml @@ -33,7 +33,6 @@ data_enforcement: on_parse_error: FILTER_RECORD redaction: debug_redact: true - storage: provider: S3 region: us-east-1 @@ -43,7 +42,6 @@ storage: string: admin secret_access_key: string: password - iceberg: catalogs: - name: local-rest-catalog From 8bd7e3728f0e99aa17d62c43d639d5daf523cec4 Mon Sep 17 00:00:00 2001 From: joerinehart Date: Wed, 21 May 2025 12:11:46 -0400 Subject: [PATCH 5/5] DVRL-487 - remove unused BUFSTREAM_VERSION from Makefile --- bufstream/iceberg-quickstart/Makefile | 2 -- 1 file changed, 2 deletions(-) diff --git a/bufstream/iceberg-quickstart/Makefile b/bufstream/iceberg-quickstart/Makefile index aaa1e5c..34fe9f4 100644 --- a/bufstream/iceberg-quickstart/Makefile +++ b/bufstream/iceberg-quickstart/Makefile @@ -9,8 +9,6 @@ SHELL := bash MAKEFLAGS += --warn-undefined-variables MAKEFLAGS += --no-builtin-rules -BUFSTREAM_VERSION := 0.3.27 - .PHONY: ci ci: format-proto lint-proto