Skip to content

Commit 53ff595

Browse files
committed
fixing integration tests
Signed-off-by: Julien Le Dem <[email protected]>
1 parent 95d43e4 commit 53ff595

File tree

6 files changed

+101
-74
lines changed

6 files changed

+101
-74
lines changed

api/src/test/java/marquez/OpenLineageIntegrationTest.java

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55

66
package marquez;
77

8+
import static java.util.Arrays.asList;
89
import static marquez.db.LineageTestUtils.PRODUCER_URL;
910
import static marquez.db.LineageTestUtils.SCHEMA_URL;
1011
import static org.assertj.core.api.Assertions.as;
1112
import static org.assertj.core.api.Assertions.assertThat;
1213
import static org.junit.jupiter.api.Assertions.assertEquals;
13-
import static org.junit.jupiter.api.Assertions.assertTrue;
1414

1515
import com.fasterxml.jackson.core.JsonProcessingException;
1616
import com.fasterxml.jackson.core.type.TypeReference;
@@ -27,9 +27,7 @@
2727
import io.openlineage.client.OpenLineage.RunFacetsBuilder;
2828
import java.io.IOException;
2929
import java.net.URI;
30-
import java.net.http.HttpRequest;
3130
import java.net.http.HttpResponse;
32-
import java.net.http.HttpResponse.BodyHandlers;
3331
import java.nio.charset.Charset;
3432
import java.time.Instant;
3533
import java.time.ZoneId;
@@ -50,19 +48,17 @@
5048
import lombok.extern.slf4j.Slf4j;
5149
import marquez.api.JdbiUtils;
5250
import marquez.client.MarquezClient;
51+
import marquez.client.MarquezClient.ParentLineage;
5352
import marquez.client.models.Dataset;
5453
import marquez.client.models.DatasetVersion;
5554
import marquez.client.models.Job;
5655
import marquez.client.models.JobId;
5756
import marquez.client.models.LineageEvent;
5857
import marquez.client.models.Run;
5958
import marquez.common.Utils;
60-
import marquez.common.models.JobName;
61-
import marquez.common.models.NamespaceName;
6259
import marquez.db.LineageTestUtils;
6360
import marquez.service.models.DatasetEvent;
6461
import marquez.service.models.JobEvent;
65-
import marquez.service.models.NodeId;
6662
import org.assertj.core.api.InstanceOfAssertFactories;
6763
import org.jdbi.v3.core.Jdbi;
6864
import org.jetbrains.annotations.NotNull;
@@ -323,26 +319,13 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentRunFacet()
323319
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
324320
assertThat(runsList).isNotEmpty().hasSize(1);
325321

326-
marquez.common.models.JobId jobId =
327-
new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName));
328-
String nodeId = NodeId.of(jobId).getValue();
329-
HttpRequest request =
330-
HttpRequest.newBuilder()
331-
.uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId))
332-
.header("Content-Type", "application/json")
333-
.GET()
334-
.build();
335-
336-
HttpResponse<String> resp;
337-
try {
338-
resp = http2.send(request, BodyHandlers.ofString());
322+
ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName));
323+
assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME);
324+
assertThat(directLineage.parent().getName()).isEqualTo(dagName);
325+
assertThat(directLineage.children()).size().isEqualTo(2);
339326

340-
assertEquals(200, resp.statusCode(), resp.body());
341-
assertTrue(resp.body().contains("task1"), resp.body());
342-
assertTrue(resp.body().contains("task2"), resp.body());
343-
} catch (IOException | InterruptedException e) {
344-
throw new RuntimeException(e);
345-
}
327+
assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList())
328+
.isEqualTo(asList("the_dag.task1", "the_dag.task2"));
346329
}
347330

348331
@Test
@@ -418,26 +401,13 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunF
418401
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
419402
assertThat(runsList).isNotEmpty().hasSize(1);
420403

