Skip to content

Commit 67241c2

Browse files
authored
Merge branch 'datahub-project:master' into master
2 parents a326b80 + b82ec1c commit 67241c2

File tree

8 files changed

+47
-61
lines changed

8 files changed

+47
-61
lines changed

metadata-ingestion/src/datahub/emitter/mcp.py

+5 -1
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
import dataclasses
22
import json
3-
from typing import TYPE_CHECKING, List, Optional, Sequence, Tuple, Union
3+
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Tuple, Union
44

55
from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE
66
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
@@ -69,6 +69,7 @@ class MetadataChangeProposalWrapper:
6969
aspectName: Union[None, str] = None
7070
aspect: Union[None, _Aspect] = None
7171
systemMetadata: Union[None, SystemMetadataClass] = None
72+
headers: Union[None, Dict[str, str]] = None
7273

7374
def __post_init__(self) -> None:
7475
if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET:
@@ -112,6 +113,7 @@ def _make_mcp_without_aspects(self) -> MetadataChangeProposalClass:
112113
auditHeader=self.auditHeader,
113114
aspectName=self.aspectName,
114115
systemMetadata=self.systemMetadata,
116+
headers=self.headers,
115117
)
116118

117119
def make_mcp(self) -> MetadataChangeProposalClass:
@@ -211,6 +213,7 @@ def try_from_mcpc(
211213
aspectName=mcpc.aspectName,
212214
aspect=aspect,
213215
systemMetadata=mcpc.systemMetadata,
216+
headers=mcpc.headers,
214217
)
215218
else:
216219
return None
@@ -228,6 +231,7 @@ def try_from_mcl(
228231
changeType=mcl.changeType,
229232
auditHeader=mcl.auditHeader,
230233
systemMetadata=mcl.systemMetadata,
234+
headers=mcl.headers,
231235
)
232236
return cls.try_from_mcpc(mcpc) or mcpc
233237

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py

+1
Original file line number | Diff line number | Diff line change
@@ -301,6 +301,7 @@ class SnowflakeV2Config(
301301
default=AllowDenyPattern.allow_all(),
302302
description=(
303303
"List of regex patterns for structured properties to include in ingestion."
304+
" Applied to tags with form `<database>.<schema>.<tag_name>`."
304305
" Only used if `extract_tags` and `extract_tags_as_structured_properties` are enabled."
305306
),
306307
)

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

