Skip to content

Commit 39689a1

Browse files
authored
Merge branch 'datahub-project:master' into master
2 parents 23fa938 + 0e068e2 commit 39689a1

File tree

62 files changed

+2956
-1003
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2956
-1003
lines changed

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java

+3
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateTestConnectionRequestResolver;
111111
import com.linkedin.datahub.graphql.resolvers.ingest.execution.GetIngestionExecutionRequestResolver;
112112
import com.linkedin.datahub.graphql.resolvers.ingest.execution.IngestionSourceExecutionRequestsResolver;
113+
import com.linkedin.datahub.graphql.resolvers.ingest.execution.ListExecutionRequestsResolver;
113114
import com.linkedin.datahub.graphql.resolvers.ingest.execution.RollbackIngestionResolver;
114115
import com.linkedin.datahub.graphql.resolvers.ingest.secret.CreateSecretResolver;
115116
import com.linkedin.datahub.graphql.resolvers.ingest.secret.DeleteSecretResolver;
@@ -999,6 +1000,8 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
9991000
.dataFetcher(
10001001
"listIngestionSources", new ListIngestionSourcesResolver(this.entityClient))
10011002
.dataFetcher("ingestionSource", new GetIngestionSourceResolver(this.entityClient))
1003+
.dataFetcher(
1004+
"listExecutionRequests", new ListExecutionRequestsResolver(this.entityClient))
10021005
.dataFetcher(
10031006
"executionRequest", new GetIngestionExecutionRequestResolver(this.entityClient))
10041007
.dataFetcher("getSchemaBlame", new GetSchemaBlameResolver(this.timelineService))

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/IngestionResolverUtils.java

+6
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public static ExecutionRequest mapExecutionRequest(
6868
if (executionRequestInput.getActorUrn() != null) {
6969
inputResult.setActorUrn(executionRequestInput.getActorUrn().toString());
7070
}
71+
if (executionRequestInput.hasExecutorId()) {
72+
inputResult.setExecutorId(executionRequestInput.getExecutorId());
73+
}
7174
result.setInput(inputResult);
7275
}
7376

