-
Notifications
You must be signed in to change notification settings - Fork 114
kafka reporter sendBatch #148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,8 @@ import ( | |
| "encoding/json" | ||
| "log" | ||
| "os" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/Shopify/sarama" | ||
| "github.com/openzipkin/zipkin-go/model" | ||
|
|
@@ -30,15 +32,31 @@ import ( | |
| // defaultKafkaTopic sets the standard Kafka topic our Reporter will publish | ||
| // on. The default topic for zipkin-receiver-kafka is "zipkin", see: | ||
| // https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka | ||
| const defaultKafkaTopic = "zipkin" | ||
|
|
||
| // defaults | ||
| const ( | ||
| defaultBatchInterval = time.Second * 1 // BatchInterval in seconds | ||
| defaultBatchSize = 100 | ||
| defaultMaxBacklog = 1000 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @jeqo are these good defaults?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. IIUC we are batching 100, and max size is 1000 bytes (?). We have these defaults in the Java reporter: https://github.com/openzipkin/zipkin-reporter-java/blob/3f466e56012384ee685dd5cc91012129d312289b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L81-L90 Max request size is 1MB (the default in the Kafka producer), but somewhere we've discussed reducing it to avoid too-big batches, so 500KB should be a good default. Make sure to align producer properties with the reporter ones, similar to https://github.com/openzipkin/zipkin-reporter-java/blob/3f466e56012384ee685dd5cc91012129d312289b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L131-L140
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @jeqo similar to the HTTP reporter: max request size is also 1MB in the Sarama Kafka producer: https://github.com/Shopify/sarama/blob/master/config.go#L411 |
||
| defaultKafkaTopic = "zipkin" | ||
| ) | ||
|
|
||
| // kafkaReporter implements Reporter by publishing spans to a Kafka | ||
| // broker. | ||
| type kafkaReporter struct { | ||
| producer sarama.AsyncProducer | ||
| logger *log.Logger | ||
| topic string | ||
| serializer reporter.SpanSerializer | ||
| producer sarama.AsyncProducer | ||
| logger *log.Logger | ||
| topic string | ||
| serializer reporter.SpanSerializer | ||
| batchInterval time.Duration | ||
| batchSize int | ||
| maxBacklog int | ||
| batchMtx *sync.Mutex | ||
| batch []*model.SpanModel | ||
| spanC chan *model.SpanModel | ||
| sendC chan struct{} | ||
| quit chan struct{} | ||
| shutdown chan error | ||
| } | ||
|
|
||
| // ReporterOption sets a parameter for the kafkaReporter | ||
|
|
@@ -59,6 +77,25 @@ func Producer(p sarama.AsyncProducer) ReporterOption { | |
| } | ||
| } | ||
|
|
||
| // BatchSize sets the maximum batch size, after which a collect will be | ||
| // triggered. The default batch size is 100 traces. | ||
| func BatchSize(n int) ReporterOption { | ||
| return func(r *kafkaReporter) { r.batchSize = n } | ||
| } | ||
|
|
||
| // BatchInterval sets the maximum duration we will buffer traces before | ||
| // emitting them to the collector. The default batch interval is 1 second. | ||
| func BatchInterval(d time.Duration) ReporterOption { | ||
| return func(r *kafkaReporter) { r.batchInterval = d } | ||
| } | ||
|
|
||
| // MaxBacklog sets the maximum backlog size. When batch size reaches this | ||
| // threshold, spans from the beginning of the batch will be disposed. | ||
| func MaxBacklog(n int) ReporterOption { | ||
| return func(r *kafkaReporter) { r.maxBacklog = n } | ||
| } | ||
|
|
||
| // Topic sets the kafka topic to attach the reporter producer on. | ||
|
||
| // Topic sets the kafka topic to attach the reporter producer on. | ||
| func Topic(t string) ReporterOption { | ||
| return func(c *kafkaReporter) { | ||
|
|
@@ -80,9 +117,18 @@ func Serializer(serializer reporter.SpanSerializer) ReporterOption { | |
| // TCP endpoints of the form "host:port". | ||
| func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) { | ||
| r := &kafkaReporter{ | ||
| logger: log.New(os.Stderr, "", log.LstdFlags), | ||
| topic: defaultKafkaTopic, | ||
| serializer: reporter.JSONSerializer{}, | ||
| logger: log.New(os.Stderr, "", log.LstdFlags), | ||
| topic: defaultKafkaTopic, | ||
| serializer: reporter.JSONSerializer{}, | ||
| batchInterval: defaultBatchInterval, | ||
| batchSize: defaultBatchSize, | ||
| maxBacklog: defaultMaxBacklog, | ||
| batch: []*model.SpanModel{}, | ||
| spanC: make(chan *model.SpanModel), | ||
| sendC: make(chan struct{}, 1), | ||
| quit: make(chan struct{}, 1), | ||
| shutdown: make(chan error, 1), | ||
| batchMtx: &sync.Mutex{}, | ||
| } | ||
|
|
||
| for _, option := range options { | ||
|
|
@@ -96,6 +142,8 @@ func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter | |
| r.producer = p | ||
| } | ||
|
|
||
| go r.loop() | ||
| go r.sendLoop() | ||
| go r.logErrors() | ||
|
|
||
| return r, nil | ||
|
|
@@ -108,21 +156,99 @@ func (r *kafkaReporter) logErrors() { | |
| } | ||
|
|
||
| func (r *kafkaReporter) Send(s model.SpanModel) { | ||
| r.spanC <- &s | ||
| } | ||
|
|
||
| func (r *kafkaReporter) Close() error { | ||
| close(r.quit) | ||
| <-r.shutdown | ||
| return r.producer.Close() | ||
| } | ||
|
|
||
| func (r *kafkaReporter) loop() { | ||
| var ( | ||
| nextSend = time.Now().Add(r.batchInterval) | ||
| ticker = time.NewTicker(r.batchInterval / 10) | ||
| tickerChan = ticker.C | ||
| ) | ||
| defer ticker.Stop() | ||
|
|
||
| for { | ||
| select { | ||
| case span := <-r.spanC: | ||
| currentBatchSize := r.append(span) | ||
| if currentBatchSize >= r.batchSize { | ||
| nextSend = time.Now().Add(r.batchInterval) | ||
| r.enqueueSend() | ||
| } | ||
| case <-tickerChan: | ||
| if time.Now().After(nextSend) { | ||
| nextSend = time.Now().Add(r.batchInterval) | ||
| r.enqueueSend() | ||
| } | ||
| case <-r.quit: | ||
| close(r.sendC) | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (r *kafkaReporter) sendLoop() { | ||
| for range r.sendC { | ||
| _ = r.sendBatch() | ||
| } | ||
| r.shutdown <- r.sendBatch() | ||
| } | ||
|
|
||
| func (r *kafkaReporter) enqueueSend() { | ||
| select { | ||
| case r.sendC <- struct{}{}: | ||
| default: | ||
| // Do nothing if there's a pending send request already | ||
| } | ||
| } | ||
|
|
||
| func (r *kafkaReporter) sendBatch() error { | ||
| // Zipkin expects the message to be wrapped in an array | ||
| ss := []model.SpanModel{s} | ||
| m, err := json.Marshal(ss) | ||
|
|
||
| // Select all current spans in the batch to be sent | ||
| r.batchMtx.Lock() | ||
| sendBatch := r.batch[:] | ||
| r.batchMtx.Unlock() | ||
|
|
||
| if len(sendBatch) == 0 { | ||
| return nil | ||
| } | ||
| m, err := json.Marshal(sendBatch) | ||
| if err != nil { | ||
| r.logger.Printf("failed when marshalling the span: %s\n", err.Error()) | ||
| return | ||
| return err | ||
| } | ||
|
|
||
| r.producer.Input() <- &sarama.ProducerMessage{ | ||
| Topic: r.topic, | ||
| Key: nil, | ||
| Value: sarama.ByteEncoder(m), | ||
| } | ||
|
|
||
| // Remove sent spans from the batch even if they were not saved | ||
| r.batchMtx.Lock() | ||
| r.batch = r.batch[len(sendBatch):] | ||
| r.batchMtx.Unlock() | ||
| return nil | ||
| } | ||
|
|
||
| func (r *kafkaReporter) Close() error { | ||
| return r.producer.Close() | ||
| func (r *kafkaReporter) append(span *model.SpanModel) (newBatchSize int) { | ||
| r.batchMtx.Lock() | ||
|
|
||
| r.batch = append(r.batch, span) | ||
| if len(r.batch) > r.maxBacklog { | ||
| dispose := len(r.batch) - r.maxBacklog | ||
| r.logger.Printf("backlog too long, disposing %d spans", dispose) | ||
| r.batch = r.batch[dispose:] | ||
| } | ||
| newBatchSize = len(r.batch) | ||
|
|
||
| r.batchMtx.Unlock() | ||
| return | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: invalid url
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newest url: https://github.com/openzipkin/zipkin/tree/master/zipkin-collector/kafka