+4 -1
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
2424
StructuredPropertyDefinition,
2525
)
26+
from datahub.metadata.schema_classes import ChangeTypeClass
2627
from datahub.metadata.urns import (
2728
ContainerUrn,
2829
DatasetUrn,
@@ -81,7 +82,7 @@ def _get_tags_on_object_without_propagation(
8182
def create_structured_property_templates(self) -> Iterable[MetadataWorkUnit]:
8283
for tag in self.data_dictionary.get_all_tags():
8384
if not self.config.structured_property_pattern.allowed(
84-
tag.tag_identifier()
85+
tag._id_prefix_as_str()
8586
):
8687
continue
8788
if self.config.extract_tags_as_structured_properties:
@@ -111,6 +112,8 @@ def gen_tag_as_structured_property_workunits(
111112
yield MetadataChangeProposalWrapper(
112113
entityUrn=urn,
113114
aspect=aspect,
115+
changeType=ChangeTypeClass.CREATE,
116+
headers={"If-None-Match": "*"},
114117
).as_workunit()
115118

116119
def _get_tags_on_object_with_propagation(

metadata-ingestion/src/datahub/testing/mcp_diff.py

+15 -2
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
import json
33
import re
44
from collections import defaultdict
5-
from typing import Any, Dict, List, Sequence, Set, Tuple, Union
5+
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
66

77
import deepdiff.serialization
88
import yaml
@@ -34,6 +34,7 @@ class AspectForDiff:
3434
aspect_name: str
3535
aspect: Dict[str, Any] = dataclasses.field(hash=False)
3636
delta_info: "DeltaInfo" = dataclasses.field(hash=False, repr=False)
37+
headers: Optional[Dict[str, str]] = dataclasses.field(default=None, hash=False)
3738

3839
@classmethod
3940
def create_from_mcp(cls, idx: int, obj: Dict[str, Any]) -> "AspectForDiff":
@@ -44,6 +45,7 @@ def create_from_mcp(cls, idx: int, obj: Dict[str, Any]) -> "AspectForDiff":
4445
aspect_name=obj["aspectName"],
4546
aspect=aspect.get("json", aspect),
4647
delta_info=DeltaInfo(idx=idx, original=obj),
48+
headers=obj.get("headers"),
4749
)
4850

4951
def __repr__(self):
@@ -240,9 +242,12 @@ def pretty(self, verbose: bool = False) -> str:
240242
s.append(serialize_aspect(ga.aspect))
241243
for (i, old, new), diffs in aspect_diffs.aspects_changed.items():
242244
s.append(self.report_aspect(old, i, "changed") + ":")
245+
246+
print_aspects = False
243247
for diff_level in diffs:
244248
s.append(self.report_diff_level(diff_level, i))
245-
if verbose:
249+
print_aspects |= self.is_diff_level_on_aspect(diff_level)
250+
if verbose and print_aspects:
246251
s.append(f"Old aspect:\n{serialize_aspect(old.aspect)}")
247252
s.append(f"New aspect:\n{serialize_aspect(new.aspect)}")
248253

@@ -271,6 +276,14 @@ def report_diff_level(diff: DiffLevel, idx: int) -> str:
271276
f"root[{idx}].", ""
272277
)
273278

279+
@staticmethod
280+
def is_diff_level_on_aspect(diff: DiffLevel) -> bool:
281+
skip_print_fields = ["changeType", "headers"]
282+
try:
283+
return diff.path(output_format="list")[1] not in skip_print_fields
284+
except IndexError:
285+
return True
286+
274287

275288
def serialize_aspect(aspect: Union[AspectForDiff, Dict[str, Any]]) -> str:
276289
if isinstance(aspect, AspectForDiff): # Unpack aspect

metadata-ingestion/tests/integration/snowflake/snowflake_structured_properties_golden.json

+16 -57
Original file line number | Diff line number | Diff line change
@@ -123,7 +123,7 @@
123123
{
124124
"entityType": "structuredProperty",
125125
"entityUrn": "urn:li:structuredProperty:snowflake.other_db.other_schema.my_other_tag",
126-
"changeType": "UPSERT",
126+
"changeType": "CREATE",
127127
"aspectName": "propertyDefinition",
128128
"aspect": {
129129
"json": {
@@ -147,6 +147,9 @@
147147
"lastObserved": 1615443388097,
148148
"runId": "snowflake-2025_01_07-13_38_56-3fo398",
149149
"lastRunId": "no-run-id-provided"
150+
},
151+
"headers": {
152+
"If-None-Match": "*"
150153
}
151154
},
152155
{
@@ -2978,7 +2981,7 @@
29782981
{
29792982
"entityType": "structuredProperty",
29802983
"entityUrn": "urn:li:structuredProperty:snowflake.test_db.test_schema.security",
2981-
"changeType": "UPSERT",
2984+
"changeType": "CREATE",
29822985
"aspectName": "propertyDefinition",
29832986
"aspect": {
29842987
"json": {
@@ -3002,6 +3005,9 @@
30023005
"lastObserved": 1615443388097,
30033006
"runId": "snowflake-2025_01_07-13_38_56-3fo398",
30043007
"lastRunId": "no-run-id-provided"
3008+
},
3009+
"headers": {
3010+
"If-None-Match": "*"
30053011
}
30063012
},
30073013
{
@@ -3318,7 +3324,7 @@
33183324
{
33193325
"entityType": "structuredProperty",
33203326
"entityUrn": "urn:li:structuredProperty:snowflake.test_db.test_schema.my_tag_0",
3321-
"changeType": "UPSERT",
3327+
"changeType": "CREATE",
33223328
"aspectName": "propertyDefinition",
33233329
"aspect": {
33243330
"json": {
@@ -3342,12 +3348,15 @@
33423348
"lastObserved": 1615443388097,
33433349
"runId": "snowflake-2025_01_07-13_38_56-3fo398",
33443350
"lastRunId": "no-run-id-provided"
3351+
},
3352+
"headers": {
3353+
"If-None-Match": "*"
33453354
}
33463355
},
33473356
{
33483357
"entityType": "structuredProperty",
33493358
"entityUrn": "urn:li:structuredProperty:snowflake.test_db.test_schema.my_tag_1",
3350-
"changeType": "UPSERT",
3359+
"changeType": "CREATE",
33513360
"aspectName": "propertyDefinition",
33523361
"aspect": {
33533362
"json": {
@@ -3371,35 +3380,9 @@
33713380
"lastObserved": 1615443388097,
33723381
"runId": "snowflake-2025_01_07-13_38_56-3fo398",
33733382
"lastRunId": "no-run-id-provided"
3374-
}
3375-
},
3376-
{
3377-
"entityType": "structuredProperty",
3378-
"entityUrn": "urn:li:structuredProperty:snowflake.test_db.test_schema.my_tag_2",
3379-
"changeType": "UPSERT",
3380-
"aspectName": "propertyDefinition",
3381-
"aspect": {
3382-
"json": {
3383-
"qualifiedName": "snowflake.test_db.test_schema.my_tag_2",
3384-
"displayName": "my_tag_2",
3385-
"valueType": "urn:li:dataType:datahub.string",
3386-
"cardinality": "SINGLE",
3387-
"entityTypes": [
3388-
"urn:li:entityType:datahub.container",
3389-
"urn:li:entityType:datahub.dataset",
3390-
"urn:li:entityType:datahub.schemaField"
3391-
],
3392-
"immutable": false,
3393-
"lastModified": {
3394-
"time": 1615443388097,
3395-
"actor": "urn:li:corpuser:datahub"
3396-
}
3397-
}
33983383
},
3399-
"systemMetadata": {
3400-
"lastObserved": 1615443388097,
3401-
"runId": "snowflake-2025_01_07-13_38_56-3fo398",
3402-
"lastRunId": "no-run-id-provided"
3384+
"headers": {
3385+
"If-None-Match": "*"
34033386
}
34043387
},
34053388
{
@@ -3666,14 +3649,6 @@
36663649
"string": "my_value_1"
36673650
}
36683651
]
3669-
},
3670-
{
3671-
"propertyUrn": "urn:li:structuredProperty:snowflake.test_db.test_schema.my_tag_2",
3672-
"values": [
3673-
{
3674-
"string": "my_value_2"
3675-
}
3676-
]
36773652
}
36783653
]
36793654
}
@@ -5007,22 +4982,6 @@
50074982
"lastRunId": "no-run-id-provided"
50084983
}
50094984
},
5010-
{
5011-
"entityType": "structuredProperty",
5012-
"entityUrn": "urn:li:structuredProperty:snowflake.test_db.test_schema.my_tag_2",
5013-
"changeType": "UPSERT",
5014-
"aspectName": "status",
5015-
"aspect": {
5016-
"json": {
5017-
"removed": false
5018-
}
5019-
},
5020-
"systemMetadata": {
5021-
"lastObserved": 1615443388097,
5022-
"runId": "snowflake-2025_01_07-13_38_56-3fo398",
5023-
"lastRunId": "no-run-id-provided"
5024-
}
5025-
},
50264985
{
50274986
"entityType": "structuredProperty",
50284987
"entityUrn": "urn:li:structuredProperty:snowflake.test_db.test_schema.security",
@@ -5055,4 +5014,4 @@
50555014
"lastRunId": "no-run-id-provided"
50565015
}
50575016
}
5058-
]
5017+
]