@@ -88,6 +91,9 @@ public static ExecutionRequest mapExecutionRequest(
8891
final com.linkedin.datahub.graphql.generated.ExecutionRequestSource result =
8992
new com.linkedin.datahub.graphql.generated.ExecutionRequestSource();
9093
result.setType(execRequestSource.getType());
94+
if (execRequestSource.hasIngestionSource()) {
95+
result.setIngestionSource(execRequestSource.getIngestionSource().toString());
96+
}
9197
return result;
9298
}
9399

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package com.linkedin.datahub.graphql.resolvers.ingest.execution;
2+
3+
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
4+
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.buildFilter;
5+
6+
import com.google.common.collect.ImmutableSet;
7+
import com.linkedin.common.urn.Urn;
8+
import com.linkedin.datahub.graphql.QueryContext;
9+
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
10+
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
11+
import com.linkedin.datahub.graphql.generated.IngestionSourceExecutionRequests;
12+
import com.linkedin.datahub.graphql.generated.ListExecutionRequestsInput;
13+
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionResolverUtils;
14+
import com.linkedin.entity.EntityResponse;
15+
import com.linkedin.entity.client.EntityClient;
16+
import com.linkedin.metadata.Constants;
17+
import com.linkedin.metadata.query.filter.SortCriterion;
18+
import com.linkedin.metadata.query.filter.SortOrder;
19+
import com.linkedin.metadata.search.SearchEntity;
20+
import com.linkedin.metadata.search.SearchResult;
21+
import graphql.schema.DataFetcher;
22+
import graphql.schema.DataFetchingEnvironment;
23+
import java.util.Collections;
24+
import java.util.HashSet;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.concurrent.CompletableFuture;
29+
import lombok.RequiredArgsConstructor;
30+
import lombok.extern.slf4j.Slf4j;
31+
32+
@Slf4j
33+
@RequiredArgsConstructor
34+
public class ListExecutionRequestsResolver
35+
implements DataFetcher<CompletableFuture<IngestionSourceExecutionRequests>> {
36+
37+
private static final String DEFAULT_QUERY = "*";
38+
private static final Integer DEFAULT_START = 0;
39+
private static final Integer DEFAULT_COUNT = 20;
40+
41+
private final EntityClient _entityClient;
42+
43+
@Override
44+
public CompletableFuture<IngestionSourceExecutionRequests> get(
45+
final DataFetchingEnvironment environment) throws Exception {
46+
47+
final QueryContext context = environment.getContext();
48+
49+
final ListExecutionRequestsInput input =
50+
bindArgument(environment.getArgument("input"), ListExecutionRequestsInput.class);
51+
final Integer start = input.getStart() == null ? DEFAULT_START : input.getStart();
52+
final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount();
53+
final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery();
54+
final List<FacetFilterInput> filters =
55+
input.getFilters() == null ? Collections.emptyList() : input.getFilters();
56+
57+
// construct sort criteria, defaulting to systemCreated
58+
final SortCriterion sortCriterion;
59+
60+
// if query is expecting to sort by something, use that
61+
final com.linkedin.datahub.graphql.generated.SortCriterion sortCriterionInput = input.getSort();
62+
if (sortCriterionInput != null) {
63+
sortCriterion =
64+
new SortCriterion()
65+
.setField(sortCriterionInput.getField())
66+
.setOrder(SortOrder.valueOf(sortCriterionInput.getSortOrder().name()));
67+
} else {
68+
sortCriterion = new SortCriterion().setField("requestTimeMs").setOrder(SortOrder.DESCENDING);
69+
}
70+
71+
return GraphQLConcurrencyUtils.supplyAsync(
72+
() -> {
73+
try {
74+
// First, get all execution request Urns.
75+
final SearchResult gmsResult =
76+
_entityClient.search(
77+
context.getOperationContext().withSearchFlags(flags -> flags.setFulltext(true)),
78+
Constants.EXECUTION_REQUEST_ENTITY_NAME,
79+
query,
80+
buildFilter(filters, Collections.emptyList()),
81+
sortCriterion != null ? List.of(sortCriterion) : null,
82+
start,
83+
count);
84+
85+
log.info(String.format("Found %d execution requests", gmsResult.getNumEntities()));
86+
87+
final List<Urn> entitiesUrnList =
88+
gmsResult.getEntities().stream().map(SearchEntity::getEntity).toList();
89+
// Then, resolve all execution requests
90+
final Map<Urn, EntityResponse> entities =
91+
_entityClient.batchGetV2(
92+
context.getOperationContext(),
93+
Constants.EXECUTION_REQUEST_ENTITY_NAME,
94+
new HashSet<>(entitiesUrnList),
95+
ImmutableSet.of(
96+
Constants.EXECUTION_REQUEST_INPUT_ASPECT_NAME,
97+
Constants.EXECUTION_REQUEST_RESULT_ASPECT_NAME));
98+
final List<EntityResponse> entitiesOrdered =
99+
entitiesUrnList.stream().map(entities::get).filter(Objects::nonNull).toList();
100+
// Now that we have entities we can bind this to a result.
101+
final IngestionSourceExecutionRequests result = new IngestionSourceExecutionRequests();
102+
result.setStart(gmsResult.getFrom());
103+
result.setCount(gmsResult.getPageSize());
104+
result.setTotal(gmsResult.getNumEntities());
105+
result.setExecutionRequests(
106+
IngestionResolverUtils.mapExecutionRequests(context, entitiesOrdered));
107+
return result;
108+
} catch (Exception e) {
109+
throw new RuntimeException("Failed to list executions", e);
110+
}
111+
},
112+
this.getClass().getSimpleName(),
113+
"get");
114+
}
115+
}

datahub-graphql-core/src/main/resources/ingestion.graphql

+42
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ extend type Query {
2222
urn: The primary key associated with the ingestion source.
2323
"""
2424
ingestionSource(urn: String!): IngestionSource
25+
26+
"""
27+
List all execution requests
28+
"""
29+
listExecutionRequests(input: ListExecutionRequestsInput!): IngestionSourceExecutionRequests
2530

2631
"""
2732
Get an execution request
@@ -92,6 +97,11 @@ type ExecutionRequestSource {
9297
The type of the source, e.g. SCHEDULED_INGESTION_SOURCE
9398
"""
9499
type: String
100+
101+
"""
102+
The urn of the ingestion source, if applicable
103+
"""
104+
ingestionSource: String
95105
}
96106

97107
"""
@@ -122,6 +132,11 @@ type ExecutionRequestInput {
122132
Urn of the actor who created this execution request
123133
"""
124134
actorUrn: String
135+
136+
"""
137+
The specific executor to route the request to. If none is provided, a "default" executor is used.
138+
"""
139+
executorId: String
125140
}
126141

127142
"""
@@ -360,6 +375,33 @@ type IngestionRun {
360375
executionRequestUrn: String
361376
}
362377

378+
input ListExecutionRequestsInput {
379+
"""
380+
The starting offset of the result set
381+
"""
382+
start: Int
383+
384+
"""
385+
The number of results to be returned
386+
"""
387+
count: Int
388+
389+
"""
390+
An optional search query
391+
"""
392+
query: String
393+
394+
"""
395+
Optional Facet filters to apply to the result set
396+
"""
397+
filters: [FacetFilterInput!]
398+
399+
"""
400+
Optional sort order. Defaults to use systemCreated.
401+
"""
402+
sort: SortCriterion
403+
}
404+
363405
"""
364406
Requests for execution associated with an ingestion source
365407
"""

0 commit comments

Comments
 (0)