[GOBBLIN-2239] Introduce pushMessagesSync for sync GTE emission #4156
Pull Request Overview
This PR introduces synchronous GTE (Gobblin Tracking Event) emission to prevent event loss when containers are terminated early by the Yarn NodeManager with SIGKILL. The implementation adds a new `pushMessagesSync` method to the `Pusher` interface with a backward-compatible default implementation, and enables synchronous event reporting through the existing reporter chain.
Key changes:
- Added `pushMessagesSync()` method to `Pusher` interface and `reportEventQueueSynchronously()` to `EventReporter` class with default implementations for backward compatibility
- Implemented synchronous event reporting in `KafkaEventKeyValueReporter` and added `getScheduledReporter()` method to `GobblinMetrics` for retrieving specific reporter instances
- Modified `SubmitGTEActivityImpl` to trigger synchronous GTE flushing after event submission using container metrics configuration
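The backward-compatibility idea in the first bullet can be illustrated with a minimal sketch. The names `SketchPusher`, `LegacyPusher`, and `BlockingPusher` are hypothetical stand-ins invented for this example; this is not the code from the PR, only an illustration of how a default interface method lets existing implementations keep compiling while newer ones opt into a blocking path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for a pusher interface (not the Gobblin source).
interface SketchPusher<M> {
  // Existing contract: hand messages off, possibly returning before delivery completes.
  void pushMessages(List<M> messages);

  // New method: the default delegates to the async path, so existing
  // implementations need no changes.
  default void pushMessagesSync(List<M> messages) {
    pushMessages(messages);
  }
}

// An older implementation that only knows about pushMessages.
class LegacyPusher implements SketchPusher<String> {
  final List<String> sent = new ArrayList<>();

  @Override
  public void pushMessages(List<String> messages) {
    sent.addAll(messages);
  }
}

// A newer implementation that overrides the sync path.
class BlockingPusher implements SketchPusher<String> {
  final List<String> sent = new ArrayList<>();
  boolean syncPathUsed = false;

  @Override
  public void pushMessages(List<String> messages) {
    sent.addAll(messages);
  }

  @Override
  public void pushMessagesSync(List<String> messages) {
    syncPathUsed = true;
    // A real implementation would wait here for delivery acknowledgements;
    // this sketch only records that the sync path was taken.
    sent.addAll(messages);
  }
}

class PusherSketchDemo {
  public static void main(String[] args) {
    LegacyPusher legacy = new LegacyPusher();
    legacy.pushMessagesSync(Arrays.asList("e1", "e2")); // falls back to pushMessages
    BlockingPusher blocking = new BlockingPusher();
    blocking.pushMessagesSync(Arrays.asList("e3"));
    System.out.println(legacy.sent.size() + " " + blocking.syncPathUsed);
  }
}
```

The trade-off of this design is that callers of the sync method get a real delivery guarantee only from implementations that override it; with the default, the call behaves exactly like the async one.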
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java | Adds pushMessagesSync() method with default implementation delegating to async method |
| gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java | Adds reportSynchronously() and reportEventQueueSynchronously() methods for synchronous event reporting |
| gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java | Adds getScheduledReporter() method to retrieve specific reporter instances by class |
| gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java | Refactors event queue processing into shared helper and implements synchronous reporting variant |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java | Defines configuration keys for container metrics application name and task runner ID |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java | Persists container metrics configuration to enable later retrieval |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java | Implements synchronous GTE flushing by retrieving container metrics and triggering synchronous reporter |
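The table describes the reporter change as a shared helper for event queue processing plus a synchronous variant. A rough sketch of that shape follows, under stated assumptions: `SketchEventReporter`, `drainAndPush`, and the two `Consumer` callbacks are hypothetical names invented here, and events are plain strings. It is not the PR's implementation, only one way such a refactor could look.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch of a reporter whose async and sync paths share one drain helper.
class SketchEventReporter {
  private final Queue<String> eventQueue = new ConcurrentLinkedQueue<>();
  private final Consumer<List<String>> asyncPush; // stands in for an async push
  private final Consumer<List<String>> syncPush;  // stands in for a blocking push

  SketchEventReporter(Consumer<List<String>> asyncPush, Consumer<List<String>> syncPush) {
    this.asyncPush = asyncPush;
    this.syncPush = syncPush;
  }

  void addEvent(String event) {
    eventQueue.add(event);
  }

  // Regular path: drain the queue and hand the batch to the async push.
  void reportEventQueue() {
    drainAndPush(asyncPush);
  }

  // Sync path: same drain logic, but the batch goes to the blocking push.
  void reportEventQueueSynchronously() {
    drainAndPush(syncPush);
  }

  // Shared helper: empties the queue into one batch and pushes it if non-empty.
  private void drainAndPush(Consumer<List<String>> push) {
    List<String> batch = new ArrayList<>();
    String event;
    while ((event = eventQueue.poll()) != null) {
      batch.add(event);
    }
    if (!batch.isEmpty()) {
      push.accept(batch);
    }
  }
}
```

Keeping the drain logic in one place means the two variants differ only in which push they call, so a small queue can be flushed on demand instead of waiting for close.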
Pull Request Overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
In Gobblin-on-Temporal, GTEs can be emitted from any container when more than one container is running. If the event queue holds only a few GTEs, they are flushed only when the reporter is closed, and an early SIGKILL from the Yarn NodeManager can sometimes prevent that flush from completing.
Introduced `pushMessagesSync` in the `Pusher.java` interface, with a default implementation that calls the async method for backward compatibility.
Tests
NA
Commits