- 
                Notifications
    
You must be signed in to change notification settings  - Fork 326
 
Introducing changes to enable forwarding of Polaris events to AWS Cloudwatch. #2962
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Introducing changes to enable forwarding of Polaris events to AWS Cloudwatch. #2962
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for re-opening a PR @vchag , this is now looking much better.
| # polaris.event-listener.aws-cloudwatch.log-stream=polaris-cloudwatch-default-stream | ||
| # polaris.event-listener.aws-cloudwatch.region=us-east-1 | ||
| # polaris.event-listener.aws-cloudwatch.synchronous-mode=false | ||
| # polaris.event-listener.aws-cloudwatch.event-types= // the absence of this property would result if processing all Polaris event types. | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| # polaris.event-listener.aws-cloudwatch.event-types= // the absence of this property would result if processing all Polaris event types. | |
| # polaris.event-listener.aws-cloudwatch.event-types= // the absence of this property would result in processing all Polaris event types. | 
| executorService = Executors.newSingleThreadExecutor(); | ||
| 
               | 
          ||
| // Build the test mapper and apply the same customizations Quarkus would | ||
| objectMapper = new ObjectMapper(); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could be a final field.
| AwsCloudWatchConfiguration config, | ||
| Clock clock, | ||
| PolarisIcebergObjectMapperCustomizer customizer) { | ||
| PolarisIcebergObjectMapperCustomizer customizer, | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to pass the customizer, the customizer will be invoked for you by Quarkus. Just inject the object mapper.
| protected abstract void handle(PolarisEvent event); | ||
| 
               | 
          ||
| /** Optional filter (config-based). Default: handle all. */ | ||
| protected boolean shouldHandle(Object event) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| protected boolean shouldHandle(Object event) { | |
| protected boolean shouldHandle(PolarisEvent event) { | 
| this.listenToAllEvents = | ||
| config.eventTypes().isEmpty() | ||
| || config.eventTypes().map(Set::isEmpty).orElse(true) | ||
| || config.eventTypes().get().stream().anyMatch(e -> e == PolarisEvent.class); | ||
| this.allowedEventTypes = listenToAllEvents ? Set.of() : Set.copyOf(config.eventTypes().get()); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think we can simplify a bit:
    this.allowedEventTypes = config.eventTypes().orElse(Set.of());
    this.listenToAllEvents =
        allowedEventTypes.isEmpty()
            || allowedEventTypes.stream().anyMatch(c -> c == PolarisEvent.class);| if (this.listenToAllEvents) { | ||
| return true; | ||
| } | ||
| Class<? extends PolarisEvent> actualType = polarisEvent.getClass(); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this is probably a bit inefficient under a high load. Switching to event type IDs should help because the lookup would be faster when using an EnumSet.
| } | ||
| 
               | 
          ||
| @PostConstruct | ||
| void verifyMapper() { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary.
| import org.apache.polaris.service.events.PrincipalsServiceEvents; | ||
| 
               | 
          ||
| /** | ||
| * Base class for event listeners that with to generically forward all {@link PolarisEvent | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| * Base class for event listeners that with to generically forward all {@link PolarisEvent | |
| * Base class for event listeners that wish to generically forward all {@link PolarisEvent | 
These changes were created to address concerns raised in Issues #2630
What changes were proposed in this pull request?
Streamlining Event-to-JSON Serialization:
The previous architecture involved a redundant, two-step conversion process: events were transformed into intermediate Maps by an abstract class, and only then was the resulting map serialized into JSON (as seen in AwsCloudWatchEventListener).
Since Jackson JSON is our chosen serialization format, we've removed this unnecessary intermediate mapping step. We introduced Jackson Mixins to provide native JSON serialization support directly on the event objects. This refactoring simplifies the serialization pipeline from:
Event -> Map -> JSON
to
Event -> JSON
Introduces
AllEventsForwardingListener, a generic event listener designed to automatically receive and forward all Polaris events without requiring explicit event-type configuration.Introduces
polaris.event-listener.aws-cloudwatch.event-typesproperty: This property introduces a configurable event filter for the AWS CloudWatch event listener.It defines which Polaris event types should be serialized and forwarded to CloudWatch Logs.
Why are the changes needed?
Does this PR introduce any user-facing change?
How was this patch tested?
CHANGELOG.md