Skip to content

Commit d4cad98

Browse files
authored
Merge branch 'datahub-project:master' into master
2 parents 8789466 + 68b4fcb commit d4cad98

File tree

65 files changed

+7205
-586
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+7205
-586
lines changed

.github/workflows/airflow-plugin.yml

+2-5
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,8 @@ jobs:
3434
include:
3535
# Note: this should be kept in sync with tox.ini.
3636
- python-version: "3.8"
37-
extra_pip_requirements: "apache-airflow~=2.3.4"
38-
extra_pip_extras: test-airflow23
39-
- python-version: "3.10"
40-
extra_pip_requirements: "apache-airflow~=2.4.3"
41-
extra_pip_extras: test-airflow24
37+
extra_pip_extras: test-airflow25
38+
# extra_pip_requirements: "apache-airflow~=2.5.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.5.3/constraints-3.8.txt"
4239
- python-version: "3.10"
4340
extra_pip_requirements: "apache-airflow~=2.6.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt"
4441
- python-version: "3.10"

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BuildIndicesConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
import org.springframework.context.annotation.Bean;
1313
import org.springframework.context.annotation.Conditional;
1414
import org.springframework.context.annotation.Configuration;
15+
import org.springframework.core.annotation.Order;
1516

1617
@Configuration
1718
@Conditional(SystemUpdateCondition.BlockingSystemUpdateCondition.class)
1819
public class BuildIndicesConfig {
20+
@Order(1)
1921
@Bean(name = "buildIndices")
2022
public BlockingSystemUpgrade buildIndices(
2123
final SystemMetadataService systemMetadataService,

datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeBlockingTest.java

+10
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
import static org.testng.Assert.assertTrue;
55
import static org.testng.AssertJUnit.assertNotNull;
66

7+
import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade;
78
import com.linkedin.datahub.upgrade.system.SystemUpdateBlocking;
89
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPStep;
10+
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
911
import java.util.List;
1012
import java.util.stream.Collectors;
1113
import javax.inject.Named;
@@ -25,6 +27,14 @@ public class DatahubUpgradeBlockingTest extends AbstractTestNGSpringContextTests
2527
@Named("systemUpdateBlocking")
2628
private SystemUpdateBlocking systemUpdateBlocking;
2729

30+
@Autowired private List<BlockingSystemUpgrade> blockingSystemUpgrades;
31+
32+
@Test
33+
public void testBuildIndicesOrder() {
34+
assertNotNull(blockingSystemUpgrades);
35+
assertTrue(blockingSystemUpgrades.get(0) instanceof BuildIndices);
36+
}
37+
2838
@Test
2939
public void testNBlockingBootstrapMCP() {
3040
assertNotNull(systemUpdateBlocking);

datahub-web-react/src/app/entityV2/shared/tabs/Incident/__tests__/utils.test.tsx

+120
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { format } from 'date-fns';
22
import { SortingState } from '@src/alchemy-components/components/Table/types';
3+
import { EntityType } from '@src/types.generated';
34
import {
45
getFilteredTransformedIncidentData,
56
getLinkedAssetsCount,
@@ -10,6 +11,7 @@ import {
1011
validateForm,
1112
getSortedIncidents,
1213
getExistingIncidents,
14+
useSiblingOptionsForIncidentBuilder,
1315
} from '../utils';
1416
import { IncidentListFilter } from '../types';
1517

@@ -119,4 +121,122 @@ describe('Utility Functions', () => {
119121
};
120122
expect(getExistingIncidents(currData)).toEqual([{ id: 1 }, { id: 2 }]);
121123
});
124+
125+
test('should return main entity data in options', () => {
126+
const mockEntityData = {
127+
urn: 'urn:li:dataset:(urn:li:dataPlatform:bigquery,my_table,PROD)',
128+
platform: {
129+
properties: {
130+
displayName: 'BigQuery',
131+
},
132+
name: 'bigquery',
133+
urn: 'urn:li:dataPlatform:bigquery',
134+
},
135+
dataPlatformInstance: {
136+
platform: {
137+
name: 'BigQueryInstance',
138+
},
139+
},
140+
siblingsSearch: {
141+
searchResults: [
142+
{
143+
entity: {
144+
urn: 'urn:li:dataset:(urn:li:dataPlatform:snowflake,my_table,PROD)',
145+
platform: {
146+
properties: {
147+
displayName: 'Snowflake',
148+
},
149+
name: 'snowflake',
150+
urn: 'urn:li:dataPlatform:snowflake',
151+
},
152+
dataPlatformInstance: {
153+
platform: {
154+
name: 'SnowflakeInstance',
155+
},
156+
},
157+
},
158+
},
159+
],
160+
},
161+
} as any;
162+
const result = useSiblingOptionsForIncidentBuilder(
163+
mockEntityData,
164+
'urn:li:dataset:main',
165+
'DATASET' as EntityType,
166+
);
167+
expect(result[0]).toEqual({
168+
title: 'BigQuery',
169+
urn: 'urn:li:dataset:main',
170+
platform: mockEntityData.platform,
171+
entityType: 'DATASET',
172+
});
173+
});
174+
175+
test('should include siblings data in options', () => {
176+
const mockEntityData = {
177+
urn: 'urn:li:dataset:(urn:li:dataPlatform:bigquery,my_table,PROD)',
178+
platform: {
179+
properties: { displayName: 'BigQuery' },
180+
name: 'bigquery',
181+
urn: 'urn:li:dataPlatform:bigquery',
182+
},
183+
dataPlatformInstance: {
184+
platform: { name: 'BigQueryInstance' },
185+
},
186+
siblingsSearch: {
187+
searchResults: [
188+
{
189+
entity: {
190+
urn: 'urn:li:dataset:(urn:li:dataPlatform:snowflake,my_table1,PROD)',
191+
platform: {
192+
properties: { displayName: 'Snowflake' },
193+
name: 'snowflake',
194+
urn: 'urn:li:dataPlatform:snowflake',
195+
},
196+
dataPlatformInstance: { platform: { name: 'SnowflakeInstance' } },
197+
type: 'DATASET',
198+
},
199+
},
200+
{
201+
entity: {
202+
urn: 'urn:li:dataset:(urn:li:dataPlatform:redshift,my_table2,PROD)',
203+
platform: {
204+
properties: { displayName: 'Redshift' },
205+
name: 'redshift',
206+
urn: 'urn:li:dataPlatform:redshift',
207+
},
208+
dataPlatformInstance: { platform: { name: 'RedshiftInstance' } },
209+
type: 'DATASET',
210+
},
211+
},
212+
],
213+
},
214+
} as any;
215+
216+
const result = useSiblingOptionsForIncidentBuilder(mockEntityData, mockEntityData.urn, 'DATASET' as EntityType);
217+
218+
// Expect 1 main entity + 2 siblings
219+
expect(result.length).toBe(3);
220+
221+
expect(result[0]).toEqual({
222+
urn: mockEntityData.urn,
223+
title: 'BigQuery',
224+
platform: mockEntityData.platform,
225+
entityType: 'DATASET',
226+
});
227+
228+
expect(result[1]).toEqual({
229+
urn: 'urn:li:dataset:(urn:li:dataPlatform:snowflake,my_table1,PROD)',
230+
title: 'Snowflake',
231+
platform: mockEntityData.siblingsSearch.searchResults[0].entity.platform,
232+
entityType: 'DATASET',
233+
});
234+
235+
expect(result[2]).toEqual({
236+
urn: 'urn:li:dataset:(urn:li:dataPlatform:redshift,my_table2,PROD)',
237+
title: 'Redshift',
238+
platform: mockEntityData.siblingsSearch.searchResults[1].entity.platform,
239+
entityType: 'DATASET',
240+
});
241+
});
122242
});

datahub-web-react/src/app/lineageV2/useSearchAcrossLineage.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ export default function useSearchAcrossLineage(
8585
lineageFlags: {
8686
startTimeMillis,
8787
endTimeMillis,
88-
entitiesExploredPerHopLimit: PER_HOP_LIMIT,
88+
entitiesExploredPerHopLimit: maxDepth ? PER_HOP_LIMIT : undefined,
8989
ignoreAsHops: DEFAULT_IGNORE_AS_HOPS,
9090
},
9191
searchFlags: {

docs/how/updating-datahub.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
2020

2121
### Breaking Changes
2222

23+
- #13004: The `acryl-datahub-airflow-plugin` dropped support for Airflow 2.3 and 2.4.
24+
2325
### Potential Downtime
2426

2527
### Deprecations
@@ -42,7 +44,6 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
4244

4345
- #12797: Previously endpoints when used in ASYNC mode would not validate URNs, entity & aspect names immediately. Starting with this release, even in ASYNC mode, these requests will be returned with http code 400.
4446

45-
4647
### Known Issues
4748

4849
- #12601: Jetty 12 introduces a stricter handling of url encoding. We are currently applying a workaround to prevent a regression, while technically breaking the official specifications.
@@ -84,7 +85,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
8485
- #12020 - Removed `sql_parser` configuration from the Redash source, as Redash now exclusively uses the sqlglot-based parser for lineage extraction.
8586
- #12020 - Removed `datahub.utilities.sql_parser`, `datahub.utilities.sql_parser_base` and `datahub.utilities.sql_lineage_parser_impl` module along with `SqlLineageSQLParser` and `DefaultSQLParser`. Use `create_lineage_sql_parsed_result` from `datahub.sql_parsing.sqlglot_lineage` module instead.
8687
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted
87-
(after 10d) or are timeseries *entities* (dataprocess, execution requests)
88+
(after 10d) or are timeseries _entities_ (dataprocess, execution requests)
8889
will be removed automatically using logic in the `datahub-gc` ingestion
8990
source.
9091
- #12067 - Default behavior of DataJobPatchBuilder in Python sdk has been

docs/lineage/airflow.md

+8-5
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ There's two implementations of the plugin, with different Airflow version suppor
1717

1818
| Approach | Airflow Versions | Notes |
1919
| --------- | ---------------- | --------------------------------------------------------------------------------------- |
20-
| Plugin v2 | 2.3.4+ | Recommended. Requires Python 3.8+ |
21-
| Plugin v1 | 2.3 - 2.8 | Deprecated. No automatic lineage extraction; may not extract lineage if the task fails. |
20+
| Plugin v2 | 2.5+ | Recommended. Requires Python 3.8+ |
21+
| Plugin v1 | 2.5 - 2.8 | Deprecated. No automatic lineage extraction; may not extract lineage if the task fails. |
2222

23-
If you're using Airflow older than 2.3, it's possible to use the v1 plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details.
23+
If you're using Airflow older than 2.5, it's possible to use the plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details.
2424

2525
<!-- TODO: Update the local Airflow guide and link to it here. -->
2626
<!-- If you are looking to run Airflow and DataHub using docker locally, follow the guide [here](../../docker/airflow/local_airflow.md). -->
@@ -376,12 +376,15 @@ This will immediately disable the plugin without requiring a restart.
376376

377377
## Compatibility
378378

379-
We no longer officially support Airflow <2.3. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
380-
The first two options support Python 3.7+, and the last option supports Python 3.8+.
379+
We try to support Airflow releases for ~2 years after their release. This is a best-effort guarantee - it's not always possible due to dependency / security issues cropping up in older versions.
380+
381+
We no longer officially support Airflow <2.5. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
382+
The first two options support Python 3.7+, and the others require Python 3.8+.
381383

382384
- Airflow 1.10.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.9.1.0.
383385
- Airflow 2.0.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.11.0.1.
384386
- Airflow 2.2.x, use DataHub plugin v2 with acryl-datahub-airflow-plugin <= 0.14.1.5.
387+
- Airflow 2.3 - 2.4.3, use DataHub plugin v2 with acryl-datahub-airflow-plugin <= 1.0.0.
385388

386389
DataHub also previously supported an Airflow [lineage backend](https://airflow.apache.org/docs/apache-airflow/2.2.0/lineage.html#lineage-backend) implementation. While the implementation is still in our codebase, it is deprecated and will be removed in a future release.
387390
Note that the lineage backend did not support automatic lineage extraction, did not capture task failures, and did not work in AWS MWAA.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package com.linkedin.metadata.aspect.hooks;
2+
3+
import static com.linkedin.metadata.Constants.OWNERSHIP_ASPECT_NAME;
4+
5+
import com.linkedin.common.*;
6+
import com.linkedin.common.urn.Urn;
7+
import com.linkedin.metadata.aspect.ReadItem;
8+
import com.linkedin.metadata.aspect.RetrieverContext;
9+
import com.linkedin.metadata.aspect.batch.ChangeMCP;
10+
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
11+
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
12+
import com.linkedin.util.Pair;
13+
import java.util.*;
14+
import java.util.stream.Stream;
15+
import javax.annotation.Nonnull;
16+
import lombok.Getter;
17+
import lombok.Setter;
18+
import lombok.experimental.Accessors;
19+
import lombok.extern.slf4j.Slf4j;
20+
21+
@Slf4j
22+
@Getter
23+
@Setter
24+
@Accessors(chain = true)
25+
public class OwnershipOwnerTypes extends MutationHook {
26+
@Nonnull private AspectPluginConfig config;
27+
28+
protected Stream<Pair<ChangeMCP, Boolean>> writeMutation(
29+
@Nonnull Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
30+
return changeMCPS.stream()
31+
.map(
32+
item ->
33+
aspectFilter(item)
34+
? Pair.of(item, processOwnershipAspect(item))
35+
: Pair.of(item, false));
36+
}
37+
38+
private static boolean aspectFilter(ReadItem item) {
39+
return item.getAspectName().equals(OWNERSHIP_ASPECT_NAME);
40+
}
41+
42+
public static Map<String, List<Urn>> toMap(UrnArrayMap map) {
43+
Map<String, List<Urn>> result = new HashMap<>();
44+
map.forEach(
45+
((s, urns) -> {
46+
result.put(s, new ArrayList<>(urns));
47+
}));
48+
return result;
49+
}
50+
51+
public static UrnArrayMap toUrnArrayMap(Map<String, List<Urn>> map) {
52+
UrnArrayMap result = new UrnArrayMap();
53+
map.forEach(
54+
(key, urns) -> {
55+
result.put(key, new UrnArray(urns));
56+
});
57+
return result;
58+
}
59+
60+
public static boolean processOwnershipAspect(ChangeMCP item) {
61+
boolean mutated = false;
62+
Ownership ownership = item.getAspect(Ownership.class);
63+
if (ownership == null) {
64+
return false;
65+
}
66+
UrnArrayMap ownerTypes = ownership.getOwnerTypes();
67+
Map<String, List<Urn>> ownerTypesMap;
68+
if (ownerTypes == null) {
69+
ownerTypesMap = new HashMap<>();
70+
mutated = true;
71+
} else {
72+
ownerTypesMap = toMap(ownerTypes);
73+
}
74+
OwnerArray owners = ownership.getOwners();
75+
for (Owner owner : owners) {
76+
String typeKey =
77+
Optional.ofNullable(owner.getTypeUrn())
78+
.map(Urn::toString)
79+
.orElseGet(
80+
() ->
81+
"urn:li:ownershipType:__system__" + owner.getType().toString().toLowerCase());
82+
83+
List<Urn> ownerOfType;
84+
if (ownerTypesMap.containsKey(typeKey)) {
85+
ownerOfType = ownerTypesMap.get(typeKey);
86+
} else {
87+
ownerOfType = new ArrayList<>();
88+
ownerTypesMap.put(typeKey, ownerOfType);
89+
mutated = true;
90+
}
91+
Urn ownerUrn = owner.getOwner();
92+
if (!ownerOfType.contains(ownerUrn)) {
93+
ownerOfType.add(ownerUrn);
94+
mutated = true;
95+
}
96+
}
97+
if (mutated) {
98+
ownership.setOwnerTypes((toUrnArrayMap(ownerTypesMap)));
99+
}
100+
return mutated;
101+
}
102+
}

0 commit comments

Comments
 (0)