421-
marquez.common.models.JobId jobId =
422-
new marquez.common.models.JobId(NamespaceName.of(NAMESPACE_NAME), JobName.of(dagName));
423-
String nodeId = NodeId.of(jobId).getValue();
424-
HttpRequest request =
425-
HttpRequest.newBuilder()
426-
.uri(URI.create(baseUrl + "/api/v1/lineage/direct?parentJobNodeId=" + nodeId))
427-
.header("Content-Type", "application/json")
428-
.GET()
429-
.build();
430-
431-
HttpResponse<String> resp;
432-
try {
433-
resp = http2.send(request, BodyHandlers.ofString());
404+
ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName));
405+
assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME);
406+
assertThat(directLineage.parent().getName()).isEqualTo(dagName);
407+
assertThat(directLineage.children()).size().isEqualTo(2);
434408

435-
assertEquals(200, resp.statusCode(), resp.body());
436-
assertTrue(resp.body().contains("task1"), resp.body());
437-
assertTrue(resp.body().contains("task2"), resp.body());
438-
} catch (IOException | InterruptedException e) {
439-
throw new RuntimeException(e);
440-
}
409+
assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList())
410+
.isEqualTo(asList("the_dag.task1", "the_dag.task2"));
441411
}
442412

443413
@Test

api/src/test/java/marquez/db/LineageTestUtils.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.stream.IntStream;
2121
import java.util.stream.Stream;
2222
import javax.validation.Valid;
23-
2423
import lombok.Builder;
2524
import lombok.Value;
2625
import marquez.common.Utils;

api/src/test/java/marquez/service/LineageServiceTest.java

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
package marquez.service;
77

