Skip to content

Commit a38028b

Browse files
committed
rename to direct lineage; use jobs_view in sql; replace simple_name with name
Signed-off-by: Julien Le Dem <[email protected]>
1 parent 47cc1d5 commit a38028b

File tree

5 files changed

+28
-33
lines changed

5 files changed

+28
-33
lines changed

api/src/main/java/marquez/api/OpenLineageResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public Response getLineage(
114114
@Consumes(APPLICATION_JSON)
115115
@Produces(APPLICATION_JSON)
116116
@Path("/lineage/direct")
117-
public Response getSimpleLineage(@QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) {
117+
public Response getDirectLineage(@QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) {
118118
if (!parentJobNodeId.isJobType()) {
119119
throw new IllegalArgumentException("Only job expected, got " + parentJobNodeId.getValue());
120120
}

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
import marquez.common.models.DatasetId;
1414
import marquez.common.models.JobId;
1515
import marquez.db.mappers.DatasetDataMapper;
16+
import marquez.db.mappers.DirectLineageEdgeMapper;
1617
import marquez.db.mappers.JobDataMapper;
1718
import marquez.db.mappers.JobRowMapper;
1819
import marquez.db.mappers.RunMapper;
19-
import marquez.db.mappers.SimpleLineageEdgeMapper;
2020
import marquez.service.models.DatasetData;
2121
import marquez.service.models.JobData;
2222
import marquez.service.models.Run;
@@ -28,12 +28,12 @@
2828
@RegisterRowMapper(JobDataMapper.class)
2929
@RegisterRowMapper(RunMapper.class)
3030
@RegisterRowMapper(JobRowMapper.class)
31-
@RegisterRowMapper(SimpleLineageEdgeMapper.class)
31+
@RegisterRowMapper(DirectLineageEdgeMapper.class)
3232
public interface LineageDao {
3333

34-
public record SimpleLineage(Collection<SimpleLineageEdge> edges) {}
34+
public record DirectLineage(Collection<DirectLineageEdge> edges) {}
3535

36-
public record SimpleLineageEdge(
36+
public record DirectLineageEdge(
3737
JobId job1,
3838
String direction,
3939
DatasetId dataset,
@@ -103,27 +103,22 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
103103
@SqlQuery(
104104
"""
105105
SELECT
106-
jobs.namespace_name AS job_namespace, jobs.simple_name AS job_name,
106+
jobs.namespace_name AS job_namespace, jobs."name" AS job_name,
107107
jvim.io_type AS io1,
108108
d.namespace_name AS ds_namespace, d."name" AS ds_name,
109109
jvim2.io_type AS io2,
110110
jv2.namespace_name AS job2_namespace, jv2.job_name AS job2_name,
111-
pj.namespace_name AS job2_parent_namespace, pj.simple_name AS job2_parent_name
112-
FROM jobs
111+
jv2.namespace_name AS job2_parent_namespace, j2.parent_job_name AS job2_parent_name
112+
FROM jobs_view jobs
113113
INNER JOIN job_versions jv ON jv.uuid = jobs.current_version_uuid
114114
LEFT JOIN job_versions_io_mapping jvim ON jvim.job_version_uuid = jobs.current_version_uuid
115115
LEFT JOIN datasets d ON d.uuid = jvim.dataset_uuid
116-
LEFT JOIN job_versions_io_mapping jvim2 ON jvim2.dataset_uuid = d.uuid AND jvim2.job_version_uuid <> jvim.job_version_uuid and jvim2.io_type <> jvim.io_type
116+
LEFT JOIN job_versions_io_mapping jvim2 ON jvim2.dataset_uuid = d.uuid AND jvim2.job_version_uuid <> jvim.job_version_uuid AND jvim2.io_type <> jvim.io_type
117117
LEFT JOIN job_versions jv2 ON jv2.uuid = jvim2.job_version_uuid
118-
LEFT JOIN jobs j2 ON jv2.job_uuid = j2.uuid
119-
LEFT JOIN jobs pj ON j2.parent_job_uuid = pj.uuid
120-
WHERE jobs.parent_job_uuid IN (
121-
SELECT uuid AS parent_job_uuid
122-
FROM jobs
123-
WHERE namespace_name=:parentJobNamespace and simple_name=:parentJobName
124-
);
118+
LEFT JOIN jobs_view j2 ON jv2.job_uuid = j2.uuid
119+
WHERE jobs.namespace_name = :parentJobNamespace AND jobs.parent_job_name = :parentJobName ;
125120
""")
126-
Collection<SimpleLineageEdge> getDirectLineageFromParent(
121+
Collection<DirectLineageEdge> getDirectLineageFromParent(
127122
String parentJobNamespace, String parentJobName);
128123

129124
@SqlQuery(

api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java renamed to api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
import marquez.common.models.JobId;
1616
import marquez.common.models.JobName;
1717
import marquez.common.models.NamespaceName;
18-
import marquez.db.LineageDao.SimpleLineageEdge;
18+
import marquez.db.LineageDao.DirectLineageEdge;
1919
import org.jdbi.v3.core.mapper.RowMapper;
2020
import org.jdbi.v3.core.statement.StatementContext;
2121

22-
public final class SimpleLineageEdgeMapper implements RowMapper<SimpleLineageEdge> {
22+
public final class DirectLineageEdgeMapper implements RowMapper<DirectLineageEdge> {
2323
@Override
24-
public SimpleLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context)
24+
public DirectLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context)
2525
throws SQLException {
2626
JobId job1 =
2727
JobId.of(
@@ -48,6 +48,6 @@ public SimpleLineageEdge map(@NonNull ResultSet results, @NonNull StatementConte
4848
: JobId.of(
4949
NamespaceName.of(job2parent_namespace),
5050
JobName.of(stringOrThrow(results, "job2_parent_name")));
51-
return new SimpleLineageEdge(job1, io1, ds, io2, job2, job2parent);
51+
return new DirectLineageEdge(job1, io1, ds, io2, job2, job2parent);
5252
}
5353
}

api/src/main/java/marquez/service/LineageService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,27 +79,27 @@ public LineageService(LineageDao delegate, JobDao jobDao) {
7979
public ParentLineage parentDirectLineage(JobId parentJobId) {
8080
log.debug("Attempting to get lineage for parent job '{}'", parentJobId);
8181

82-
Collection<SimpleLineageEdge> directLineageFromParent =
82+
Collection<DirectLineageEdge> directLineageFromParent =
8383
getDirectLineageFromParent(
8484
parentJobId.getNamespace().getValue(), parentJobId.getName().getValue());
8585

8686
Map<JobId, Map<String, Map<DatasetId, Map<String, List<JobWithParent>>>>> grouped =
8787
directLineageFromParent.stream()
8888
.collect(
8989
groupingBy(
90-
SimpleLineageEdge::job1,
90+
DirectLineageEdge::job1,
9191
filtering(
9292
e -> e.direction() != null,
9393
groupingBy(
94-
SimpleLineageEdge::direction,
94+
DirectLineageEdge::direction,
9595
filtering(
9696
e -> e.dataset() != null,
9797
groupingBy(
98-
SimpleLineageEdge::dataset,
98+
DirectLineageEdge::dataset,
9999
filtering(
100100
e -> e.direction2() != null,
101101
groupingBy(
102-
SimpleLineageEdge::direction2,
102+
DirectLineageEdge::direction2,
103103
mapping(
104104
e -> new JobWithParent(e.job2(), e.job2parent()),
105105
toList())))))))));

api/src/test/java/marquez/db/LineageDaoTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import java.util.stream.Stream;
3333
import marquez.api.JdbiUtils;
3434
import marquez.common.models.JobType;
35-
import marquez.db.LineageDao.SimpleLineageEdge;
35+
import marquez.db.LineageDao.DirectLineageEdge;
3636
import marquez.db.LineageTestUtils.DatasetConsumerJob;
3737
import marquez.db.LineageTestUtils.JobLineage;
3838
import marquez.db.models.JobRow;
@@ -170,11 +170,11 @@ public void testGetLineage() {
170170
expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator);
171171
}
172172

173-
Collection<SimpleLineageEdge> directLineageFromParent =
173+
Collection<DirectLineageEdge> FromParent =
174174
lineageDao.getDirectLineageFromParent(
175175
disjointJob.getJob().getNamespaceName(), disjointJob.getJob().getName());
176-
assertNotNull(directLineageFromParent);
177-
assertTrue(directLineageFromParent.toString(), directLineageFromParent.size() == 0);
176+
assertNotNull(FromParent);
177+
assertTrue(FromParent.toString(), FromParent.size() == 0);
178178
}
179179

180180
@Test
@@ -322,11 +322,11 @@ public void testGetLineageWithJobThatHasNoDownstreamConsumers() {
322322
}
323323

324324
@Test
325-
public void testGetDirectLineageFromParent() {
325+
public void testGetFromParent() {
326326
FacetTestUtils.createLineageWithFacets(openLineageDao);
327-
Collection<SimpleLineageEdge> directLineageFromParent =
327+
Collection<DirectLineageEdge> FromParent =
328328
lineageDao.getDirectLineageFromParent("namespace", "name");
329-
assertTrue(directLineageFromParent.toString(), directLineageFromParent.size() == 2);
329+
assertTrue(FromParent.toString(), FromParent.size() == 2);
330330
}
331331

332332
@Test

0 commit comments

Comments
 (0)