metadata-ingestion/tests/integration/snowflake/test_snowflake.py

+3
Original file line number | Diff line number | Diff line change
@@ -209,6 +209,9 @@ def test_snowflake_tags_as_structured_properties(
209209
type="snowflake",
210210
config=SnowflakeV2Config(
211211
extract_tags_as_structured_properties=True,
212+
structured_property_pattern=AllowDenyPattern(
213+
deny=["test_db.test_schema.my_tag_2"]
214+
),
212215
extract_tags=TagOption.without_lineage,
213216
account_id="ABC12345.ap-south-1.aws",
214217
username="TST_USR",

metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java

+1
Original file line number | Diff line number | Diff line change
@@ -121,6 +121,7 @@ public void appendRunId(
121121

122122
// Create an upsert document that will be used if the document doesn't exist
123123
Map<String, Object> upsert = new HashMap<>();
124+
upsert.put("urn", urn.toString());
124125
upsert.put("runId", Collections.singletonList(runId));
125126

126127
esWriteDAO.applyScriptUpdate(

metadata-io/src/test/java/com/linkedin/metadata/search/ElasticSearchServiceTest.java

+2
Original file line number | Diff line number | Diff line change
@@ -89,6 +89,7 @@ public void testAppendRunId_ValidRunId() {
8989
// Verify upsert document
9090
Map<String, Object> capturedUpsert = upsertCaptor.getValue();
9191
assertEquals(capturedUpsert.get("runId"), Collections.singletonList(runId));
92+
assertEquals(capturedUpsert.get("urn"), TEST_URN.toString());
9293
}
9394

9495
@Test
@@ -121,6 +122,7 @@ public void testAppendRunId_NullRunId() {
121122

122123
Map<String, Object> capturedUpsert = upsertCaptor.getValue();
123124
assertEquals(capturedUpsert.get("runId"), Collections.singletonList(null));
125+
assertEquals(capturedUpsert.get("urn"), TEST_URN.toString());
124126
}
125127

126128
@Test(expectedExceptions = NullPointerException.class)

0 commit comments

Comments
 (0)