Skip to content

Commit 6f3a7aa

Browse files
python client for column lineage API (#2209)
Signed-off-by: Pawel Leszczynski <[email protected]> Signed-off-by: Pawel Leszczynski <[email protected]>
1 parent 74f062f commit 6f3a7aa

File tree

4 files changed

+153
-1
lines changed

4 files changed

+153
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
1111
* Column lineage within Marquez Java client [`#2163`](https://github.com/MarquezProject/marquez/pull/2163) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
1212
* Endpoint to get column lineage by a job [`#2204`](https://github.com/MarquezProject/marquez/pull/2204) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
13+
* Python client for column lineage [`#2209`](https://github.com/MarquezProject/marquez/pull/2209) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
1314

1415

1516
### Fixed

clients/python/marquez_client/client.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
DEFAULT_TIMEOUT_MS,
1818
DEFAULT_LIMIT,
1919
DEFAULT_OFFSET,
20+
DEFAULT_DEPTH,
21+
DEFAULT_WITH_DOWNSTREAM,
2022
API_PATH_V1
2123
)
2224
from marquez_client.models import (
@@ -341,6 +343,35 @@ def get_job_run(self, run_id):
341343
Utils.is_valid_uuid(run_id, 'run_id')
342344
return self._get(self._url('/jobs/runs/{0}', run_id))
343345

346+
def get_column_lineage_by_dataset(self, namespace, dataset, depth=None, with_downstream=None):
    """Fetch column lineage for the graph node that identifies a dataset.

    The node id has the form ``dataset:<namespace>:<dataset>``. ``depth`` and
    ``with_downstream`` are forwarded unchanged to ``_get_column_lineage``.

    :param namespace: namespace component of the node id
    :param dataset: dataset component of the node id
    :param depth: forwarded as-is (``None`` by default)
    :param with_downstream: forwarded as-is (``None`` by default)
    :return: the value returned by ``_get_column_lineage``
    """
    return self._get_column_lineage(
        'dataset:{ns}:{name}'.format(ns=namespace, name=dataset),
        depth,
        with_downstream,
    )
349+
350+
def get_column_lineage_by_dataset_field(
    self,
    namespace,
    dataset,
    field,
    depth=None,
    with_downstream=None
):
    """Fetch column lineage for the graph node that identifies one dataset field.

    The node id has the form ``datasetField:<namespace>:<dataset>:<field>``.
    ``depth`` and ``with_downstream`` are forwarded unchanged to
    ``_get_column_lineage``.

    :param namespace: namespace component of the node id
    :param dataset: dataset component of the node id
    :param field: field (column) component of the node id
    :param depth: forwarded as-is (``None`` by default)
    :param with_downstream: forwarded as-is (``None`` by default)
    :return: the value returned by ``_get_column_lineage``
    """
    return self._get_column_lineage(
        'datasetField:{ns}:{ds}:{col}'.format(ns=namespace, ds=dataset, col=field),
        depth,
        with_downstream,
    )
360+
361+
def get_column_lineage_by_job(self, namespace, job, depth=None, with_downstream=None):
    """Fetch column lineage for the graph node that identifies a job.

    The node id has the form ``job:<namespace>:<job>``. ``depth`` and
    ``with_downstream`` are forwarded unchanged to ``_get_column_lineage``.

    :param namespace: namespace component of the node id
    :param job: job component of the node id
    :param depth: forwarded as-is (``None`` by default)
    :param with_downstream: forwarded as-is (``None`` by default)
    :return: the value returned by ``_get_column_lineage``
    """
    return self._get_column_lineage(
        'job:{ns}:{name}'.format(ns=namespace, name=job),
        depth,
        with_downstream,
    )
364+
365+
def _get_column_lineage(self, node_id, depth, with_downstream):
    """Issue a GET to ``/column-lineage`` for the given graph node.

    :param node_id: sent as the ``nodeId`` query parameter
    :param depth: sent as the ``depth`` query parameter; ``None`` selects
        ``DEFAULT_DEPTH``
    :param with_downstream: sent as the ``withDownstream`` query parameter;
        ``None`` selects ``DEFAULT_WITH_DOWNSTREAM``
    :return: the value returned by ``self._get`` for the request
    """
    # Test against None instead of relying on truthiness: with `depth or
    # DEFAULT_DEPTH` an explicit depth of 0 was silently replaced by the
    # default.
    if depth is None:
        depth = DEFAULT_DEPTH
    if with_downstream is None:
        with_downstream = DEFAULT_WITH_DOWNSTREAM

    return self._get(
        self._url('/column-lineage'),
        params={
            'nodeId': node_id,
            'depth': depth,
            'withDownstream': with_downstream
        }
    )
374+
344375
@deprecated(deprecated_in='0.20.0', removed_in='0.25.0',
345376
details='Use OpenLineage instead, see `https://openlineage.io`')
346377
def mark_job_run_as_started(self, run_id, at=None):

clients/python/marquez_client/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@
66
DEFAULT_MARQUEZ_URL = 'http://localhost:8080'
77
DEFAULT_LIMIT = 100
88
DEFAULT_OFFSET = 0
9+
# Value the client sends as the `depth` query parameter of a column-lineage
# request when the caller does not supply one.
DEFAULT_DEPTH = 20
10+
# Value the client sends as the `withDownstream` query parameter of a
# column-lineage request when the caller does not supply one.
DEFAULT_WITH_DOWNSTREAM = False
911

1012
API_PATH_V1 = '/api/v1'

clients/python/tests/test_marquez_client.py

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
from marquez_client import MarquezClient
1212
from marquez_client.constants import (
1313
DEFAULT_LIMIT,
14-
DEFAULT_OFFSET
14+
DEFAULT_OFFSET,
15+
DEFAULT_DEPTH,
16+
DEFAULT_WITH_DOWNSTREAM
1517
)
1618
from marquez_client.models import (
1719
DatasetType,
@@ -237,6 +239,37 @@
237239
'producer': PRODUCER
238240
}
239241

242+
# Canned payload used as the mocked JSON body in the column-lineage client
# tests below: a single DATASET_FIELD node with one incoming and one outgoing
# edge.
# NOTE(review): the node 'id' names `commonDataset` while data['dataset'] is
# `otherDataset` -- confirm whether this mismatch is intentional.
COLUMN_LINEAGE = [
243+
{
244+
'id': 'datasetField:namespace:commonDataset:columnA',
245+
'type': 'DATASET_FIELD',
246+
'data': {
247+
'type': 'DATASET_FIELD',
248+
'namespace': 'namespace',
249+
'dataset': 'otherDataset',
250+
'field': 'columnA',
251+
'fieldType': 'integer',
252+
'transformationDescription': 'identical',
253+
'transformationType': 'IDENTITY',
254+
'inputFields': [
255+
{'namespace': 'namespace', 'dataset': 'otherDataset', 'field': 'columnB'}
256+
]
257+
},
258+
'inEdges': [
259+
{
260+
'origin': 'datasetField:namespace:otherDataset:columnB',
261+
'destination': 'datasetField:namespace:commonDataset:columnA'
262+
}
263+
],
264+
'outEdges': [
265+
{
266+
'origin': 'datasetField:namespace:commonDataset:columnA',
267+
'destination': 'datasetField:namespace:otherDataset:columnC'
268+
}
269+
]
270+
}
271+
]
272+
240273

241274
@pytest.fixture
242275
def client():
@@ -938,3 +971,88 @@ def test_list_tags(mock_get, client):
938971
},
939972
timeout=mock.ANY
940973
)
974+
975+
976+
@mock.patch('requests.get')
def test_get_column_lineage_by_dataset(mock_get, client):
    """Dataset lookup returns the mocked JSON and issues exactly one GET to
    ``/column-lineage`` with a ``dataset:`` node id."""
    # NOTE(review): this configures `status_code.return_value`, not
    # `status_code` itself -- kept exactly as in the original test.
    fake_response = mock_get.return_value
    fake_response.status_code.return_value = HTTPStatus.OK
    fake_response.json.return_value = COLUMN_LINEAGE

    result = client.get_column_lineage_by_dataset(
        "namespace_a", "dataset_a", DEFAULT_DEPTH, DEFAULT_WITH_DOWNSTREAM
    )
    assert result == COLUMN_LINEAGE

    expected_params = {
        'nodeId': 'dataset:namespace_a:dataset_a',
        'depth': DEFAULT_DEPTH,
        'withDownstream': DEFAULT_WITH_DOWNSTREAM,
    }
    mock_get.assert_called_once_with(
        url=client._url('/column-lineage'),
        headers=mock.ANY,
        params=expected_params,
        timeout=mock.ANY,
    )
1002+
1003+
1004+
@mock.patch('requests.get')
def test_get_column_lineage_by_dataset_field(mock_get, client):
    """Field lookup returns the mocked JSON and issues exactly one GET to
    ``/column-lineage`` with a ``datasetField:`` node id."""
    # NOTE(review): this configures `status_code.return_value`, not
    # `status_code` itself -- kept exactly as in the original test.
    fake_response = mock_get.return_value
    fake_response.status_code.return_value = HTTPStatus.OK
    fake_response.json.return_value = COLUMN_LINEAGE

    result = client.get_column_lineage_by_dataset_field(
        "namespace_a", "dataset_a", "field_a", DEFAULT_DEPTH, DEFAULT_WITH_DOWNSTREAM
    )
    assert result == COLUMN_LINEAGE

    expected_params = {
        'nodeId': 'datasetField:namespace_a:dataset_a:field_a',
        'depth': DEFAULT_DEPTH,
        'withDownstream': DEFAULT_WITH_DOWNSTREAM,
    }
    mock_get.assert_called_once_with(
        url=client._url('/column-lineage'),
        headers=mock.ANY,
        params=expected_params,
        timeout=mock.ANY,
    )
1031+
1032+
1033+
@mock.patch('requests.get')
def test_get_column_lineage_by_job(mock_get, client):
    """Job lookup returns the mocked JSON and issues exactly one GET to
    ``/column-lineage`` with a ``job:`` node id."""
    # NOTE(review): this configures `status_code.return_value`, not
    # `status_code` itself -- kept exactly as in the original test.
    fake_response = mock_get.return_value
    fake_response.status_code.return_value = HTTPStatus.OK
    fake_response.json.return_value = COLUMN_LINEAGE

    result = client.get_column_lineage_by_job(
        "namespace_a", "job_a", DEFAULT_DEPTH, DEFAULT_WITH_DOWNSTREAM
    )
    assert result == COLUMN_LINEAGE

    expected_params = {
        'nodeId': 'job:namespace_a:job_a',
        'depth': DEFAULT_DEPTH,
        'withDownstream': DEFAULT_WITH_DOWNSTREAM,
    }
    mock_get.assert_called_once_with(
        url=client._url('/column-lineage'),
        headers=mock.ANY,
        params=expected_params,
        timeout=mock.ANY,
    )

0 commit comments

Comments
 (0)