Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@
import com.linkedin.datahub.graphql.resolvers.ingest.execution.IngestionSourceExecutionRequestsResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.ListExecutionRequestsResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.RollbackIngestionResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.executor.CreateExecutorPoolResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.executor.DeleteExecutorPoolsResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.executor.ListExecutorPoolsResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.secret.CreateSecretResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.secret.DeleteSecretResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.secret.GetSecretValuesResolver;
Expand Down Expand Up @@ -1126,6 +1129,7 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher(
"listExecutionRequests", new ListExecutionRequestsResolver(this.entityClient))
.dataFetcher("executionRequest", getResolver(executionRequestType))
.dataFetcher("listExecutorPools", new ListExecutorPoolsResolver())
.dataFetcher("getSchemaBlame", new GetSchemaBlameResolver(this.timelineService))
.dataFetcher("getTimeline", new GetTimelineResolver(this.timelineService))
.dataFetcher(
Expand Down Expand Up @@ -1314,6 +1318,8 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("deleteSecret", new DeleteSecretResolver(this.entityClient))
.dataFetcher(
"updateSecret", new UpdateSecretResolver(this.entityClient, this.secretService))
.dataFetcher("createExecutorPool", new CreateExecutorPoolResolver())
.dataFetcher("deleteExecutorPools", new DeleteExecutorPoolsResolver())
.dataFetcher(
"createAccessToken",
new CreateAccessTokenResolver(this.statefulTokenService, this.entityClient))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.linkedin.datahub.graphql.resolvers.ingest.executor;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.CreateExecutorPoolInput;
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletableFuture;

/**
 * Creates a remote executor pool. Requires the MANAGE ingestion privilege.
 *
 * <p>In OSS this uses an in-memory store ({@link ExecutorPoolStore}). Managed DataHub may
 * override or replace this resolver to perform actual creation against an external executor service.
 */
public class CreateExecutorPoolResolver implements DataFetcher<CompletableFuture<String>> {

  @Override
  public CompletableFuture<String> get(final DataFetchingEnvironment environment) throws Exception {

    final QueryContext context = environment.getContext();

    // Reject unauthorized callers up front, before any async work is scheduled.
    if (!IngestionAuthUtils.canManageIngestion(context)) {
      throw new AuthorizationException(
          "Unauthorized to perform this action. Please contact your DataHub administrator.");
    }

    return GraphQLConcurrencyUtils.supplyAsync(
        () -> {
          // Argument binding happens inside the async task, matching the original flow.
          final CreateExecutorPoolInput input =
              bindArgument(environment.getArgument("input"), CreateExecutorPoolInput.class);
          if (input == null) {
            return "";
          }
          final String poolId = input.getId() == null ? "" : input.getId();
          return ExecutorPoolStore.create(poolId, input.getName());
        },
        this.getClass().getSimpleName(),
        "get");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.linkedin.datahub.graphql.resolvers.ingest.executor;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Deletes one or more remote executor pools by id. Requires the MANAGE ingestion privilege.
 *
 * <p>In OSS this removes pools from the in-memory store ({@link ExecutorPoolStore}) and returns the
 * deleted ids. Managed DataHub may override or replace this resolver to perform actual deletion
 * against an external executor service.
 */
public class DeleteExecutorPoolsResolver implements DataFetcher<CompletableFuture<List<String>>> {

  @Override
  public CompletableFuture<List<String>> get(final DataFetchingEnvironment environment)
      throws Exception {

    final QueryContext context = environment.getContext();

    // Fail fast on missing privilege; nothing is scheduled for unauthorized callers.
    if (!IngestionAuthUtils.canManageIngestion(context)) {
      throw new AuthorizationException(
          "Unauthorized to perform this action. Please contact your DataHub administrator.");
    }

    @SuppressWarnings("unchecked")
    final List<String> poolIds = environment.getArgument("poolIds");
    // Treat a missing argument as an empty request, same as the original behavior.
    final List<String> requestedIds = poolIds == null ? List.of() : poolIds;
    return GraphQLConcurrencyUtils.supplyAsync(
        () -> ExecutorPoolStore.delete(requestedIds),
        this.getClass().getSimpleName(),
        "get");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.linkedin.datahub.graphql.resolvers.ingest.executor;

import com.linkedin.datahub.graphql.generated.ExecutorPool;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* In-memory store for executor pools. Used by OSS resolvers so create/list/delete work within a
* single JVM. Does not persist across restarts. Managed DataHub may use external storage instead.
*/
final class ExecutorPoolStore {

private static final Map<String, ExecutorPool> POOLS = new ConcurrentHashMap<>();

static List<ExecutorPool> list(int start, int count) {
List<ExecutorPool> all = new ArrayList<>(POOLS.values());
int total = all.size();
int from = Math.min(start, total);
int to = Math.min(start + count, total);
return new ArrayList<>(all.subList(from, to));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unpredictable pool ordering breaks pagination consistency

Low Severity

The ExecutorPoolStore uses ConcurrentHashMap which does not preserve insertion order or provide any deterministic iteration order. Pools created as A, B, C may be returned in any order (e.g., C, A, B), and this order may change between requests. This makes server-side pagination unreliable—a pool on page 1 in one request might appear on page 2 in the next.

Fix in Cursor Fix in Web

}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Negative pagination values cause IndexOutOfBoundsException crash

Medium Severity

The list() method passes user-provided start and count values directly to ArrayList.subList() without validating they're non-negative. A GraphQL query with start: -1 would result in subList(-1, ...) which throws IndexOutOfBoundsException. Similarly, a negative count could make to less than from, throwing IllegalArgumentException. This would cause the API to return a 500 error instead of handling invalid input gracefully.

Additional Locations (1)

Fix in Cursor Fix in Web


static int totalCount() {
return POOLS.size();
}

static String create(String id, String name) {
if (id == null || id.isBlank()) {
return "";
}
ExecutorPool pool = new ExecutorPool();
pool.setId(id.trim());
pool.setName(name != null && !name.isBlank() ? name.trim() : null);
POOLS.put(id.trim(), pool);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating pool with existing ID silently overwrites it

Medium Severity

The create method uses POOLS.put() which silently overwrites any existing pool with the same ID. When a user calls the createExecutorPool mutation with an ID that already exists, the existing pool's name is replaced without any error or warning. Users expect a "create" operation to fail if the resource already exists, not perform an upsert. This could lead to accidental data loss if a user unknowingly provides an ID that's already in use.

Fix in Cursor Fix in Web

return id.trim();
}

static List<String> delete(List<String> poolIds) {
if (poolIds == null) {
return List.of();
}
return poolIds.stream()
.filter(Objects::nonNull)
.filter(id -> POOLS.remove(id) != null)
.collect(Collectors.toList());
}

private ExecutorPoolStore() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.linkedin.datahub.graphql.resolvers.ingest.executor;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.ExecutorPool;
import com.linkedin.datahub.graphql.generated.ListExecutorPoolsInput;
import com.linkedin.datahub.graphql.generated.ListExecutorPoolsResult;
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Lists remote executor pools. Requires the MANAGE ingestion privilege.
 *
 * <p>In OSS this returns pools from an in-memory store (see ExecutorPoolStore). Managed DataHub
 * may override or replace this resolver to return pools from an external executor service.
 */
public class ListExecutorPoolsResolver
    implements DataFetcher<CompletableFuture<ListExecutorPoolsResult>> {

  private static final Integer DEFAULT_START = 0;
  private static final Integer DEFAULT_COUNT = 20;

  @Override
  public CompletableFuture<ListExecutorPoolsResult> get(final DataFetchingEnvironment environment)
      throws Exception {

    final QueryContext context = environment.getContext();

    if (!IngestionAuthUtils.canManageIngestion(context)) {
      throw new AuthorizationException(
          "Unauthorized to perform this action. Please contact your DataHub administrator.");
    }

    return GraphQLConcurrencyUtils.supplyAsync(
        () -> {
          final ListExecutorPoolsInput input =
              bindArgument(environment.getArgument("input"), ListExecutorPoolsInput.class);
          if (input == null) {
            // No input bound — return an empty first page with the default window.
            return buildResult(DEFAULT_START, DEFAULT_COUNT, 0, List.of());
          }
          // Clamp user-provided pagination values to non-negative: a negative start or
          // count would otherwise reach ExecutorPoolStore.list() and crash subList()
          // with IndexOutOfBoundsException / IllegalArgumentException.
          final int start =
              Math.max(0, input.getStart() == null ? DEFAULT_START : input.getStart());
          final int count =
              Math.max(0, input.getCount() == null ? DEFAULT_COUNT : input.getCount());

          return buildResult(
              start, count, ExecutorPoolStore.totalCount(), ExecutorPoolStore.list(start, count));
        },
        this.getClass().getSimpleName(),
        "get");
  }

  /** Assembles a result object from the page window, total count, and page contents. */
  private static ListExecutorPoolsResult buildResult(
      final int start, final int count, final int total, final List<ExecutorPool> pools) {
    final ListExecutorPoolsResult result = new ListExecutorPoolsResult();
    result.setStart(start);
    result.setCount(count);
    result.setTotal(total);
    result.setPools(pools);
    return result;
  }
}
93 changes: 93 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ extend type Query {
urn: The primary key associated with the execution request.
"""
executionRequest(urn: String!): ExecutionRequest

"""
List all remote executor pools. Requires the MANAGE ingestion privilege.
    In OSS this returns pools from an in-memory, per-JVM store; managed DataHub may implement full support.
"""
listExecutorPools(input: ListExecutorPoolsInput!): ListExecutorPoolsResult
}

extend type Mutation {
Expand Down Expand Up @@ -98,6 +104,18 @@ extend type Mutation {
Rollback a specific ingestion execution run based on its runId
"""
rollbackIngestion(input: RollbackIngestionInput!): String

"""
Create a remote executor pool. Requires the MANAGE ingestion privilege.
    In OSS this records the pool in an in-memory, per-JVM store; managed DataHub may implement full support.
"""
createExecutorPool(input: CreateExecutorPoolInput!): String

"""
Delete one or more remote executor pools by id. Requires the MANAGE ingestion privilege.
    In OSS this removes pools from an in-memory, per-JVM store and returns the deleted ids; managed DataHub may implement full support.
"""
deleteExecutorPools(poolIds: [String!]!): [String!]
}

"""
Expand Down Expand Up @@ -348,6 +366,81 @@ type ListSecretsResult {
secrets: [Secret!]!
}

"""
A remote executor pool. In OSS this carries only an id and optional name; managed DataHub may provide full pool metadata.
"""
type ExecutorPool {
"""
Unique id of the executor pool
"""
id: String!

"""
Optional display name for the pool
"""
name: String
}

"""
Input for creating a remote executor pool
"""
input CreateExecutorPoolInput {
"""
Unique id for the pool
"""
id: String!

"""
Optional display name
"""
name: String
}

"""
Input for listing remote executor pools
"""
input ListExecutorPoolsInput {
"""
The starting offset of the result set
"""
start: Int

"""
The number of results to be returned
"""
count: Int

"""
An optional search query
"""
query: String
}

"""
Result of listing remote executor pools
"""
type ListExecutorPoolsResult {
"""
The starting offset of the result set
"""
start: Int!

"""
The number of results returned
"""
count: Int!

"""
The total number of pools
"""
total: Int!

"""
The executor pools
"""
pools: [ExecutorPool!]!
}

"""
A schedule associated with an Ingestion Source
"""
Expand Down
7 changes: 7 additions & 0 deletions datahub-web-react/src/app/ingestV2/ManageIngestionPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Tab } from '@components/components/Tabs/Tabs';
import analytics, { EventType } from '@app/analytics';
import { useUserContext } from '@app/context/useUserContext';
import { ExecutionsTab } from '@app/ingestV2/executions/ExecutionsTab';
import { ExecutorPoolsTab } from '@app/ingestV2/executor/ExecutorPoolsTab';
import { useIngestionOnboardingRedesignV1 } from '@app/ingestV2/hooks/useIngestionOnboardingRedesignV1';
import { SecretsList } from '@app/ingestV2/secret/SecretsList';
import { IngestionSourceList } from '@app/ingestV2/source/IngestionSourceList';
Expand Down Expand Up @@ -79,6 +80,7 @@ export const ManageIngestionPage = () => {
const canManageIngestion = platformPrivileges?.manageIngestion;
const showIngestionTab = isIngestionEnabled && canManageIngestion;
const showSecretsTab = isIngestionEnabled && platformPrivileges?.manageSecrets;
const showExecutorsTab = isIngestionEnabled && canManageIngestion;
const showIngestionOnboardingRedesignV1 = useIngestionOnboardingRedesignV1();

// undefined == not loaded, null == no permissions
Expand Down Expand Up @@ -177,6 +179,11 @@ export const ManageIngestionPage = () => {
key: TabType.RunHistory as string,
name: 'Run History',
},
showExecutorsTab && {
component: <ExecutorPoolsTab />,
key: TabType.RemoteExecutors as string,
name: TabType.RemoteExecutors as string,
},
showSecretsTab && {
component: (
<SecretsList showCreateModal={showCreateSecretModal} setShowCreateModal={setShowCreateSecretModal} />
Expand Down
Loading
Loading