-
Notifications
You must be signed in to change notification settings - Fork 387
feat: support kafka metrics exporter #2387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
core/src/main/scala/kafka/log/stream/s3/telemetry/exporter/KafkaExportURI.java
Show resolved
Hide resolved
} | ||
|
||
// Check for backpressure | ||
if (isProducerOverloaded()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's not a good practice to backpressure a producer by iterating over it's metrics, metrics are not intended for in-time validation but for monitoring. In fact PeriodicMetricReader itself implemented backpressure logics, it will only export next batch of metrics when the last export succeed
} | ||
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, value); | ||
// Add callback tracking | ||
futures.add(producer.send(record, new LoggingCallback(metric))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's unneccessary to collect the futures and check if all futures are succeed in a thread pool, you can create a completable future for each send and compltes the future in callback, then use CompletableFuture.allOf to return the final result
|
||
for (int i = 0; i < pointCount; i += MAX_SAMPLES_PER_RECORD) { | ||
List<LongPointData> subPoints = getSubList(points, i); | ||
MetricData subMetric = buildLongGaugeMetric(metric, subPoints); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe implement in a more efficient way to avoid creating new objects for every batch, such as specifying the serialization range in serialize() method and only serialize the points within the range
@@ -978,6 +979,10 @@ project(':core') { | |||
implementation libs.opentelemetryExporterOTLP | |||
implementation libs.opentelemetryJmx | |||
implementation libs.awsSdkAuth | |||
implementation 'com.google.protobuf:protobuf-java:3.25.5' | |||
implementation 'com.squareup.wire:wire-schema:4.9.1' | |||
implementation 'com.squareup.wire:wire-runtime:4.9.1' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The dependency on wire
and proto-google-common-protos
should not be necessary here.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)