8-
import static marquez.db.LineageTestUtils.*;
8+
import static marquez.db.LineageTestUtils.NAMESPACE;
9+
import static marquez.db.LineageTestUtils.PRODUCER_URL;
10+
import static marquez.db.LineageTestUtils.SCHEMA_URL;
911
import static marquez.db.LineageTestUtils.newDatasetFacet;
1012
import static marquez.db.LineageTestUtils.writeDownstreamLineage;
1113
import static org.assertj.core.api.Assertions.assertThat;
@@ -17,7 +19,6 @@
1719
import java.util.List;
1820
import java.util.Optional;
1921
import java.util.UUID;
20-
2122
import marquez.api.JdbiUtils;
2223
import marquez.common.models.DatasetName;
2324
import marquez.common.models.InputDatasetVersion;
@@ -53,7 +54,6 @@
5354
import org.assertj.core.api.InstanceOfAssertFactories;
5455
import org.assertj.core.api.ObjectAssert;
5556
import org.jdbi.v3.core.Jdbi;
56-
import org.junit.Assert;
5757
import org.junit.jupiter.api.AfterEach;
5858
import org.junit.jupiter.api.BeforeAll;
5959
import org.junit.jupiter.api.Test;
@@ -446,8 +446,18 @@ public void testLineageForOrphanedDataset() {
446446
public void testParentLineage() {
447447
String parentJobName1 = "parentJob1";
448448
String parentJobName2 = "parentJob2";
449-
ParentRunFacet parentRunFacet1 = new ParentRunFacet(PRODUCER_URL, SCHEMA_URL, new RunLink(UUID.randomUUID().toString()), JobLink.builder().namespace(NAMESPACE).name(parentJobName1).build());
450-
ParentRunFacet parentRunFacet2 = new ParentRunFacet(PRODUCER_URL, SCHEMA_URL, new RunLink(UUID.randomUUID().toString()), JobLink.builder().namespace(NAMESPACE).name(parentJobName2).build());
449+
ParentRunFacet parentRunFacet1 =
450+
new ParentRunFacet(
451+
PRODUCER_URL,
452+
SCHEMA_URL,
453+
new RunLink(UUID.randomUUID().toString()),
454+
JobLink.builder().namespace(NAMESPACE).name(parentJobName1).build());
455+
ParentRunFacet parentRunFacet2 =
456+
new ParentRunFacet(
457+
PRODUCER_URL,
458+
SCHEMA_URL,
459+
new RunLink(UUID.randomUUID().toString()),
460+
JobLink.builder().namespace(NAMESPACE).name(parentJobName2).build());
451461
UpdateLineageRow writeJob =
452462
LineageTestUtils.createLineageRow(
453463
openLineageDao,
@@ -470,27 +480,34 @@ public void testParentLineage() {
470480
parentRunFacet2);
471481

472482
ParentLineage parentLineage =
473-
lineageService.parentDirectLineage(JobId.of(new NamespaceName(NAMESPACE), new JobName(parentJobName1)));
483+
lineageService.parentDirectLineage(
484+
JobId.of(new NamespaceName(NAMESPACE), new JobName(parentJobName1)));
474485
assertEquals(NAMESPACE, parentLineage.parent().getNamespace().getValue());
475486
assertEquals(parentJobName1, parentLineage.parent().getName().getValue());
476487
assertEquals(1, parentLineage.children().size());
477-
parentLineage.children().forEach(
478-
c -> {
479-
assertEquals("parentJob1.writeJob", c.job().getName().getValue());
480-
assertNull(c.inputs());
481-
c.outputs().forEach(
482-
i -> {
483-
assertEquals(dataset.getName(), i.dataset().getName().getValue());
484-
i.consumers().forEach( co -> {
485-
assertThat(co.job().getName().getValue()).matches("parentJob2.readJob.*<-commonDataset");
486-
assertThat(co.parent().getName().getValue()).isEqualTo("parentJob2");
487-
// we don't go further than one level and don't see downstreamJob and finalConsumer
488-
});
489-
assertNull(i.producers());
490-
}
491-
);
492-
}
493-
);
488+
parentLineage
489+
.children()
490+
.forEach(
491+
c -> {
492+
assertEquals("parentJob1.writeJob", c.job().getName().getValue());
493+
assertNull(c.inputs());
494+
c.outputs()
495+
.forEach(
496+
i -> {
497+
assertEquals(dataset.getName(), i.dataset().getName().getValue());
498+
i.consumers()
499+
.forEach(
500+
co -> {
501+
assertThat(co.job().getName().getValue())
502+
.matches("parentJob2.readJob.*<-commonDataset");
503+
assertThat(co.parent().getName().getValue())
504+
.isEqualTo("parentJob2");
505+
// we don't go further than one level and don't see downstreamJob
506+
// and finalConsumer
507+
});
508+
assertNull(i.producers());
509+
});
510+
});
494511
}
495512

496513
private boolean jobNameEquals(Node node, String writeJob) {

clients/java/src/main/java/marquez/client/MarquezClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.net.URL;
2222
import java.time.Instant;
2323
import java.time.ZonedDateTime;
24+
import java.util.Collection;
2425
import java.util.List;
2526
import java.util.Properties;
2627
import java.util.Set;
@@ -35,9 +36,11 @@
3536
import lombok.Value;
3637
import lombok.extern.slf4j.Slf4j;
3738
import marquez.client.models.Dataset;
39+
import marquez.client.models.DatasetId;
3840
import marquez.client.models.DatasetMeta;
3941
import marquez.client.models.DatasetVersion;
4042
import marquez.client.models.Job;
43+
import marquez.client.models.JobId;
4144
import marquez.client.models.JobMeta;
4245
import marquez.client.models.JobVersion;
4346
import marquez.client.models.LineageEvent;
@@ -125,6 +128,11 @@ public Lineage getLineage(NodeId nodeId, int depth) {
125128
return Lineage.fromJson(bodyAsJson);
126129
}
127130

131+
public ParentLineage getDirectLineage(JobId parentJobId) {
132+
final String bodyAsJson = http.get(url.toDirectLineageUrl(parentJobId));
133+
return ParentLineage.fromJson(bodyAsJson);
134+
}
135+
128136
public Lineage getColumnLineage(NodeId nodeId) {
129137
return getColumnLineage(nodeId, DEFAULT_LINEAGE_GRAPH_DEPTH, false);
130138
}
@@ -703,4 +711,20 @@ String toJson() {
703711
return Utils.toJson(this);
704712
}
705713
}
714+
715+
public record JobWithParent(JobId job, JobId parent) {}
716+
717+
public record DatasetLineage(
718+
DatasetId dataset,
719+
Collection<JobWithParent> consumers,
720+
Collection<JobWithParent> producers) {}
721+
722+
public record ChildLineage(
723+
JobId job, Collection<DatasetLineage> inputs, Collection<DatasetLineage> outputs) {}
724+
725+
public record ParentLineage(JobId parent, Collection<ChildLineage> children) {
726+
static ParentLineage fromJson(final String json) {
727+
return Utils.fromJson(json, new TypeReference<ParentLineage>() {});
728+
}
729+
}
706730
}

clients/java/src/main/java/marquez/client/MarquezPathV1.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ static String lineagePath() {
182182
return path("/lineage/");
183183
}
184184

185+
static String directLineagePath() {
186+
return path("/lineage/direct");
187+
}
188+
185189
static String columnLineagePath() {
186190
return path("/column-lineage/");
187191
}

clients/java/src/main/java/marquez/client/MarquezUrl.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import static marquez.client.MarquezPathV1.datasetPath;
1414
import static marquez.client.MarquezPathV1.datasetTagPath;
1515
import static marquez.client.MarquezPathV1.datasetVersionPath;
16+
import static marquez.client.MarquezPathV1.directLineagePath;
1617
import static marquez.client.MarquezPathV1.fieldTagPath;
1718
import static marquez.client.MarquezPathV1.jobPath;
1819
import static marquez.client.MarquezPathV1.jobVersionPath;
@@ -31,8 +32,6 @@
3132
import static marquez.client.MarquezPathV1.searchPath;
3233
import static marquez.client.MarquezPathV1.sourcePath;
3334

34-
import com.google.common.annotations.VisibleForTesting;
35-
import com.google.common.collect.ImmutableMap;
3635
import java.net.MalformedURLException;
3736
import java.net.URI;
3837
import java.net.URISyntaxException;
@@ -41,13 +40,20 @@
4140
import java.time.ZonedDateTime;
4241
import java.util.HashMap;
4342
import java.util.Map;
43+
4444
import javax.annotation.Nullable;
45+
46+
import org.apache.http.client.utils.URIBuilder;
47+
48+
import com.google.common.annotations.VisibleForTesting;
49+
import com.google.common.collect.ImmutableMap;
50+
4551
import lombok.NonNull;
52+
import marquez.client.models.JobId;
4653
import marquez.client.models.NodeId;
4754
import marquez.client.models.RunState;
4855
import marquez.client.models.SearchFilter;
4956
import marquez.client.models.SearchSort;
50-
import org.apache.http.client.utils.URIBuilder;
5157

5258
class MarquezUrl {
5359

@@ -197,7 +203,7 @@ URL toCreateTagsUrl(String name) {
197203

198204
URL toSearchUrl(
199205
@NonNull String query, @Nullable SearchFilter filter, @Nullable SearchSort sort, int limit) {
200-
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
206+
final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
201207
queryParams.put("q", query);
202208
if (filter != null) {
203209
queryParams.put("filter", filter);
@@ -210,17 +216,24 @@ URL toSearchUrl(
210216
}
211217

212218
URL toLineageUrl(NodeId nodeId, int depth) {
213-
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
219+
final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
214220
queryParams.put("nodeId", nodeId.getValue());
215221
queryParams.put("depth", String.valueOf(depth));
216222
return from(lineagePath(), queryParams.build());
217223
}
218224

219225
URL toColumnLineageUrl(NodeId nodeId, int depth, boolean withDownstream) {
220-
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
226+
final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
221227
queryParams.put("nodeId", nodeId.getValue());
222228
queryParams.put("depth", String.valueOf(depth));
223229
queryParams.put("withDownstream", String.valueOf(withDownstream));
224230
return from(columnLineagePath(), queryParams.build());
225231
}
232+
233+
public URL toDirectLineageUrl(@NonNull JobId parentJobId) {
234+
final ImmutableMap.Builder<String, Object> queryParams = new ImmutableMap.Builder<>();
235+
queryParams.put("parentJobNodeId", NodeId.of(parentJobId).getValue());
236+
return from(directLineagePath(), queryParams.build());
237+
}
238+
226239
}

0 commit comments

Comments
 (0)