
Commit d2bb33f

feat(ingest): new hex connector - part 2 (datahub-project#12985)
1 parent 7618af5 commit d2bb33f

18 files changed: +3295 −36 lines

Diff for: metadata-ingestion/docs/sources/hex/README.md

+9-1
@@ -20,4 +20,12 @@ Currently, the [Hex API](https://learn.hex.tech/docs/api/api-reference) has some
 
 2. **Metadata Access**: There is no direct method to retrieve metadata for Collections, Status, or Categories. This information is only available indirectly through references within Projects and Components.
 
-Please keep these limitations in mind when working with the Hex connector.
+Please keep these limitations in mind when working with the Hex connector.
+
+For the Dataset - Hex Project lineage, the connector relies on the
+[_Hex query metadata_](https://learn.hex.tech/docs/explore-data/cells/sql-cells/sql-cells-introduction#query-metadata) feature.
+Therefore, in order to extract lineage information, the required setup must include:
+
+- A separate warehouse ingestor (e.g. BigQuery, Snowflake, Redshift, ...) with `use_queries_v2` enabled in order to fetch queries.
+  This will ingest the queries into DataHub as `Query` entities, and the ones triggered by Hex will include the corresponding _Hex query metadata_.
+- A DataHub server with version >= SaaS `0.3.10` or > OSS `1.0.0`, so that `Query` entities are properly indexed by source (Hex in this case) and can be fetched and processed by the Hex ingestor to emit the Dataset - Project lineage.
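For illustration, a minimal sketch of the lineage-related options this commit adds, expressed as the plain config dict that `HexSource.create()` accepts. Field names come from `HexSourceConfig` in the hex.py diff below; the workspace name and time values are hypothetical placeholders:

```python
# Hypothetical example; values are placeholders, field names from HexSourceConfig.
config_dict = {
    "workspace_name": "acme-analytics",  # placeholder Hex workspace
    "include_lineage": True,             # default: True
    "lineage_start_time": "-7 days",     # relative or absolute, parsed by parse_user_datetime
    "lineage_end_time": "2023-01-01",    # defaults to "now" in UTC when omitted
    "datahub_page_size": 100,            # DATAHUB_API_PAGE_SIZE_DEFAULT
}
```

With `include_lineage` enabled, the source also requires a DataHub graph connection (`ctx.require_graph`), as wired up in hex.py below.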
Diff for: metadata-ingestion/src/datahub/ingestion/source/hex/constants.py

+5-0

@@ -1,3 +1,8 @@
+from datahub.metadata.urns import DataPlatformUrn
+
 HEX_PLATFORM_NAME = "hex"
+HEX_PLATFORM_URN = DataPlatformUrn(platform_name=HEX_PLATFORM_NAME)
 HEX_API_BASE_URL_DEFAULT = "https://app.hex.tech/api/v1"
 HEX_API_PAGE_SIZE_DEFAULT = 100
+
+DATAHUB_API_PAGE_SIZE_DEFAULT = 100

Diff for: metadata-ingestion/src/datahub/ingestion/source/hex/hex.py

+150-22
@@ -1,9 +1,12 @@
+from dataclasses import dataclass
+from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, Iterable, List, Optional
 
-from pydantic import Field, SecretStr
+from pydantic import Field, SecretStr, root_validator
 from typing_extensions import assert_never
 
 from datahub.configuration.common import AllowDenyPattern
+from datahub.configuration.datetimes import parse_user_datetime
 from datahub.configuration.source_common import (
     EnvConfigMixin,
     PlatformInstanceConfigMixin,
@@ -21,22 +24,28 @@
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.hex.api import HexApi, HexApiReport
 from datahub.ingestion.source.hex.constants import (
+    DATAHUB_API_PAGE_SIZE_DEFAULT,
     HEX_API_BASE_URL_DEFAULT,
     HEX_API_PAGE_SIZE_DEFAULT,
     HEX_PLATFORM_NAME,
 )
 from datahub.ingestion.source.hex.mapper import Mapper
 from datahub.ingestion.source.hex.model import Component, Project
+from datahub.ingestion.source.hex.query_fetcher import (
+    HexQueryFetcher,
+    HexQueryFetcherReport,
+)
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StaleEntityRemovalSourceReport,
     StatefulStaleMetadataRemovalConfig,
 )
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfigBase,
-    StatefulIngestionReport,
     StatefulIngestionSourceBase,
 )
+from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
+from datahub.sdk.main_client import DataHubClient
 
 
 class HexSourceConfig(
@@ -93,9 +102,73 @@ class HexSourceConfig(
         default=True,
         description="Set ownership identity from owner/creator email",
     )
+    include_lineage: bool = Field(
+        default=True,
+        description='Include Hex lineage, fetched from DataHub. See the "Limitations" section in the docs for more details about the limitations of this feature.',
+    )
+    lineage_start_time: Optional[datetime] = Field(
+        default=None,
+        description="Earliest date of lineage to consider. Default: 1 day before lineage end time. You can specify absolute time like '2023-01-01' or relative time like '-7 days' or '-7d'.",
+    )
+    lineage_end_time: Optional[datetime] = Field(
+        default=None,
+        description="Latest date of lineage to consider. Default: current time in UTC. You can specify absolute time like '2023-01-01' or relative time like '-1 day' or '-1d'.",
+    )
+    datahub_page_size: int = Field(
+        default=DATAHUB_API_PAGE_SIZE_DEFAULT,
+        description="Number of items to fetch per DataHub API call.",
+    )
+
+    @root_validator(pre=True)
+    def validate_lineage_times(cls, data: Dict[str, Any]) -> Dict[str, Any]:
+        # lineage_end_time defaults to now
+        if "lineage_end_time" not in data or data["lineage_end_time"] is None:
+            data["lineage_end_time"] = datetime.now(tz=timezone.utc)
+        # if a string is given, parse it
+        if isinstance(data["lineage_end_time"], str):
+            data["lineage_end_time"] = parse_user_datetime(data["lineage_end_time"])
+        # if no timezone is given, assume UTC
+        if data["lineage_end_time"].tzinfo is None:
+            data["lineage_end_time"] = data["lineage_end_time"].replace(
+                tzinfo=timezone.utc
+            )
+        # at this point, lineage_end_time is a non-null, UTC-aware datetime
+        assert (
+            data["lineage_end_time"]
+            and isinstance(data["lineage_end_time"], datetime)
+            and data["lineage_end_time"].tzinfo is not None
+            and data["lineage_end_time"].tzinfo == timezone.utc
+        )
+
+        # lineage_start_time defaults to lineage_end_time - 1 day
+        if "lineage_start_time" not in data or data["lineage_start_time"] is None:
+            data["lineage_start_time"] = data["lineage_end_time"] - timedelta(days=1)
+        # if a string is given, parse it
+        if isinstance(data["lineage_start_time"], str):
+            data["lineage_start_time"] = parse_user_datetime(data["lineage_start_time"])
+        # if no timezone is given, assume UTC
+        if data["lineage_start_time"].tzinfo is None:
+            data["lineage_start_time"] = data["lineage_start_time"].replace(
+                tzinfo=timezone.utc
+            )
+        # at this point, lineage_start_time is a non-null, UTC-aware datetime
+        assert (
+            data["lineage_start_time"]
+            and isinstance(data["lineage_start_time"], datetime)
+            and data["lineage_start_time"].tzinfo is not None
+            and data["lineage_start_time"].tzinfo == timezone.utc
+        )
+
+        return data
 
 
-class HexReport(StaleEntityRemovalSourceReport, HexApiReport):
+@dataclass
+class HexReport(
+    StaleEntityRemovalSourceReport,
+    HexApiReport,
+    IngestionStageReport,
+    HexQueryFetcherReport,
+):
     pass
 
 
@@ -110,7 +183,7 @@ class HexSource(StatefulIngestionSourceBase):
     def __init__(self, config: HexSourceConfig, ctx: PipelineContext):
         super().__init__(config, ctx)
         self.source_config = config
-        self.report = HexReport()
+        self.report: HexReport = HexReport()
         self.platform = HEX_PLATFORM_NAME
         self.hex_api = HexApi(
             report=self.report,
@@ -129,6 +202,28 @@ def __init__(self, config: HexSourceConfig, ctx: PipelineContext):
             categories_as_tags=self.source_config.categories_as_tags,
             set_ownership_from_email=self.source_config.set_ownership_from_email,
         )
+        self.project_registry: Dict[str, Project] = {}
+        self.component_registry: Dict[str, Component] = {}
+
+        self.datahub_client: Optional[DataHubClient] = None
+        self.query_fetcher: Optional[HexQueryFetcher] = None
+        if self.source_config.include_lineage:
+            graph = ctx.require_graph("Lineage")
+            assert self.source_config.lineage_start_time and isinstance(
+                self.source_config.lineage_start_time, datetime
+            )
+            assert self.source_config.lineage_end_time and isinstance(
+                self.source_config.lineage_end_time, datetime
+            )
+            self.datahub_client = DataHubClient(graph=graph)
+            self.query_fetcher = HexQueryFetcher(
+                datahub_client=self.datahub_client,
+                workspace_name=self.source_config.workspace_name,
+                start_datetime=self.source_config.lineage_start_time,
+                end_datetime=self.source_config.lineage_end_time,
+                report=self.report,
+                page_size=self.source_config.datahub_page_size,
+            )
 
     @classmethod
     def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "HexSource":
@@ -143,25 +238,58 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
             ).workunit_processor,
         ]
 
-    def get_report(self) -> StatefulIngestionReport:
+    def get_report(self) -> HexReport:
        return self.report
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
-        yield from self.mapper.map_workspace()
-
-        for project_or_component in self.hex_api.fetch_projects():
-            if isinstance(project_or_component, Project):
-                if self.source_config.project_title_pattern.allowed(
-                    project_or_component.title
-                ):
-                    yield from self.mapper.map_project(project=project_or_component)
-            elif isinstance(project_or_component, Component):
-                if (
-                    self.source_config.include_components
-                    and self.source_config.component_title_pattern.allowed(
+        with self.report.new_stage("Fetch Hex assets from Hex API"):
+            for project_or_component in self.hex_api.fetch_projects():
+                if isinstance(project_or_component, Project):
+                    if self.source_config.project_title_pattern.allowed(
                         project_or_component.title
-                )
-                ):
-                    yield from self.mapper.map_component(component=project_or_component)
-            else:
-                assert_never(project_or_component)
+                    ):
+                        self.project_registry[project_or_component.id] = (
+                            project_or_component
+                        )
+                elif isinstance(project_or_component, Component):
+                    if (
+                        self.source_config.include_components
+                        and self.source_config.component_title_pattern.allowed(
+                            project_or_component.title
+                        )
+                    ):
+                        self.component_registry[project_or_component.id] = (
+                            project_or_component
+                        )
+                else:
+                    assert_never(project_or_component)
+
+        if self.source_config.include_lineage:
+            assert self.datahub_client and self.query_fetcher
+
+            with self.report.new_stage(
+                "Fetch Hex lineage from existing Queries in DataHub"
+            ):
+                for query_metadata in self.query_fetcher.fetch():
+                    project = self.project_registry.get(query_metadata.hex_project_id)
+                    if project:
+                        project.upstream_datasets.extend(
+                            query_metadata.dataset_subjects
+                        )
+                        project.upstream_schema_fields.extend(
+                            query_metadata.schema_field_subjects
+                        )
+                    else:
+                        self.report.report_warning(
+                            title="Missing project for lineage",
+                            message="Lineage skipped because the project is missing, likely due to filter patterns or a deleted project.",
+                            context=str(query_metadata),
+                        )
+
+        with self.report.new_stage("Emit"):
+            yield from self.mapper.map_workspace()
+
+            for project in self.project_registry.values():
+                yield from self.mapper.map_project(project=project)
+            for component in self.component_registry.values():
+                yield from self.mapper.map_component(component=component)
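As a standalone illustration (not connector code), the sketch below mirrors the defaulting rules that `validate_lineage_times` enforces: the end of the window falls back to the current UTC time, the start falls back to one day before the end, and naive datetimes are assumed to be UTC. The string-parsing path via `parse_user_datetime` is left out here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def resolve_lineage_window(
    start: Optional[datetime], end: Optional[datetime]
) -> Tuple[datetime, datetime]:
    # end defaults to "now" in UTC; naive values are assumed to be UTC
    end = end or datetime.now(tz=timezone.utc)
    if end.tzinfo is None:
        end = end.replace(tzinfo=timezone.utc)
    # start defaults to one day before end
    start = start or end - timedelta(days=1)
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    return start, end


# With nothing configured, lineage covers the last 24 hours.
start, end = resolve_lineage_window(None, None)
assert end - start == timedelta(days=1)
```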

Diff for: metadata-ingestion/src/datahub/ingestion/source/hex/mapper.py

+28-2
@@ -1,6 +1,6 @@
 import logging
 from datetime import datetime
-from typing import Iterable, List, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple, Union
 
 from datahub._codegen.aspect import (
     _Aspect,  # TODO: is there a better import than this one?
@@ -46,14 +46,22 @@
     DashboardInfoClass,
     DashboardUsageStatisticsClass,
     DataPlatformInstanceClass,
+    EdgeClass,
     GlobalTagsClass,
     OwnerClass,
     OwnershipClass,
     SubTypesClass,
     TagAssociationClass,
     TimeWindowSizeClass,
 )
-from datahub.metadata.urns import ContainerUrn, CorpUserUrn, DashboardUrn, Urn
+from datahub.metadata.urns import (
+    ContainerUrn,
+    CorpUserUrn,
+    DashboardUrn,
+    DatasetUrn,
+    SchemaFieldUrn,
+    Urn,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -116,6 +124,8 @@ def map_project(self, project: Project) -> Iterable[MetadataWorkUnit]:
             ),
             externalUrl=f"{self._base_url}/{self._workspace_name}/hex/{project.id}",
             customProperties=dict(id=project.id),
+            datasetEdges=self._dataset_edges(project.upstream_datasets),
+            # TODO: support schema field upstream, maybe InputFields?
         )
 
         subtypes = SubTypesClass(
@@ -343,6 +353,22 @@ def _platform_instance_aspect(self) -> DataPlatformInstanceClass:
             else None,
         )
 
+    def _dataset_edges(
+        self, upstream: List[Union[DatasetUrn, SchemaFieldUrn]]
+    ) -> Optional[List[EdgeClass]]:
+        # TBC: is there support for CLL in Dashboards? For the moment, skip SchemaFieldUrns
+        return (
+            [
+                EdgeClass(
+                    destinationUrn=upstream_urn.urn(),
+                )
+                for upstream_urn in upstream
+                if isinstance(upstream_urn, DatasetUrn)
+            ]
+            if upstream
+            else None
+        )
+
     def _yield_mcps(
         self, entity_urn: Urn, aspects: List[Optional[_Aspect]]
     ) -> Iterable[MetadataWorkUnit]:
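To make the `_dataset_edges` filtering concrete, here is a hedged sketch (the urn constructor signatures are assumptions from the DataHub SDK, not part of this commit): only `DatasetUrn`s are turned into dashboard `datasetEdges`, while `SchemaFieldUrn`s are skipped until column-level lineage support for dashboards is confirmed.

```python
from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn

# Assumed constructor signatures; adjust to your SDK version.
dataset = DatasetUrn(platform="snowflake", name="db.sales.orders", env="PROD")
upstream = [dataset, SchemaFieldUrn(dataset, "order_id")]

# Mirrors the isinstance filter in _dataset_edges: the schema field is dropped.
edges = [u.urn() for u in upstream if isinstance(u, DatasetUrn)]
assert edges == [dataset.urn()]
```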

Diff for: metadata-ingestion/src/datahub/ingestion/source/hex/model.py

+10-2
@@ -1,6 +1,8 @@
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from datetime import datetime
-from typing import List, Optional
+from typing import List, Optional, Union
+
+from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn
 
 
 @dataclass
@@ -51,6 +53,12 @@ class Project:
     creator: Optional[Owner] = None
     owner: Optional[Owner] = None
     analytics: Optional[Analytics] = None
+    upstream_datasets: List[Union[DatasetUrn, SchemaFieldUrn]] = field(
+        default_factory=list
+    )
+    upstream_schema_fields: List[Union[DatasetUrn, SchemaFieldUrn]] = field(
+        default_factory=list
+    )
 
 
 @dataclass
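A design note on the new fields: `field(default_factory=list)` is used because a plain mutable default like `= []` would be shared across every `Project` instance. A minimal stand-in illustration (not the connector's dataclass):

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    upstream: List[str] = field(default_factory=list)  # fresh list per instance


a, b = Node(), Node()
a.upstream.append("urn:li:dataset:(...)")  # placeholder urn
assert b.upstream == []  # b keeps its own, untouched list
```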
