|
| 1 | +import json |
| 2 | +import logging |
| 3 | +import time |
| 4 | +from dataclasses import dataclass |
| 5 | +from typing import Iterable, List, Optional |
| 6 | + |
| 7 | +from datahub.configuration import ConfigModel |
| 8 | +from datahub.emitter.serialization_helper import post_json_transform |
| 9 | + |
| 10 | +# DataHub imports. |
| 11 | +from datahub.metadata.schema_classes import GenericPayloadClass |
| 12 | + |
| 13 | +from datahub_actions.event.event_envelope import EventEnvelope |
| 14 | +from datahub_actions.event.event_registry import ( |
| 15 | + ENTITY_CHANGE_EVENT_V1_TYPE, |
| 16 | + EntityChangeEvent, |
| 17 | +) |
| 18 | + |
| 19 | +# May or may not need these. |
| 20 | +from datahub_actions.pipeline.pipeline_context import PipelineContext |
| 21 | +from datahub_actions.plugin.source.acryl.constants import ( |
| 22 | + ENTITY_CHANGE_EVENT_NAME, |
| 23 | + PLATFORM_EVENT_TOPIC_NAME, |
| 24 | +) |
| 25 | +from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import ( |
| 26 | + AckManager, |
| 27 | +) |
| 28 | +from datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer import ( |
| 29 | + DataHubEventsConsumer, |
| 30 | + ExternalEvent, |
| 31 | +) |
| 32 | +from datahub_actions.source.event_source import EventSource |
| 33 | + |
# NOTE(review): logging.basicConfig configures the root logger as an import-time
# side effect — unusual for library code, since it can override the host
# application's logging setup. Confirm this is intentional.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| 36 | + |
| 37 | + |
def build_entity_change_event(payload: GenericPayloadClass) -> EntityChangeEvent:
    """Convert a DataHub events message payload into an ``EntityChangeEvent``.

    The JSON-encoded event is read from the payload's ``value`` field.

    Raises:
        ValueError: If the payload cannot be parsed into an EntityChangeEvent.
    """
    try:
        parsed = EntityChangeEvent.from_json(payload.get("value"))
    except Exception as e:
        raise ValueError("Failed to parse into EntityChangeEvent") from e
    return parsed
| 44 | + |
| 45 | + |
class DataHubEventsSourceConfig(ConfigModel):
    """Configuration for the DataHub Cloud events source."""

    # Topic to poll platform events from.
    topic: str = PLATFORM_EVENT_TOPIC_NAME
    # Used to store offset for the consumer. Explicit None default: without it,
    # pydantic v1 treats Optional[...] as implicitly None while pydantic v2
    # would treat the field as required — spelling it out keeps both happy.
    # NOTE(review): the source derives its consumer id from the pipeline name;
    # confirm whether this field is still consumed anywhere.
    consumer_id: Optional[str] = None
    # How far back to read events when no stored offset exists.
    lookback_days: Optional[int] = None
    # If True, discard any previously stored offset and start fresh.
    reset_offsets: Optional[bool] = False

    # Time and Exit Conditions.
    kill_after_idle_timeout: bool = False
    idle_timeout_duration_seconds: int = 30
    event_processing_time_max_duration_seconds: int = 60
