From 41c243906b46e034650b6c2e920fee8b99f82332 Mon Sep 17 00:00:00 2001 From: Julien Le Dem Date: Mon, 23 Oct 2023 14:40:08 -0700 Subject: [PATCH 1/9] provide simple implementation of one-level lineage optimized for parent jobs Signed-off-by: Julien Le Dem --- .../java/marquez/api/OpenLineageResource.java | 16 +++ api/src/main/java/marquez/db/Columns.java | 2 +- api/src/main/java/marquez/db/LineageDao.java | 57 ++++++++++- .../db/mappers/SimpleLineageEdgeMapper.java | 36 +++++++ .../java/marquez/service/LineageService.java | 76 +++++++++++++- .../service/models/EventTypeResolver.java | 1 + .../marquez/OpenLineageIntegrationTest.java | 98 ++++++++++++++----- .../test/java/marquez/db/LineageDaoTest.java | 3 + 8 files changed, 258 insertions(+), 31 deletions(-) create mode 100644 api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 259a500a53..b9a4a0a436 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -107,6 +107,22 @@ public Response getLineage( return Response.ok(lineageService.lineage(nodeId, depth, true)).build(); } + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Consumes(APPLICATION_JSON) + @Produces(APPLICATION_JSON) + @Path("/lineage/direct") + public Response getSimpleLineage( + @QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) { + if (!parentJobNodeId.isJobType()) { + throw new IllegalArgumentException("Only job expected, got " + parentJobNodeId.getValue()); + } + throwIfNotExists(parentJobNodeId); + return Response.ok(lineageService.parentDirectLineage(parentJobNodeId.asJobId())).build(); + } + @Timed @ResponseMetered @ExceptionMetered diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index dfdf67492a..a5df6a2c4b 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -183,7 +183,7 @@ public static String stringOrNull(final ResultSet results, final String column) public static String stringOrThrow(final ResultSet results, final String column) throws SQLException { if (results.getObject(column) == null) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("no column found for " + column); } return results.getString(column); } diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index c45a06e5a9..d79f5258c0 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -10,23 +10,42 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; + +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.BindList; +import org.jdbi.v3.sqlobject.statement.SqlQuery; + +import marquez.common.models.DatasetId; +import marquez.common.models.JobId; import marquez.db.mappers.DatasetDataMapper; import marquez.db.mappers.JobDataMapper; import marquez.db.mappers.JobRowMapper; import marquez.db.mappers.RunMapper; +import marquez.db.mappers.SimpleLineageEdgeMapper; import marquez.service.models.DatasetData; import marquez.service.models.JobData; import marquez.service.models.Run; -import org.jdbi.v3.sqlobject.config.RegisterRowMapper; -import org.jdbi.v3.sqlobject.customizer.BindList; -import org.jdbi.v3.sqlobject.statement.SqlQuery; @RegisterRowMapper(DatasetDataMapper.class) 
@RegisterRowMapper(JobDataMapper.class)
@RegisterRowMapper(RunMapper.class)
@RegisterRowMapper(JobRowMapper.class)
+@RegisterRowMapper(SimpleLineageEdgeMapper.class)
public interface LineageDao {
+  public record SimpleLineage(Collection<SimpleLineageEdge> edges) {
+  }
+
+  public record SimpleLineageEdge(
+      JobId job1,
+      String direction,
+      DatasetId dataset,
+      String direction2,
+      JobId job2,
+      JobId job2parent
+  ) {
+
+  }
  /**
   * Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the
   * input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have
@@ -79,6 +98,38 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
  """)
  Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);
+  /**
+   * 1 level of lineage for all the children jobs of the given parent
+   *
+   * @param parentJobNamespace the namespace of the parent
+   * @param parentJobName the name of the parent
+   * @return edges from job to dataset to job
+   */
+  @SqlQuery(
+      """
+      SELECT
+        jobs.namespace_name AS job_namespace, jobs.simple_name AS job_name,
+        jvim.io_type AS io1,
+        d.namespace_name AS ds_namespace, d."name" AS ds_name,
+        jvim2.io_type AS io2,
+        jv2.namespace_name AS job2_namespace, jv2.job_name AS job2_name,
+        pj.namespace_name AS job2_parent_namespace, pj.simple_name AS job2_parent_name
+      FROM jobs
+      INNER JOIN job_versions jv ON jv.uuid = jobs.current_version_uuid
+      LEFT JOIN job_versions_io_mapping jvim ON jvim.job_version_uuid = jobs.current_version_uuid
+      LEFT JOIN datasets d ON d.uuid = jvim.dataset_uuid
+      LEFT JOIN job_versions_io_mapping jvim2 ON jvim2.dataset_uuid = d.uuid AND jvim2.job_version_uuid <> jvim.job_version_uuid and jvim2.io_type <> jvim.io_type
+      LEFT JOIN job_versions jv2 ON jv2.uuid = jvim2.job_version_uuid
+      LEFT JOIN jobs j2 ON jv2.job_uuid = j2.uuid
+      LEFT JOIN jobs pj ON j2.parent_job_uuid = pj.uuid
+      WHERE jobs.parent_job_uuid IN (
+        SELECT uuid AS parent_job_uuid
+        FROM jobs
+        WHERE namespace_name=:parentJobNamespace and simple_name=:parentJobName
+      );
+      """)
+  Collection<SimpleLineageEdge> getDirectLineageFromParent(String parentJobNamespace, String parentJobName);
+
  @SqlQuery(
      """
  SELECT ds.*, dv.fields, dv.lifecycle_state
diff --git a/api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java b/api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java
new file mode 100644
index 0000000000..993a34465b
--- /dev/null
+++ b/api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java
@@ -0,0 +1,36 @@
+package marquez.db.mappers;
+
+import static marquez.db.Columns.stringOrNull;
+import static marquez.db.Columns.stringOrThrow;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.jdbi.v3.core.mapper.RowMapper;
+import org.jdbi.v3.core.statement.StatementContext;
+
+import lombok.NonNull;
+import marquez.common.models.DatasetId;
+import marquez.common.models.DatasetName;
+import marquez.common.models.JobId;
+import marquez.common.models.JobName;
+import marquez.common.models.NamespaceName;
+import marquez.db.LineageDao.SimpleLineageEdge;
+
+public final class SimpleLineageEdgeMapper implements RowMapper<SimpleLineageEdge> {
+  @Override
+  public SimpleLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context)
+      throws SQLException {
+    JobId job1 = JobId.of(NamespaceName.of(stringOrThrow(results, "job_namespace")), JobName.of(stringOrThrow(results, "job_name")));
+    String io1 = stringOrNull(results, "io1");
+    String ds_namespace = stringOrNull(results, "ds_namespace");
+    DatasetId ds = ds_namespace == null ? null : new DatasetId(NamespaceName.of(ds_namespace), DatasetName.of(stringOrNull(results, "ds_name")));
+    String io2 = stringOrNull(results, "io2");
+    String job2_namespace = stringOrNull(results, "job2_namespace");
+    JobId job2 = job2_namespace == null ? null : JobId.of(NamespaceName.of(job2_namespace), JobName.of(stringOrThrow(results, "job2_name")));
+    String job2parent_namespace = stringOrNull(results, "job2_parent_namespace");
+    JobId job2parent = job2parent_namespace == null ? null : JobId.of(NamespaceName.of(job2parent_namespace), JobName.of(stringOrThrow(results, "job2_parent_name")));
+    return new SimpleLineageEdge(job1, io1, ds, io2, job2, job2parent);
+  }
+}
+
diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java
index 1c2dc34a05..724f13e7bd 100644
--- a/api/src/main/java/marquez/service/LineageService.java
+++ b/api/src/main/java/marquez/service/LineageService.java
@@ -5,10 +5,12 @@
 package marquez.service;

-import com.google.common.base.Functions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Maps;
+import static java.util.stream.Collectors.filtering;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -21,6 +23,12 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Maps;
+
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import marquez.common.models.DatasetId;
@@ -41,6 +49,19 @@
 @Slf4j
 public class LineageService extends DelegatingLineageDao {
+
+  public record JobWithParent(JobId job, JobId parent) {
+  }
+
+  public record DatasetLineage(DatasetId dataset, Collection<JobWithParent> consumers, Collection<JobWithParent> producers) {
+  }
+
+  public record ChildLineage(JobId job, Collection<DatasetLineage> inputs, Collection<DatasetLineage> outputs) {
+  }
+
+  public record ParentLineage(JobId parent, Collection<ChildLineage> children) {
+  }
+
   private final JobDao jobDao;

   public LineageService(LineageDao delegate, JobDao jobDao) {
@@ -48,6 +69,53 @@ public LineageService(LineageDao delegate, JobDao jobDao) {
     this.jobDao = jobDao;
   }

+  /**
+   * This method is specialized for returning one level of lineage from a parent job.
+   * It finds all the children of the provided parent node
+   * It then finds the input and output datasets those children write to.
+   * It finally returns the other jobs consuming or producing those datasets (and their parent).
+   * @param parentJobId the parent job
+   * @return 1 level of lineage for all the children jobs of the given parent
+   */
+  public ParentLineage parentDirectLineage(JobId parentJobId) {
+    log.debug("Attempting to get lineage for parent job '{}'", parentJobId);
+
+    Collection<SimpleLineageEdge> directLineageFromParent =
+        getDirectLineageFromParent(parentJobId.getNamespace().getValue(), parentJobId.getName().getValue());
+
+
+    Map<JobId, Map<String, Map<DatasetId, Map<String, List<JobWithParent>>>>> grouped =
+        directLineageFromParent.stream().collect(
+            groupingBy(SimpleLineageEdge::job1,
+                filtering(e -> e.direction() != null,
+                    groupingBy(SimpleLineageEdge::direction,
+                        filtering(e -> e.dataset() != null,
+                            groupingBy(SimpleLineageEdge::dataset,
+                                filtering(e -> e.direction2() != null,
+                                    groupingBy(SimpleLineageEdge::direction2,
+                                        mapping(e -> new JobWithParent(e.job2(), e.job2parent()),
+                                            toList())))))))));
+
+    List<ChildLineage> children = grouped.entrySet().stream().map(
+        e -> new ChildLineage(
+            e.getKey(),
+            toDatasetLineages(e.getValue().get("INPUT")),
+            toDatasetLineages(e.getValue().get("OUTPUT"))
+        )
+    ).collect(toList());
+    return new ParentLineage(parentJobId, children);
+  }
+
+  private Collection<DatasetLineage> toDatasetLineages(Map<DatasetId, Map<String, List<JobWithParent>>> datasets) {
+    return datasets == null ? null : datasets.entrySet().stream().map(
+        e -> new DatasetLineage(
+            e.getKey(),
+            e.getValue().get("INPUT"),
+            e.getValue().get("OUTPUT")
+        )
+    ).collect(toList());
+  }
+
   // TODO make input parameters easily extendable if adding more options like 'withJobFacets'
   public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
     log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
diff --git a/api/src/main/java/marquez/service/models/EventTypeResolver.java b/api/src/main/java/marquez/service/models/EventTypeResolver.java
index 15aab272a3..84ad839ea8 100644
--- a/api/src/main/java/marquez/service/models/EventTypeResolver.java
+++ b/api/src/main/java/marquez/service/models/EventTypeResolver.java
@@ -79,6 +79,7 @@ public JavaType typeFromId(DatabindContext context, String id) throws IOExceptio
         .filter(s -> s.getName().equals(type))
         .findAny()
         .map(EventSchemaURL::getSubType)
+        .map(p -> (Class)p)
         .orElse(LINEAGE_EVENT.subType);

     return context.constructSpecializedType(superType, subType);
diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java
index 4729a64971..54fdebfe40 100644
--- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java
+++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java
@@ -10,23 +10,13 @@
 import static org.assertj.core.api.Assertions.as;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;

-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.node.TextNode;
-import com.google.common.collect.ImmutableMap;
-import io.dropwizard.util.Resources;
-import io.openlineage.client.OpenLineage;
-import io.openlineage.client.OpenLineage.RunEvent;
-import io.openlineage.client.OpenLineage.RunEvent.EventType;
-import io.openlineage.client.OpenLineage.RunFacet;
-import io.openlineage.client.OpenLineage.RunFacetsBuilder;
 import java.io.IOException;
 import java.net.URI;
+import java.net.http.HttpRequest;
import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; import java.nio.charset.Charset; import java.time.Instant; import java.time.ZoneId; @@ -43,6 +33,32 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + +import org.assertj.core.api.InstanceOfAssertFactories; +import org.jdbi.v3.core.Jdbi; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.google.common.collect.ImmutableMap; + +import io.dropwizard.util.Resources; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineage.RunEvent; +import io.openlineage.client.OpenLineage.RunEvent.EventType; +import io.openlineage.client.OpenLineage.RunFacet; +import io.openlineage.client.OpenLineage.RunFacetsBuilder; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import marquez.api.JdbiUtils; @@ -54,19 +70,12 @@ import marquez.client.models.LineageEvent; import marquez.client.models.Run; import marquez.common.Utils; +import marquez.common.models.JobName; +import marquez.common.models.NamespaceName; import marquez.db.LineageTestUtils; import marquez.service.models.DatasetEvent; import marquez.service.models.JobEvent; -import org.assertj.core.api.InstanceOfAssertFactories; -import org.jdbi.v3.core.Jdbi; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; -import org.slf4j.LoggerFactory; +import marquez.service.models.NodeId; @org.junit.jupiter.api.Tag("IntegrationTests") @Slf4j @@ -316,6 +325,29 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentRunFacet() .hasFieldOrPropertyWithValue("parentJobName", null); List runsList = client.listRuns(NAMESPACE_NAME, dagName); assertThat(runsList).isNotEmpty().hasSize(1); + + + + marquez.common.models.JobId jobId = new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName)); + String nodeId = NodeId.of(jobId).getValue(); + HttpRequest request = + HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId)) + .header("Content-Type", "application/json") + .GET() + .build(); + + HttpResponse resp; + try { + resp = http2.send(request, BodyHandlers.ofString()); + + assertEquals(200, resp.statusCode(), resp.body()); + assertTrue(resp.body().contains("task1"), resp.body()); + assertTrue(resp.body().contains("task2"), resp.body()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } @Test @@ -390,6 +422,26 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunF .hasFieldOrPropertyWithValue("parentJobName", 
null); List runsList = client.listRuns(NAMESPACE_NAME, dagName); assertThat(runsList).isNotEmpty().hasSize(1); + + marquez.common.models.JobId jobId = new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName)); + String nodeId = NodeId.of(jobId).getValue(); + HttpRequest request = + HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId)) + .header("Content-Type", "application/json") + .GET() + .build(); + + HttpResponse resp; + try { + resp = http2.send(request, BodyHandlers.ofString()); + + assertEquals(200, resp.statusCode(), resp.body()); + assertTrue(resp.body().contains("task1"), resp.body()); + assertTrue(resp.body().contains("task2"), resp.body()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } } @Test diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 354ab495dc..76003b1245 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -165,6 +165,9 @@ public void testGetLineage() { .containsAll( expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator); } + + + lineageDao.getDirectLineageFromParent("foo", "bar"); } @Test From 023985e6159730edebfc43f6d6177c91b2cb897f Mon Sep 17 00:00:00 2001 From: Julien Le Dem Date: Mon, 23 Oct 2023 15:27:42 -0700 Subject: [PATCH 2/9] spotless apply Signed-off-by: Julien Le Dem --- .../java/marquez/api/OpenLineageResource.java | 3 +- api/src/main/java/marquez/db/LineageDao.java | 20 ++-- .../db/mappers/SimpleLineageEdgeMapper.java | 37 +++++-- .../java/marquez/service/LineageService.java | 104 ++++++++++-------- .../service/models/EventTypeResolver.java | 2 +- .../marquez/OpenLineageIntegrationTest.java | 74 ++++++------- .../test/java/marquez/db/LineageDaoTest.java | 1 - 7 files changed, 130 insertions(+), 111 deletions(-) diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index b9a4a0a436..5f411d54a7 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -114,8 +114,7 @@ public Response getLineage( @Consumes(APPLICATION_JSON) @Produces(APPLICATION_JSON) @Path("/lineage/direct") - public Response getSimpleLineage( - @QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) { + public Response getSimpleLineage(@QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) { if (!parentJobNodeId.isJobType()) { throw new IllegalArgumentException("Only job expected, got " + parentJobNodeId.getValue()); } diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index d79f5258c0..00382ba20b 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -10,11 +10,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; - -import org.jdbi.v3.sqlobject.config.RegisterRowMapper; -import org.jdbi.v3.sqlobject.customizer.BindList; -import org.jdbi.v3.sqlobject.statement.SqlQuery; - import marquez.common.models.DatasetId; import marquez.common.models.JobId; import marquez.db.mappers.DatasetDataMapper; @@ -25,6 +20,9 @@ import marquez.service.models.DatasetData; import marquez.service.models.JobData; import marquez.service.models.Run; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import 
org.jdbi.v3.sqlobject.customizer.BindList; +import org.jdbi.v3.sqlobject.statement.SqlQuery; @RegisterRowMapper(DatasetDataMapper.class) @RegisterRowMapper(JobDataMapper.class) @@ -33,8 +31,7 @@ @RegisterRowMapper(SimpleLineageEdgeMapper.class) public interface LineageDao { - public record SimpleLineage(Collection edges) { - } + public record SimpleLineage(Collection edges) {} public record SimpleLineageEdge( JobId job1, @@ -42,10 +39,8 @@ public record SimpleLineageEdge( DatasetId dataset, String direction2, JobId job2, - JobId job2parent - ) { + JobId job2parent) {} - } /** * Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the * input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have @@ -99,7 +94,7 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids Set getLineage(@BindList Set jobIds, int depth); /** - * 1 level of lineage for all the children jobs of the given parent + * 1 level of lineage for all the children jobs of the given parent * * @param parentJobNamespace the namespace of the parent * @param parentJobName the name of the parent @@ -128,7 +123,8 @@ WHERE jobs.parent_job_uuid IN ( WHERE namespace_name=:parentJobNamespace and simple_name=:parentJobName ); """) - Collection getDirectLineageFromParent(String parentJobNamespace, String parentJobName); + Collection getDirectLineageFromParent( + String parentJobNamespace, String parentJobName); @SqlQuery( """ diff --git a/api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java b/api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java index 993a34465b..00753f866c 100644 --- a/api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java +++ b/api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java @@ -1,3 +1,7 @@ +/* + * Copyright 2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ package marquez.db.mappers; import static marquez.db.Columns.stringOrNull; @@ -5,10 +9,6 @@ import java.sql.ResultSet; import java.sql.SQLException; - -import org.jdbi.v3.core.mapper.RowMapper; -import org.jdbi.v3.core.statement.StatementContext; - import lombok.NonNull; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; @@ -16,21 +16,38 @@ import marquez.common.models.JobName; import marquez.common.models.NamespaceName; import marquez.db.LineageDao.SimpleLineageEdge; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; public final class SimpleLineageEdgeMapper implements RowMapper { @Override public SimpleLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context) throws SQLException { - JobId job1 = JobId.of(NamespaceName.of(stringOrThrow(results, "job_namespace")), JobName.of(stringOrThrow(results, "job_name"))); + JobId job1 = + JobId.of( + NamespaceName.of(stringOrThrow(results, "job_namespace")), + JobName.of(stringOrThrow(results, "job_name"))); String io1 = stringOrNull(results, "io1"); String ds_namespace = stringOrNull(results, "ds_namespace"); - DatasetId ds = ds_namespace == null ? null : new DatasetId(NamespaceName.of(ds_namespace), DatasetName.of(stringOrNull(results, "ds_name"))); + DatasetId ds = + ds_namespace == null + ? 
null + : new DatasetId( + NamespaceName.of(ds_namespace), DatasetName.of(stringOrNull(results, "ds_name"))); String io2 = stringOrNull(results, "io2"); String job2_namespace = stringOrNull(results, "job2_namespace"); - JobId job2 = job2_namespace == null ? null : JobId.of(NamespaceName.of(job2_namespace), JobName.of(stringOrThrow(results, "job2_name"))); + JobId job2 = + job2_namespace == null + ? null + : JobId.of( + NamespaceName.of(job2_namespace), JobName.of(stringOrThrow(results, "job2_name"))); String job2parent_namespace = stringOrNull(results, "job2_parent_namespace"); - JobId job2parent = job2parent_namespace == null ? null : JobId.of(NamespaceName.of(job2parent_namespace), JobName.of(stringOrThrow(results, "job2_parent_name"))); - return new SimpleLineageEdge(job1, io1, ds, io2, job2, job2parent); + JobId job2parent = + job2parent_namespace == null + ? null + : JobId.of( + NamespaceName.of(job2parent_namespace), + JobName.of(stringOrThrow(results, "job2_parent_name"))); + return new SimpleLineageEdge(job1, io1, ds, io2, job2, job2parent); } } - diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java index 724f13e7bd..2f5da474ed 100644 --- a/api/src/main/java/marquez/service/LineageService.java +++ b/api/src/main/java/marquez/service/LineageService.java @@ -10,6 +10,10 @@ import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; +import com.google.common.base.Functions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Maps; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -23,12 +27,6 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; - -import com.google.common.base.Functions; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSortedSet; -import com.google.common.collect.Maps; - import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import marquez.common.models.DatasetId; @@ -50,17 +48,17 @@ @Slf4j public class LineageService extends DelegatingLineageDao { - public record JobWithParent(JobId job, JobId parent) { - } + public record JobWithParent(JobId job, JobId parent) {} - public record DatasetLineage(DatasetId dataset, Collection consumers, Collection producers) { - } + public record DatasetLineage( + DatasetId dataset, + Collection consumers, + Collection producers) {} - public record ChildLineage(JobId job, Collection inputs, Collection outputs) { - } + public record ChildLineage( + JobId job, Collection inputs, Collection outputs) {} - public record ParentLineage(JobId parent, Collection children) { - } + public record ParentLineage(JobId parent, Collection children) {} private final JobDao jobDao; @@ -70,10 +68,11 @@ public LineageService(LineageDao delegate, JobDao jobDao) { } /** - * This method is specialized for returning one level of lineage from a parent job. - * It finds all the children of the provided parent node - * It then finds the input and output datasets those children write to. - * It finally returns the other jobs consuming or producing those datasets (and their parent). + * This method is specialized for returning one level of lineage from a parent job. It finds all + * the children of the provided parent node It then finds the input and output datasets those + * children write to. 
It finally returns the other jobs consuming or producing those datasets (and + * their parent). + * * @param parentJobId the parent job * @return 1 level of lineage for all the children jobs of the given parent */ @@ -81,39 +80,52 @@ public ParentLineage parentDirectLineage(JobId parentJobId) { log.debug("Attempting to get lineage for parent job '{}'", parentJobId); Collection directLineageFromParent = - getDirectLineageFromParent(parentJobId.getNamespace().getValue(), parentJobId.getName().getValue()); - + getDirectLineageFromParent( + parentJobId.getNamespace().getValue(), parentJobId.getName().getValue()); Map>>>> grouped = - directLineageFromParent.stream().collect( - groupingBy(SimpleLineageEdge::job1, - filtering(e -> e.direction() != null, - groupingBy(SimpleLineageEdge::direction, - filtering(e -> e.dataset() != null, - groupingBy(SimpleLineageEdge::dataset, - filtering(e -> e.direction2() != null, - groupingBy(SimpleLineageEdge::direction2, - mapping(e -> new JobWithParent(e.job2(), e.job2parent()), - toList()))))))))); - - List children = grouped.entrySet().stream().map( - e -> new ChildLineage( - e.getKey(), - toDatasetLineages(e.getValue().get("INPUT")), - toDatasetLineages(e.getValue().get("OUTPUT")) - ) - ).collect(toList()); + directLineageFromParent.stream() + .collect( + groupingBy( + SimpleLineageEdge::job1, + filtering( + e -> e.direction() != null, + groupingBy( + SimpleLineageEdge::direction, + filtering( + e -> e.dataset() != null, + groupingBy( + SimpleLineageEdge::dataset, + filtering( + e -> e.direction2() != null, + groupingBy( + SimpleLineageEdge::direction2, + mapping( + e -> new JobWithParent(e.job2(), e.job2parent()), + toList()))))))))); + + List children = + grouped.entrySet().stream() + .map( + e -> + new ChildLineage( + e.getKey(), + toDatasetLineages(e.getValue().get("INPUT")), + toDatasetLineages(e.getValue().get("OUTPUT")))) + .collect(toList()); return new ParentLineage(parentJobId, children); } - private Collection toDatasetLineages(Map>> datasets) { - return datasets == null ? null : datasets.entrySet().stream().map( - e -> new DatasetLineage( - e.getKey(), - e.getValue().get("INPUT"), - e.getValue().get("OUTPUT") - ) - ).collect(toList()); + private Collection toDatasetLineages( + Map>> datasets) { + return datasets == null + ? 
null + : datasets.entrySet().stream() + .map( + e -> + new DatasetLineage( + e.getKey(), e.getValue().get("INPUT"), e.getValue().get("OUTPUT"))) + .collect(toList()); } // TODO make input parameters easily extendable if adding more options like 'withJobFacets' diff --git a/api/src/main/java/marquez/service/models/EventTypeResolver.java b/api/src/main/java/marquez/service/models/EventTypeResolver.java index 84ad839ea8..fa4f52adb8 100644 --- a/api/src/main/java/marquez/service/models/EventTypeResolver.java +++ b/api/src/main/java/marquez/service/models/EventTypeResolver.java @@ -79,7 +79,7 @@ public JavaType typeFromId(DatabindContext context, String id) throws IOExceptio .filter(s -> s.getName().equals(type)) .findAny() .map(EventSchemaURL::getSubType) - .map(p -> (Class)p) + .map(p -> (Class) p) .orElse(LINEAGE_EVENT.subType); return context.constructSpecializedType(superType, subType); diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index 54fdebfe40..25f4fa102b 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -12,6 +12,19 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.google.common.collect.ImmutableMap; +import io.dropwizard.util.Resources; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineage.RunEvent; +import io.openlineage.client.OpenLineage.RunEvent.EventType; +import io.openlineage.client.OpenLineage.RunFacet; +import io.openlineage.client.OpenLineage.RunFacetsBuilder; import java.io.IOException; import java.net.URI; import java.net.http.HttpRequest; @@ -33,32 +46,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - -import org.assertj.core.api.InstanceOfAssertFactories; -import org.jdbi.v3.core.Jdbi; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; -import com.google.common.collect.ImmutableMap; - -import io.dropwizard.util.Resources; -import io.openlineage.client.OpenLineage; -import io.openlineage.client.OpenLineage.RunEvent; -import io.openlineage.client.OpenLineage.RunEvent.EventType; -import io.openlineage.client.OpenLineage.RunFacet; -import io.openlineage.client.OpenLineage.RunFacetsBuilder; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import marquez.api.JdbiUtils; @@ -76,6 +63,16 @@ import marquez.service.models.DatasetEvent; import 
marquez.service.models.JobEvent; import marquez.service.models.NodeId; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.jdbi.v3.core.Jdbi; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.LoggerFactory; @org.junit.jupiter.api.Tag("IntegrationTests") @Slf4j @@ -326,16 +323,15 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentRunFacet() List runsList = client.listRuns(NAMESPACE_NAME, dagName); assertThat(runsList).isNotEmpty().hasSize(1); - - - marquez.common.models.JobId jobId = new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName)); + marquez.common.models.JobId jobId = + new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName)); String nodeId = NodeId.of(jobId).getValue(); HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId)) - .header("Content-Type", "application/json") - .GET() - .build(); + .uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId)) + .header("Content-Type", "application/json") + .GET() + .build(); HttpResponse resp; try { @@ -347,7 +343,6 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentRunFacet() } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } - } @Test @@ -423,14 +418,15 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunF List runsList = client.listRuns(NAMESPACE_NAME, dagName); assertThat(runsList).isNotEmpty().hasSize(1); - marquez.common.models.JobId jobId = new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName)); + marquez.common.models.JobId jobId = + new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName)); String nodeId = NodeId.of(jobId).getValue(); HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId)) - .header("Content-Type", "application/json") - .GET() - .build(); + .uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId)) + .header("Content-Type", "application/json") + .GET() + .build(); HttpResponse resp; try { diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 76003b1245..48e0dd18cf 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -166,7 +166,6 @@ public void testGetLineage() { expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator); } - lineageDao.getDirectLineageFromParent("foo", "bar"); } From 72c96f7a5f41b319ca2c6ba417e06322e694149d Mon Sep 17 00:00:00 2001 From: Julien Le Dem Date: Mon, 23 Oct 2023 15:42:31 -0700 Subject: [PATCH 3/9] cleanup test Signed-off-by: Julien Le Dem --- api/src/test/java/marquez/db/LineageDaoTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 48e0dd18cf..ce21ed70be 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -12,11 +12,14 @@ import 
static marquez.db.LineageTestUtils.newDatasetFacet; import static marquez.db.LineageTestUtils.writeDownstreamLineage; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import com.google.common.base.Functions; import java.sql.SQLException; import java.time.Instant; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; @@ -29,6 +32,7 @@ import java.util.stream.Stream; import marquez.api.JdbiUtils; import marquez.common.models.JobType; +import marquez.db.LineageDao.SimpleLineageEdge; import marquez.db.LineageTestUtils.DatasetConsumerJob; import marquez.db.LineageTestUtils.JobLineage; import marquez.db.models.JobRow; @@ -166,7 +170,11 @@ public void testGetLineage() { expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator); } - lineageDao.getDirectLineageFromParent("foo", "bar"); + Collection directLineageFromParent = + lineageDao.getDirectLineageFromParent( + disjointJob.getJob().getNamespaceName(), disjointJob.getJob().getName()); + assertNotNull(directLineageFromParent); + assertTrue(directLineageFromParent.toString(), directLineageFromParent.size() == 0); } @Test From 47cc1d5a16e170b1ad41049421cc97e9a5da3277 Mon Sep 17 00:00:00 2001 From: Julien Le Dem Date: Mon, 23 Oct 2023 15:53:17 -0700 Subject: [PATCH 4/9] cleanup test Signed-off-by: Julien Le Dem --- api/src/test/java/marquez/db/LineageDaoTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index ce21ed70be..616c320f91 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -321,6 +321,14 @@ public void testGetLineageWithJobThatHasNoDownstreamConsumers() { assertThat(lineage).hasSize(1).contains(writeJob.getJob().getUuid()); } + @Test + public void testGetDirectLineageFromParent() { + FacetTestUtils.createLineageWithFacets(openLineageDao); + Collection directLineageFromParent = + lineageDao.getDirectLineageFromParent("namespace", "name"); + assertTrue(directLineageFromParent.toString(), directLineageFromParent.size() == 2); + } + @Test public void testGetLineageWithJobThatHasNoDatasets() { From a38028bb227af37e560d35218d782022227fe7b1 Mon Sep 17 00:00:00 2001 From: Julien Le Dem Date: Wed, 25 Oct 2023 11:34:12 -0700 Subject: [PATCH 5/9] rename to direct lineage; use jobs_view in sql; replace simple_name with name Signed-off-by: Julien Le Dem --- .../java/marquez/api/OpenLineageResource.java | 2 +- api/src/main/java/marquez/db/LineageDao.java | 27 ++++++++----------- ...pper.java => DirectLineageEdgeMapper.java} | 8 +++--- .../java/marquez/service/LineageService.java | 10 +++---- .../test/java/marquez/db/LineageDaoTest.java | 14 +++++----- 5 files changed, 28 insertions(+), 33 deletions(-) rename api/src/main/java/marquez/db/mappers/{SimpleLineageEdgeMapper.java => DirectLineageEdgeMapper.java} (87%) diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 5f411d54a7..aaafad9496 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -114,7 +114,7 @@ public Response getLineage( @Consumes(APPLICATION_JSON) @Produces(APPLICATION_JSON) @Path("/lineage/direct") - public Response 
getSimpleLineage(@QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) { + public Response getDirectLineage(@QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) { if (!parentJobNodeId.isJobType()) { throw new IllegalArgumentException("Only job expected, got " + parentJobNodeId.getValue()); } diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index 00382ba20b..3600db31c7 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -13,10 +13,10 @@ import marquez.common.models.DatasetId; import marquez.common.models.JobId; import marquez.db.mappers.DatasetDataMapper; +import marquez.db.mappers.DirectLineageEdgeMapper; import marquez.db.mappers.JobDataMapper; import marquez.db.mappers.JobRowMapper; import marquez.db.mappers.RunMapper; -import marquez.db.mappers.SimpleLineageEdgeMapper; import marquez.service.models.DatasetData; import marquez.service.models.JobData; import marquez.service.models.Run; @@ -28,12 +28,12 @@ @RegisterRowMapper(JobDataMapper.class) @RegisterRowMapper(RunMapper.class) @RegisterRowMapper(JobRowMapper.class) -@RegisterRowMapper(SimpleLineageEdgeMapper.class) +@RegisterRowMapper(DirectLineageEdgeMapper.class) public interface LineageDao { - public record SimpleLineage(Collection edges) {} + public record DirectLineage(Collection edges) {} - public record SimpleLineageEdge( + public record DirectLineageEdge( JobId job1, String direction, DatasetId dataset, @@ -103,27 +103,22 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids @SqlQuery( """ SELECT - jobs.namespace_name AS job_namespace, jobs.simple_name AS job_name, + jobs.namespace_name AS job_namespace, jobs."name" AS job_name, jvim.io_type AS io1, d.namespace_name AS ds_namespace, d."name" AS ds_name, jvim2.io_type AS io2, jv2.namespace_name AS job2_namespace, jv2.job_name AS job2_name, - pj.namespace_name AS job2_parent_namespace, pj.simple_name AS job2_parent_name - FROM jobs + jv2.namespace_name AS job2_parent_namespace, j2.parent_job_name AS job2_parent_name + FROM jobs_view jobs INNER JOIN job_versions jv ON jv.uuid = jobs.current_version_uuid LEFT JOIN job_versions_io_mapping jvim ON jvim.job_version_uuid = jobs.current_version_uuid LEFT JOIN datasets d ON d.uuid = jvim.dataset_uuid - LEFT JOIN job_versions_io_mapping jvim2 ON jvim2.dataset_uuid = d.uuid AND jvim2.job_version_uuid <> jvim.job_version_uuid and jvim2.io_type <> jvim.io_type + LEFT JOIN job_versions_io_mapping jvim2 ON jvim2.dataset_uuid = d.uuid AND jvim2.job_version_uuid <> jvim.job_version_uuid AND jvim2.io_type <> jvim.io_type LEFT JOIN job_versions jv2 ON jv2.uuid = jvim2.job_version_uuid - LEFT JOIN jobs j2 ON jv2.job_uuid = j2.uuid - LEFT JOIN jobs pj ON j2.parent_job_uuid = pj.uuid - WHERE jobs.parent_job_uuid IN ( - SELECT uuid AS parent_job_uuid - FROM jobs - WHERE namespace_name=:parentJobNamespace and simple_name=:parentJobName - ); + LEFT JOIN jobs_view j2 ON jv2.job_uuid = j2.uuid + WHERE jobs.namespace_name = :parentJobNamespace AND jobs.parent_job_name = :parentJobName ; """) - Collection getDirectLineageFromParent( + Collection getDirectLineageFromParent( String parentJobNamespace, String parentJobName); @SqlQuery( diff --git a/api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java b/api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java similarity index 87% rename from api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java rename to 
api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java index 00753f866c..2550c7e0ef 100644 --- a/api/src/main/java/marquez/db/mappers/SimpleLineageEdgeMapper.java +++ b/api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java @@ -15,13 +15,13 @@ import marquez.common.models.JobId; import marquez.common.models.JobName; import marquez.common.models.NamespaceName; -import marquez.db.LineageDao.SimpleLineageEdge; +import marquez.db.LineageDao.DirectLineageEdge; import org.jdbi.v3.core.mapper.RowMapper; import org.jdbi.v3.core.statement.StatementContext; -public final class SimpleLineageEdgeMapper implements RowMapper { +public final class DirectLineageEdgeMapper implements RowMapper { @Override - public SimpleLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context) + public DirectLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context) throws SQLException { JobId job1 = JobId.of( @@ -48,6 +48,6 @@ public SimpleLineageEdge map(@NonNull ResultSet results, @NonNull StatementConte : JobId.of( NamespaceName.of(job2parent_namespace), JobName.of(stringOrThrow(results, "job2_parent_name"))); - return new SimpleLineageEdge(job1, io1, ds, io2, job2, job2parent); + return new DirectLineageEdge(job1, io1, ds, io2, job2, job2parent); } } diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java index 2f5da474ed..30bb3bcea7 100644 --- a/api/src/main/java/marquez/service/LineageService.java +++ b/api/src/main/java/marquez/service/LineageService.java @@ -79,7 +79,7 @@ public LineageService(LineageDao delegate, JobDao jobDao) { public ParentLineage parentDirectLineage(JobId parentJobId) { log.debug("Attempting to get lineage for parent job '{}'", parentJobId); - Collection directLineageFromParent = + Collection directLineageFromParent = getDirectLineageFromParent( parentJobId.getNamespace().getValue(), parentJobId.getName().getValue()); @@ -87,19 +87,19 @@ public ParentLineage parentDirectLineage(JobId parentJobId) { directLineageFromParent.stream() .collect( groupingBy( - SimpleLineageEdge::job1, + DirectLineageEdge::job1, filtering( e -> e.direction() != null, groupingBy( - SimpleLineageEdge::direction, + DirectLineageEdge::direction, filtering( e -> e.dataset() != null, groupingBy( - SimpleLineageEdge::dataset, + DirectLineageEdge::dataset, filtering( e -> e.direction2() != null, groupingBy( - SimpleLineageEdge::direction2, + DirectLineageEdge::direction2, mapping( e -> new JobWithParent(e.job2(), e.job2parent()), toList()))))))))); diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 616c320f91..c0dad3ad71 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -32,7 +32,7 @@ import java.util.stream.Stream; import marquez.api.JdbiUtils; import marquez.common.models.JobType; -import marquez.db.LineageDao.SimpleLineageEdge; +import marquez.db.LineageDao.DirectLineageEdge; import marquez.db.LineageTestUtils.DatasetConsumerJob; import marquez.db.LineageTestUtils.JobLineage; import marquez.db.models.JobRow; @@ -170,11 +170,11 @@ public void testGetLineage() { expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator); } - Collection directLineageFromParent = + Collection FromParent = lineageDao.getDirectLineageFromParent( disjointJob.getJob().getNamespaceName(), disjointJob.getJob().getName()); - assertNotNull(directLineageFromParent); 
- assertTrue(directLineageFromParent.toString(), directLineageFromParent.size() == 0); + assertNotNull(FromParent); + assertTrue(FromParent.toString(), FromParent.size() == 0); } @Test @@ -322,11 +322,11 @@ public void testGetLineageWithJobThatHasNoDownstreamConsumers() { } @Test - public void testGetDirectLineageFromParent() { + public void testGetFromParent() { FacetTestUtils.createLineageWithFacets(openLineageDao); - Collection directLineageFromParent = + Collection FromParent = lineageDao.getDirectLineageFromParent("namespace", "name"); - assertTrue(directLineageFromParent.toString(), directLineageFromParent.size() == 2); + assertTrue(FromParent.toString(), FromParent.size() == 2); } @Test From c14c83631f2ee7d0b3846f1c6db4b5b5d5ed0aa7 Mon Sep 17 00:00:00 2001 From: Julien Le Dem Date: Wed, 1 Nov 2023 16:25:47 -0700 Subject: [PATCH 6/9] add LineageService test Signed-off-by: Julien Le Dem --- api/src/test/java/marquez/db/LineageTestUtils.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java index 84dc6d18f1..6793d89015 100644 --- a/api/src/test/java/marquez/db/LineageTestUtils.java +++ b/api/src/test/java/marquez/db/LineageTestUtils.java @@ -20,6 +20,8 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import javax.validation.Valid; + +import lombok.Builder; import lombok.Value; import marquez.common.Utils; import marquez.db.models.UpdateLineageRow; @@ -281,4 +283,16 @@ public static class DatasetConsumerJob { int numConsumers; Optional outputDatasetName; } + + @Value + @Builder + public static class CreateJobLineage { + String jobName; + String status; + JobFacet jobFacet; + List inputs; + List outputs; + @Valid LineageEvent.ParentRunFacet parentRunFacet; + ImmutableMap runFacets; + } } From 95d43e4ee1cd295867f82eb77942b55957c31ba7 Mon Sep 17 00:00:00 2001 From: Julien Le Dem Date: Wed, 1 Nov 2023 16:26:29 -0700 Subject: [PATCH 7/9] add LineageService test Signed-off-by: Julien Le Dem --- .../java/marquez/db/LineageTestUtils.java | 30 ++++++++- .../marquez/service/LineageServiceTest.java | 62 ++++++++++++++++++- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java index 6793d89015..838b1d9619 100644 --- a/api/src/test/java/marquez/db/LineageTestUtils.java +++ b/api/src/test/java/marquez/db/LineageTestUtils.java @@ -208,6 +208,28 @@ public static List writeDownstreamLineage( List downstream, JobFacet jobFacet, Dataset dataset) { + return writeDownstreamLineageWithParent(openLineageDao, downstream, jobFacet, dataset, null); + } + + /** + * Recursive function which supports writing a lineage graph by supplying an input dataset and a + * list of {@link DatasetConsumerJob}s. Each consumer may output up to one dataset, which will be + * consumed by the number of consumers specified by the {@link DatasetConsumerJob#numConsumers} + * property. 
+ * + * @param openLineageDao + * @param downstream + * @param jobFacet + * @param dataset + * @param parentRunFacet + * @return + */ + public static List writeDownstreamLineageWithParent( + OpenLineageDao openLineageDao, + List downstream, + JobFacet jobFacet, + Dataset dataset, + @Valid LineageEvent.ParentRunFacet parentRunFacet) { DatasetConsumerJob consumer = downstream.get(0); return IntStream.range(0, consumer.getNumConsumers()) .mapToObj( @@ -232,17 +254,19 @@ public static List writeDownstreamLineage( "COMPLETE", jobFacet, Collections.singletonList(dataset), - outputs.stream().collect(Collectors.toList())); + outputs.stream().collect(Collectors.toList()), + parentRunFacet); List downstreamLineage = outputs.stream() .flatMap( out -> { if (consumer.numConsumers > 0) { - return writeDownstreamLineage( + return writeDownstreamLineageWithParent( openLineageDao, downstream.subList(1, downstream.size()), jobFacet, - out) + out, + parentRunFacet) .stream(); } else { return Stream.empty(); diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 7f6828dfa0..da1be8259c 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -5,15 +5,19 @@ package marquez.service; -import static marquez.db.LineageTestUtils.NAMESPACE; +import static marquez.db.LineageTestUtils.*; import static marquez.db.LineageTestUtils.newDatasetFacet; import static marquez.db.LineageTestUtils.writeDownstreamLineage; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.UUID; + import marquez.api.JdbiUtils; import marquez.common.models.DatasetName; import marquez.common.models.InputDatasetVersion; @@ -30,11 +34,15 @@ import marquez.db.OpenLineageDao; import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.LineageService.ParentLineage; import marquez.service.models.Edge; import marquez.service.models.JobData; import marquez.service.models.Lineage; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.JobFacet; +import marquez.service.models.LineageEvent.JobLink; +import marquez.service.models.LineageEvent.ParentRunFacet; +import marquez.service.models.LineageEvent.RunLink; import marquez.service.models.LineageEvent.SchemaField; import marquez.service.models.Node; import marquez.service.models.NodeId; @@ -45,6 +53,7 @@ import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.api.ObjectAssert; import org.jdbi.v3.core.Jdbi; +import org.junit.Assert; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -433,6 +442,57 @@ public void testLineageForOrphanedDataset() { .containsExactlyInAnyOrder(datasetNodeId); } + @Test + public void testParentLineage() { + String parentJobName1 = "parentJob1"; + String parentJobName2 = "parentJob2"; + ParentRunFacet parentRunFacet1 = new ParentRunFacet(PRODUCER_URL, SCHEMA_URL, new RunLink(UUID.randomUUID().toString()), JobLink.builder().namespace(NAMESPACE).name(parentJobName1).build()); + ParentRunFacet parentRunFacet2 = new ParentRunFacet(PRODUCER_URL, SCHEMA_URL, new RunLink(UUID.randomUUID().toString()), 
JobLink.builder().namespace(NAMESPACE).name(parentJobName2).build()); + UpdateLineageRow writeJob = + LineageTestUtils.createLineageRow( + openLineageDao, + "writeJob", + "COMPLETE", + jobFacet, + Arrays.asList(), + Arrays.asList(dataset), + parentRunFacet1); + List jobRows = + LineageTestUtils.writeDownstreamLineageWithParent( + openLineageDao, + new LinkedList<>( + Arrays.asList( + new DatasetConsumerJob("readJob", 2, Optional.of("outputData")), + new DatasetConsumerJob("downstreamJob", 1, Optional.of("outputData2")), + new DatasetConsumerJob("finalConsumer", 1, Optional.empty()))), + jobFacet, + dataset, + parentRunFacet2); + + ParentLineage parentLineage = + lineageService.parentDirectLineage(JobId.of(new NamespaceName(NAMESPACE), new JobName(parentJobName1))); + assertEquals(NAMESPACE, parentLineage.parent().getNamespace().getValue()); + assertEquals(parentJobName1, parentLineage.parent().getName().getValue()); + assertEquals(1, parentLineage.children().size()); + parentLineage.children().forEach( + c -> { + assertEquals("parentJob1.writeJob", c.job().getName().getValue()); + assertNull(c.inputs()); + c.outputs().forEach( + i -> { + assertEquals(dataset.getName(), i.dataset().getName().getValue()); + i.consumers().forEach( co -> { + assertThat(co.job().getName().getValue()).matches("parentJob2.readJob.*<-commonDataset"); + assertThat(co.parent().getName().getValue()).isEqualTo("parentJob2"); + // we don't go further than one level and don't see downstreamJob and finalConsumer + }); + assertNull(i.producers()); + } + ); + } + ); + } + private boolean jobNameEquals(Node node, String writeJob) { return node.getId().asJobId().getName().getValue().equals(writeJob); } From 53ff595d8f557ed401db8805f0812e6abd2f5c00 Mon Sep 17 00:00:00 2001 From: Julien Le Dem Date: Wed, 1 Nov 2023 17:01:27 -0700 Subject: [PATCH 8/9] fixing integration tests Signed-off-by: Julien Le Dem --- .../marquez/OpenLineageIntegrationTest.java | 58 +++++------------ .../java/marquez/db/LineageTestUtils.java | 1 - .../marquez/service/LineageServiceTest.java | 63 ++++++++++++------- .../java/marquez/client/MarquezClient.java | 24 +++++++ .../java/marquez/client/MarquezPathV1.java | 4 ++ .../main/java/marquez/client/MarquezUrl.java | 25 ++++++-- 6 files changed, 101 insertions(+), 74 deletions(-) diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index 25f4fa102b..e0793290c6 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -5,12 +5,12 @@ package marquez; +import static java.util.Arrays.asList; import static marquez.db.LineageTestUtils.PRODUCER_URL; import static marquez.db.LineageTestUtils.SCHEMA_URL; import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -27,9 +27,7 @@ import io.openlineage.client.OpenLineage.RunFacetsBuilder; import java.io.IOException; import java.net.URI; -import java.net.http.HttpRequest; import java.net.http.HttpResponse; -import java.net.http.HttpResponse.BodyHandlers; import java.nio.charset.Charset; import java.time.Instant; import java.time.ZoneId; @@ -50,6 +48,7 @@ import lombok.extern.slf4j.Slf4j; import marquez.api.JdbiUtils; import 
From 53ff595d8f557ed401db8805f0812e6abd2f5c00 Mon Sep 17 00:00:00 2001
From: Julien Le Dem
Date: Wed, 1 Nov 2023 17:01:27 -0700
Subject: [PATCH 8/9] fixing integration tests

Signed-off-by: Julien Le Dem
---
 .../marquez/OpenLineageIntegrationTest.java   | 58 +++++------------
 .../java/marquez/db/LineageTestUtils.java     |  1 -
 .../marquez/service/LineageServiceTest.java   | 63 ++++++++++++-------
 .../java/marquez/client/MarquezClient.java    | 24 +++++++
 .../java/marquez/client/MarquezPathV1.java    |  4 ++
 .../main/java/marquez/client/MarquezUrl.java  | 25 ++++++--
 6 files changed, 101 insertions(+), 74 deletions(-)

diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java
index 25f4fa102b..e0793290c6 100644
--- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java
+++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java
@@ -5,12 +5,12 @@
 package marquez;

+import static java.util.Arrays.asList;
 import static marquez.db.LineageTestUtils.PRODUCER_URL;
 import static marquez.db.LineageTestUtils.SCHEMA_URL;
 import static org.assertj.core.api.Assertions.as;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;

 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -27,9 +27,7 @@
 import io.openlineage.client.OpenLineage.RunFacetsBuilder;
 import java.io.IOException;
 import java.net.URI;
-import java.net.http.HttpRequest;
 import java.net.http.HttpResponse;
-import java.net.http.HttpResponse.BodyHandlers;
 import java.nio.charset.Charset;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -50,6 +48,7 @@
 import lombok.extern.slf4j.Slf4j;
 import marquez.api.JdbiUtils;
 import marquez.client.MarquezClient;
+import marquez.client.MarquezClient.ParentLineage;
 import marquez.client.models.Dataset;
 import marquez.client.models.DatasetVersion;
 import marquez.client.models.Job;
@@ -57,12 +56,9 @@
 import marquez.client.models.LineageEvent;
 import marquez.client.models.Run;
 import marquez.common.Utils;
-import marquez.common.models.JobName;
-import marquez.common.models.NamespaceName;
 import marquez.db.LineageTestUtils;
 import marquez.service.models.DatasetEvent;
 import marquez.service.models.JobEvent;
-import marquez.service.models.NodeId;
 import org.assertj.core.api.InstanceOfAssertFactories;
 import org.jdbi.v3.core.Jdbi;
 import org.jetbrains.annotations.NotNull;
@@ -323,26 +319,13 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentRunFacet()
     List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
     assertThat(runsList).isNotEmpty().hasSize(1);

-    marquez.common.models.JobId jobId =
-        new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName));
-    String nodeId = NodeId.of(jobId).getValue();
-    HttpRequest request =
-        HttpRequest.newBuilder()
-            .uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId))
-            .header("Content-Type", "application/json")
-            .GET()
-            .build();
-
-    HttpResponse<String> resp;
-    try {
-      resp = http2.send(request, BodyHandlers.ofString());
+    ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName));
+    assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME);
+    assertThat(directLineage.parent().getName()).isEqualTo(dagName);
+    assertThat(directLineage.children()).size().isEqualTo(2);

-      assertEquals(200, resp.statusCode(), resp.body());
-      assertTrue(resp.body().contains("task1"), resp.body());
-      assertTrue(resp.body().contains("task2"), resp.body());
-    } catch (IOException | InterruptedException e) {
-      throw new RuntimeException(e);
-    }
+    assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList())
+        .isEqualTo(asList("the_dag.task1", "the_dag.task2"));
   }

   @Test
@@ -418,26 +401,13 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunF
     List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
     assertThat(runsList).isNotEmpty().hasSize(1);

-    marquez.common.models.JobId jobId =
-        new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName));
-    String nodeId = NodeId.of(jobId).getValue();
-    HttpRequest request =
-        HttpRequest.newBuilder()
-            .uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId))
-            .header("Content-Type", "application/json")
-            .GET()
-            .build();
-
-    HttpResponse<String> resp;
-    try {
-      resp = http2.send(request, BodyHandlers.ofString());
+    ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName));
+    assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME);
+    assertThat(directLineage.parent().getName()).isEqualTo(dagName);
+    assertThat(directLineage.children()).size().isEqualTo(2);

-      assertEquals(200, resp.statusCode(), resp.body());
-      assertTrue(resp.body().contains("task1"), resp.body());
-      assertTrue(resp.body().contains("task2"), resp.body());
-    } catch (IOException | InterruptedException e) {
-      throw new RuntimeException(e);
-    }
+    assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList())
+        .isEqualTo(asList("the_dag.task1", "the_dag.task2"));
   }

   @Test
diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java
index 838b1d9619..94a6b25d98 100644
--- a/api/src/test/java/marquez/db/LineageTestUtils.java
+++ b/api/src/test/java/marquez/db/LineageTestUtils.java
@@ -20,7 +20,6 @@
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import javax.validation.Valid;
-
 import lombok.Builder;
 import lombok.Value;
 import marquez.common.Utils;
diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java
index da1be8259c..61be6eb605 100644
--- a/api/src/test/java/marquez/service/LineageServiceTest.java
+++ b/api/src/test/java/marquez/service/LineageServiceTest.java
@@ -5,7 +5,9 @@
 package marquez.service;

-import static marquez.db.LineageTestUtils.*;
+import static marquez.db.LineageTestUtils.NAMESPACE;
+import static marquez.db.LineageTestUtils.PRODUCER_URL;
+import static marquez.db.LineageTestUtils.SCHEMA_URL;
 import static marquez.db.LineageTestUtils.newDatasetFacet;
 import static marquez.db.LineageTestUtils.writeDownstreamLineage;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -17,7 +19,6 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
-
 import marquez.api.JdbiUtils;
 import marquez.common.models.DatasetName;
 import marquez.common.models.InputDatasetVersion;
@@ -53,7 +54,6 @@
 import org.assertj.core.api.InstanceOfAssertFactories;
 import org.assertj.core.api.ObjectAssert;
 import org.jdbi.v3.core.Jdbi;
-import org.junit.Assert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -446,8 +446,18 @@ public void testParentLineage() {
     String parentJobName1 = "parentJob1";
     String parentJobName2 = "parentJob2";
-    ParentRunFacet parentRunFacet1 = new ParentRunFacet(PRODUCER_URL, SCHEMA_URL, new RunLink(UUID.randomUUID().toString()), JobLink.builder().namespace(NAMESPACE).name(parentJobName1).build());
-    ParentRunFacet parentRunFacet2 = new ParentRunFacet(PRODUCER_URL, SCHEMA_URL, new RunLink(UUID.randomUUID().toString()), JobLink.builder().namespace(NAMESPACE).name(parentJobName2).build());
+    ParentRunFacet parentRunFacet1 =
+        new ParentRunFacet(
+            PRODUCER_URL,
+            SCHEMA_URL,
+            new RunLink(UUID.randomUUID().toString()),
+            JobLink.builder().namespace(NAMESPACE).name(parentJobName1).build());
+    ParentRunFacet parentRunFacet2 =
+        new ParentRunFacet(
+            PRODUCER_URL,
+            SCHEMA_URL,
+            new RunLink(UUID.randomUUID().toString()),
+            JobLink.builder().namespace(NAMESPACE).name(parentJobName2).build());
     UpdateLineageRow writeJob =
         LineageTestUtils.createLineageRow(
             openLineageDao,
             "writeJob",
             "COMPLETE",
             jobFacet,
             Arrays.asList(),
             Arrays.asList(dataset),
             parentRunFacet1);
     List<JobLineage> jobRows =
@@ -470,27 +480,34 @@ public void testParentLineage() {
             parentRunFacet2);

     ParentLineage parentLineage =
-        lineageService.parentDirectLineage(JobId.of(new NamespaceName(NAMESPACE), new JobName(parentJobName1)));
+        lineageService.parentDirectLineage(
+            JobId.of(new NamespaceName(NAMESPACE), new JobName(parentJobName1)));
     assertEquals(NAMESPACE, parentLineage.parent().getNamespace().getValue());
     assertEquals(parentJobName1, parentLineage.parent().getName().getValue());
     assertEquals(1, parentLineage.children().size());
-    parentLineage.children().forEach(
-        c -> {
-          assertEquals("parentJob1.writeJob", c.job().getName().getValue());
-          assertNull(c.inputs());
-          c.outputs().forEach(
-              i -> {
-                assertEquals(dataset.getName(), i.dataset().getName().getValue());
-                i.consumers().forEach( co -> {
-                  assertThat(co.job().getName().getValue()).matches("parentJob2.readJob.*<-commonDataset");
-                  assertThat(co.parent().getName().getValue()).isEqualTo("parentJob2");
-                  // we don't go further than one level and don't see downstreamJob and finalConsumer
-                });
-                assertNull(i.producers());
-              }
-          );
-        }
-    );
+    parentLineage
+        .children()
+        .forEach(
+            c -> {
+              assertEquals("parentJob1.writeJob", c.job().getName().getValue());
+              assertNull(c.inputs());
+              c.outputs()
+                  .forEach(
+                      i -> {
+                        assertEquals(dataset.getName(), i.dataset().getName().getValue());
+                        i.consumers()
+                            .forEach(
+                                co -> {
+                                  assertThat(co.job().getName().getValue())
+                                      .matches("parentJob2.readJob.*<-commonDataset");
+                                  assertThat(co.parent().getName().getValue())
+                                      .isEqualTo("parentJob2");
+                                  // we don't go further than one level and don't see downstreamJob
+                                  // and finalConsumer
+                                });
+                        assertNull(i.producers());
+                      });
+            });
   }

   private boolean jobNameEquals(Node node, String writeJob) {
diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java
index 3d55eac28f..4155fb75b1 100644
--- a/clients/java/src/main/java/marquez/client/MarquezClient.java
+++ b/clients/java/src/main/java/marquez/client/MarquezClient.java
@@ -21,6 +21,7 @@
 import java.net.URL;
 import java.time.Instant;
 import java.time.ZonedDateTime;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -35,9 +36,11 @@
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import marquez.client.models.Dataset;
+import marquez.client.models.DatasetId;
 import marquez.client.models.DatasetMeta;
 import marquez.client.models.DatasetVersion;
 import marquez.client.models.Job;
+import marquez.client.models.JobId;
 import marquez.client.models.JobMeta;
 import marquez.client.models.JobVersion;
 import marquez.client.models.LineageEvent;
@@ -125,6 +128,11 @@ public Lineage getLineage(NodeId nodeId, int depth) {
     return Lineage.fromJson(bodyAsJson);
   }

+  public ParentLineage getDirectLineage(JobId parentJobId) {
+    final String bodyAsJson = http.get(url.toDirectLineageUrl(parentJobId));
+    return ParentLineage.fromJson(bodyAsJson);
+  }
+
   public Lineage getColumnLineage(NodeId nodeId) {
     return getColumnLineage(nodeId, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
   }
@@ -703,4 +711,20 @@ String toJson() {
       return Utils.toJson(this);
     }
   }
+
+  public record JobWithParent(JobId job, JobId parent) {}
+
+  public record DatasetLineage(
+      DatasetId dataset,
+      Collection<JobWithParent> consumers,
+      Collection<JobWithParent> producers) {}
+
+  public record ChildLineage(
+      JobId job, Collection<DatasetLineage> inputs, Collection<DatasetLineage> outputs) {}
+
+  public record ParentLineage(JobId parent, Collection<ChildLineage> children) {
+    static ParentLineage fromJson(final String json) {
+      return Utils.fromJson(json, new TypeReference<ParentLineage>() {});
+    }
+  }
 }
diff --git a/clients/java/src/main/java/marquez/client/MarquezPathV1.java b/clients/java/src/main/java/marquez/client/MarquezPathV1.java
index 2ceba79f9e..56fcdc0af2 100644
--- a/clients/java/src/main/java/marquez/client/MarquezPathV1.java
+++ b/clients/java/src/main/java/marquez/client/MarquezPathV1.java
@@ -182,6 +182,10 @@ static String lineagePath() {
     return path("/lineage/");
   }

+  static String directLineagePath() {
+    return path("/lineage/direct");
+  }
+
   static String columnLineagePath() {
     return path("/column-lineage/");
   }
diff --git a/clients/java/src/main/java/marquez/client/MarquezUrl.java b/clients/java/src/main/java/marquez/client/MarquezUrl.java
index cc460c4a4f..73fed0a83b 100644
--- a/clients/java/src/main/java/marquez/client/MarquezUrl.java
+++ b/clients/java/src/main/java/marquez/client/MarquezUrl.java
@@ -13,6 +13,7 @@
 import static marquez.client.MarquezPathV1.datasetPath;
 import static marquez.client.MarquezPathV1.datasetTagPath;
 import static marquez.client.MarquezPathV1.datasetVersionPath;
+import static marquez.client.MarquezPathV1.directLineagePath;
 import static marquez.client.MarquezPathV1.fieldTagPath;
 import static marquez.client.MarquezPathV1.jobPath;
 import static marquez.client.MarquezPathV1.jobVersionPath;
@@ -31,8 +32,6 @@
 import static marquez.client.MarquezPathV1.searchPath;
 import static marquez.client.MarquezPathV1.sourcePath;

-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -41,13 +40,20 @@
 import java.time.ZonedDateTime;
 import java.util.HashMap;
 import java.util.Map;
+
 import javax.annotation.Nullable;
+
+import org.apache.http.client.utils.URIBuilder;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
 import lombok.NonNull;
+import marquez.client.models.JobId;
 import marquez.client.models.NodeId;
 import marquez.client.models.RunState;
 import marquez.client.models.SearchFilter;
 import marquez.client.models.SearchSort;
-import org.apache.http.client.utils.URIBuilder;

 class MarquezUrl {
@@ -197,7 +203,7 @@ URL toCreateTagsUrl(String name) {

   URL toSearchUrl(
       @NonNull String query, @Nullable SearchFilter filter, @Nullable SearchSort sort, int limit) {
-    final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder();
+    final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
     queryParams.put("q", query);
     if (filter != null) {
       queryParams.put("filter", filter);
@@ -210,17 +216,24 @@ URL toSearchUrl(
   }

   URL toLineageUrl(NodeId nodeId, int depth) {
-    final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder();
+    final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
     queryParams.put("nodeId", nodeId.getValue());
     queryParams.put("depth", String.valueOf(depth));
     return from(lineagePath(), queryParams.build());
   }

   URL toColumnLineageUrl(NodeId nodeId, int depth, boolean withDownstream) {
-    final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder();
+    final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
     queryParams.put("nodeId", nodeId.getValue());
     queryParams.put("depth", String.valueOf(depth));
     queryParams.put("withDownstream", String.valueOf(withDownstream));
     return from(columnLineagePath(), queryParams.build());
   }
+
+  public URL toDirectLineageUrl(@NonNull JobId parentJobId) {
+    final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
+    queryParams.put("parentJobNodeId", NodeId.of(parentJobId).getValue());
+    return from(directLineagePath(), queryParams.build());
+  }
+
 }

From e7af696b67a89631e65866660319bf7b441d6632 Mon Sep 17 00:00:00 2001
From: Julien Le Dem
Date: Wed, 1 Nov 2023 17:08:31 -0700
Subject: [PATCH 9/9] add javadoc to mapper

Signed-off-by: Julien Le Dem
---
 .../main/java/marquez/db/mappers/DirectLineageEdgeMapper.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java b/api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java
index 2550c7e0ef..1ffe70844f 100644
--- a/api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java
+++ b/api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java
@@ -19,6 +19,7 @@
 import org.jdbi.v3.core.mapper.RowMapper;
 import org.jdbi.v3.core.statement.StatementContext;

+/** Maps a row of the direct lineage result set to a {@link DirectLineageEdge}. */
 public final class DirectLineageEdgeMapper implements RowMapper<DirectLineageEdge> {
   @Override
   public DirectLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context)
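For completeness, a minimal usage sketch of the client-side API introduced in this series (getDirectLineage plus the ParentLineage/ChildLineage records). It is illustrative only: the namespace and job names ("my_namespace", "my_dag") and the base URL http://localhost:5000 are placeholders, the builder call relies on the existing MarquezClient builder, and the request issued under the hood is GET /api/v1/lineage/direct?parentJobNodeId=<node id of the parent job>, as wired up in MarquezUrl.toDirectLineageUrl above.

    // Illustrative sketch, not part of the patch series.
    import marquez.client.MarquezClient;
    import marquez.client.MarquezClient.ParentLineage;
    import marquez.client.models.JobId;

    public class DirectLineageExample {
      public static void main(String[] args) {
        // Assumes a local Marquez API; adjust the base URL for your deployment.
        MarquezClient client =
            MarquezClient.builder().baseUrl("http://localhost:5000").build();

        // One level of lineage for the children of the parent job "my_dag".
        ParentLineage lineage = client.getDirectLineage(new JobId("my_namespace", "my_dag"));

        // inputs()/outputs() may be null when a child job has no datasets in that direction.
        lineage
            .children()
            .forEach(
                child ->
                    System.out.println(
                        child.job().getName()
                            + " inputs=" + child.inputs()
                            + " outputs=" + child.outputs()));
      }
    }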