| 56 | + |
| 57 | + |
# This is the custom DataHub-based Event Source.
@dataclass
class DataHubEventSource(EventSource):
    """Event source that polls platform events from the DataHub Cloud events API.

    Polls batches of events, converts Entity Change Events into
    ``EventEnvelope``s, and uses an ``AckManager`` to track per-batch
    acknowledgements so that offsets are only advanced (and committed on
    close) after every event in the previous batch has been acked.
    """

    # Main-loop flag; flipped to False on error, idle timeout, or close().
    running = False
    source_config: DataHubEventsSourceConfig
    ctx: PipelineContext

    @staticmethod
    def _get_pipeline_urn(pipeline_name: str) -> str:
        # Accept either a bare pipeline name or an already-qualified action URN.
        if pipeline_name.startswith("urn:li:dataHubAction:"):
            return pipeline_name
        else:
            return f"urn:li:dataHubAction:{pipeline_name}"

    def __init__(self, config: DataHubEventsSourceConfig, ctx: PipelineContext):
        self.ctx = ctx
        self.source_config = config
        # Offset-storage id is derived from the pipeline name — not from
        # source_config.consumer_id.
        self.consumer_id = DataHubEventSource._get_pipeline_urn(self.ctx.pipeline_name)

        # Ensure a Graph Instance was provided.
        assert self.ctx.graph is not None

        self.datahub_events_consumer: DataHubEventsConsumer = DataHubEventsConsumer(
            # TODO: This PipelineContext provides an Acryl Graph Instance
            graph=self.ctx.graph.graph,
            consumer_id=self.consumer_id,
            lookback_days=self.source_config.lookback_days,
            reset_offsets=self.source_config.reset_offsets,
        )
        self.ack_manager = AckManager()
        # Highest offset known to be fully processed; committed in close().
        self.safe_to_ack_offset: Optional[str] = None

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "EventSource":
        """Factory used by the actions framework to build this source from raw config."""
        config = DataHubEventsSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def events(self) -> Iterable[EventEnvelope]:
        """Entry point: start the source and yield event envelopes until stopped."""
        logger.info("Starting DataHub Cloud events source...")
        logger.info(f"Subscribing to the following topic: {self.source_config.topic}")
        self.running = True
        yield from self._poll_and_process_events()

    def _poll_and_process_events(self) -> Iterable[EventEnvelope]:
        """Poll and process events in the main loop."""
        # 0 means "not currently idle"; otherwise the epoch-seconds timestamp
        # at which the source first saw an empty poll response.
        last_idle_response_timestamp = 0
        while self.running:
            try:
                sleeps_to_go = (
                    self.source_config.event_processing_time_max_duration_seconds
                )

                # Block until the previous batch is fully acked (1s per sleep),
                # so the offset recorded below is safe to commit.
                while self.ack_manager.outstanding_acks():
                    time.sleep(1)
                    sleeps_to_go -= 1
                    logger.debug(f"Sleeps to go: {sleeps_to_go}")

                    # Give up after the configured max processing time; this
                    # also stops the outer loop via self.running = False.
                    if sleeps_to_go == 0:
                        self.running = False
                        raise Exception(
                            f"Failed to process all events successfully after specified time {self.source_config.event_processing_time_max_duration_seconds}! If more time is required, please increase the timeout using this config. {self.ack_manager.acks.values()}",
                        )
                logger.debug(
                    f"Successfully processed events up to offset id {self.safe_to_ack_offset}"
                )
                # Only now — with zero outstanding acks — is the consumer's
                # current offset safe to commit.
                self.safe_to_ack_offset = self.datahub_events_consumer.offset_id
                logger.debug(f"Safe to ack offset: {self.safe_to_ack_offset}")

                events_response = self.datahub_events_consumer.poll_events(
                    topic=self.source_config.topic, poll_timeout_seconds=2
                )

                # Handle Idle Timeout
                num_events = len(events_response.events)

                if num_events == 0:
                    # Record when idleness started (first empty poll only).
                    if last_idle_response_timestamp == 0:
                        last_idle_response_timestamp = (
                            self._get_current_timestamp_seconds()
                        )
                    if self._should_idle_timeout(
                        num_events, last_idle_response_timestamp
                    ):
                        logger.info("Exiting main loop due to idle timeout")
                        return
                else:
                    # New work arrived: start a fresh ack batch and clear idleness.
                    self.ack_manager.new_batch()
                    last_idle_response_timestamp = 0  # Reset the idle timeout

                event_envelopes: List[EventEnvelope] = []
                for msg in events_response.events:
                    for event_envelope in self.handle_pe(msg):
                        # Attach ack bookkeeping so ack() can mark it processed.
                        event_envelope.meta = self.ack_manager.get_meta(event_envelope)
                        event_envelopes.append(event_envelope)

                yield from event_envelopes

            except Exception as e:
                # Any failure (including the ack-timeout above) stops the loop.
                logger.exception(f"DataHub Events consumer error: {e}")
                self.running = False

        logger.info("DataHub Events consumer exiting main loop")

    @staticmethod
    def handle_pe(msg: ExternalEvent) -> Iterable[EventEnvelope]:
        """Decode one raw platform event; yield an envelope only for
        Entity Change Events (all other event names are skipped)."""
        value: dict = json.loads(msg.value)
        payload: GenericPayloadClass = GenericPayloadClass.from_obj(
            post_json_transform(value["payload"])
        )
        if ENTITY_CHANGE_EVENT_NAME == value["name"]:
            event = build_entity_change_event(payload)
            yield EventEnvelope(ENTITY_CHANGE_EVENT_V1_TYPE, event, {})

    def close(self) -> None:
        """Stop the loop, commit the last safe offset, and close the consumer."""
        if self.datahub_events_consumer:
            self.running = False
            if self.safe_to_ack_offset:
                self.datahub_events_consumer.commit_offsets(
                    offset_id=self.safe_to_ack_offset
                )
            self.datahub_events_consumer.close()

    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
        """Acknowledge one event; meta was attached in _poll_and_process_events."""
        self.ack_manager.ack(event.meta, processed=processed)
        logger.debug(f"Actions acked event {event} as processed {processed}")

    def _should_idle_timeout(
        self, num_events: int, last_idle_response_timestamp: int
    ) -> bool:
        """Handle idle timeout logic and decide if the loop should exit."""
        if num_events > 0:
            return False  # Continue processing

        current_timestamp_seconds = self._get_current_timestamp_seconds()

        # Exit only when the kill switch is enabled AND we've been idle longer
        # than the configured duration.
        if (
            self.source_config.kill_after_idle_timeout
            and current_timestamp_seconds - last_idle_response_timestamp
            > self.source_config.idle_timeout_duration_seconds
        ):
            logger.info(
                f"Shutting down due to idle timeout of {self.source_config.idle_timeout_duration_seconds} seconds"
            )
            self.running = False
            return True  # Signal that we should exit
        return False  # Continue processing

    def _get_current_timestamp_seconds(self) -> int:
        # Wall-clock seconds; isolated in a method so tests can override it.
        return int(time.time())
0 commit comments