Skip to content

Commit 5045fc2

Browse files
committed
Explicitly cleanup SqlTask on worker when no longer needed
Currently, SqlTask objects are removed from the SqlTaskManager.tasks map (cache) only after a timeout (15 minutes by default). Even though each object is not huge, we observed increased memory pressure, up to OOM, on busy clusters. With this PR, entries are dropped from SqlTaskManager as soon as they are no longer needed, i.e. once the coordinator will no longer query the worker for the task's information.
1 parent af4e200 commit 5045fc2

File tree

7 files changed

+164
-4
lines changed

7 files changed

+164
-4
lines changed

core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java

+12
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import java.util.function.Predicate;
8383

8484
import static com.google.common.base.Preconditions.checkArgument;
85+
import static com.google.common.base.Preconditions.checkState;
8586
import static com.google.common.base.Throwables.throwIfUnchecked;
8687
import static com.google.common.collect.ImmutableList.toImmutableList;
8788
import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -656,6 +657,17 @@ public TaskInfo failTask(TaskId taskId, Throwable failure)
656657
return tasks.getUnchecked(taskId).failed(failure);
657658
}
658659

660+
public void cleanupTask(TaskId taskId)
661+
{
662+
requireNonNull(taskId, "taskId is null");
663+
SqlTask sqlTask = tasks.getIfPresent(taskId);
664+
if (sqlTask == null) {
665+
return;
666+
}
667+
checkState(sqlTask.getTaskState() == TaskState.FINISHED, "cleanup called for task %s which is in state %s", taskId, sqlTask.getTaskState());
668+
tasks.unsafeInvalidate(taskId);
669+
}
670+
659671
@VisibleForTesting
660672
void removeOldTasks()
661673
{

core/trino-main/src/main/java/io/trino/server/TaskResource.java

+9
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,15 @@ public TaskInfo failTask(
328328
return taskManager.failTask(taskId, failTaskRequest.getFailureInfo().toException());
329329
}
330330

331+
@POST
332+
@Path("{taskId}/cleanup")
333+
public void cleanupTask(
334+
@PathParam("taskId") TaskId taskId)
335+
{
336+
requireNonNull(taskId, "taskId is null");
337+
taskManager.cleanupTask(taskId);
338+
}
339+
331340
@GET
332341
@Path("{taskId}/results/{bufferId}/{token}")
333342
@Produces(TRINO_PAGES)

core/trino-main/src/main/java/io/trino/server/remotetask/ContinuousTaskStatusFetcher.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class ContinuousTaskStatusFetcher
6666
private final Supplier<SpanBuilder> spanBuilderFactory;
6767
private final RequestErrorTracker errorTracker;
6868
private final RemoteTaskStats stats;
69+
private final RemoteTaskCleaner remoteTaskCleaner;
6970

7071
@GuardedBy("this")
7172
private boolean running;
@@ -84,7 +85,8 @@ public ContinuousTaskStatusFetcher(
8485
Supplier<SpanBuilder> spanBuilderFactory,
8586
Duration maxErrorDuration,
8687
ScheduledExecutorService errorScheduledExecutor,
87-
RemoteTaskStats stats)
88+
RemoteTaskStats stats,
89+
RemoteTaskCleaner remoteTaskCleaner)
8890
{
8991
requireNonNull(initialTaskStatus, "initialTaskStatus is null");
9092

@@ -102,6 +104,7 @@ public ContinuousTaskStatusFetcher(
102104

103105
this.errorTracker = new RequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, errorScheduledExecutor, "getting task status");
104106
this.stats = requireNonNull(stats, "stats is null");
107+
this.remoteTaskCleaner = requireNonNull(remoteTaskCleaner, "remoteTaskCleaner is null");
105108
}
106109

107110
public synchronized void start()
@@ -121,6 +124,7 @@ public synchronized void stop()
121124
future.cancel(true);
122125
future = null;
123126
}
127+
remoteTaskCleaner.markTaskStatusFetcherStopped(taskStatus.get().getState());
124128
}
125129

126130
private synchronized void scheduleNextRequest()

core/trino-main/src/main/java/io/trino/server/remotetask/DynamicFiltersFetcher.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class DynamicFiltersFetcher
6161
private final RequestErrorTracker errorTracker;
6262
private final RemoteTaskStats stats;
6363
private final DynamicFilterService dynamicFilterService;
64+
private final RemoteTaskCleaner remoteTaskCleaner;
6465

6566
@GuardedBy("this")
6667
private long dynamicFiltersVersion = INITIAL_DYNAMIC_FILTERS_VERSION;
@@ -83,7 +84,8 @@ public DynamicFiltersFetcher(
8384
Duration maxErrorDuration,
8485
ScheduledExecutorService errorScheduledExecutor,
8586
RemoteTaskStats stats,
86-
DynamicFilterService dynamicFilterService)
87+
DynamicFilterService dynamicFilterService,
88+
RemoteTaskCleaner remoteTaskCleaner)
8789
{
8890
this.taskId = requireNonNull(taskId, "taskId is null");
8991
this.taskUri = requireNonNull(taskUri, "taskUri is null");
@@ -99,6 +101,8 @@ public DynamicFiltersFetcher(
99101
this.errorTracker = new RequestErrorTracker(taskId, taskUri, maxErrorDuration, errorScheduledExecutor, "getting dynamic filter domains");
100102
this.stats = requireNonNull(stats, "stats is null");
101103
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
104+
105+
this.remoteTaskCleaner = requireNonNull(remoteTaskCleaner, "remoteTaskCleaner is null");
102106
}
103107

104108
public synchronized void start()
@@ -124,6 +128,7 @@ public synchronized void updateDynamicFiltersVersionAndFetchIfNecessary(long new
124128
private synchronized void stop()
125129
{
126130
running = false;
131+
remoteTaskCleaner.markDynamidFilterFetcherStopped();
127132
}
128133

129134
@VisibleForTesting

core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,13 @@ public HttpRemoteTask(
322322

323323
TaskInfo initialTask = createInitialTask(taskId, location, nodeId, this.speculative.get(), pipelinedBufferStates, new TaskStats(DateTime.now(), null));
324324

325+
RemoteTaskCleaner remoteTaskCleaner = new RemoteTaskCleaner(
326+
taskId,
327+
location,
328+
httpClient,
329+
errorScheduledExecutor,
330+
() -> createSpanBuilder("remote-task-cleaner", span));
331+
325332
this.dynamicFiltersFetcher = new DynamicFiltersFetcher(
326333
this::fatalUnacknowledgedFailure,
327334
taskId,
@@ -334,7 +341,8 @@ public HttpRemoteTask(
334341
maxErrorDuration,
335342
errorScheduledExecutor,
336343
stats,
337-
dynamicFilterService);
344+
dynamicFilterService,
345+
remoteTaskCleaner);
338346

339347
this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
340348
this::fatalUnacknowledgedFailure,
@@ -347,12 +355,14 @@ public HttpRemoteTask(
347355
() -> createSpanBuilder("task-status", span),
348356
maxErrorDuration,
349357
errorScheduledExecutor,
350-
stats);
358+
stats,
359+
remoteTaskCleaner);
351360

352361
RetryPolicy retryPolicy = getRetryPolicy(session);
353362
this.taskInfoFetcher = new TaskInfoFetcher(
354363
this::fatalUnacknowledgedFailure,
355364
taskStatusFetcher,
365+
remoteTaskCleaner,
356366
initialTask,
357367
httpClient,
358368
() -> createSpanBuilder("task-info", span),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package io.trino.server.remotetask;
2+
3+
import com.google.errorprone.annotations.concurrent.GuardedBy;
4+
import io.airlift.http.client.HttpClient;
5+
import io.airlift.http.client.Request;
6+
import io.airlift.http.client.StatusResponseHandler;
7+
import io.airlift.log.Logger;
8+
import io.opentelemetry.api.trace.SpanBuilder;
9+
import io.trino.execution.TaskId;
10+
import io.trino.execution.TaskState;
11+
12+
import java.net.URI;
13+
import java.util.concurrent.ScheduledExecutorService;
14+
import java.util.concurrent.ThreadLocalRandom;
15+
import java.util.function.Supplier;
16+
17+
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
18+
import static io.airlift.http.client.Request.Builder.preparePost;
19+
import static java.util.Objects.requireNonNull;
20+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
21+
22+
public class RemoteTaskCleaner
23+
{
24+
private static final Logger log = Logger.get(RemoteTaskCleaner.class);
25+
26+
private static final long TASK_CLEANUP_DELAY_VARIANCE_MILLIS = 10_000;
27+
28+
private final TaskId taskId;
29+
private final URI taskUri;
30+
private final HttpClient httpClient;
31+
private final ScheduledExecutorService executor;
32+
private final Supplier<SpanBuilder> spanBuilderFactory;
33+
34+
@GuardedBy("this")
35+
private boolean taskStatusFetcherStopped;
36+
37+
@GuardedBy("this")
38+
private boolean taskInfoFetcherStopped;
39+
40+
@GuardedBy("this")
41+
private boolean dynamidFilterFetcherStopped;
42+
43+
@GuardedBy("this")
44+
private TaskState taskState;
45+
46+
public RemoteTaskCleaner(TaskId taskId, URI taskUri, HttpClient httpClient, ScheduledExecutorService executor, Supplier<SpanBuilder> spanBuilderFactory)
47+
{
48+
this.taskId = requireNonNull(taskId, "taskId is null");
49+
this.taskUri = requireNonNull(taskUri, "taskUri is null");
50+
this.httpClient = requireNonNull(httpClient, "httpClient is null");
51+
this.executor = requireNonNull(executor, "executor is null");
52+
this.spanBuilderFactory = requireNonNull(spanBuilderFactory, "spanBuilderFactory is null");
53+
}
54+
55+
public synchronized void markTaskStatusFetcherStopped(TaskState taskState)
56+
{
57+
if (taskStatusFetcherStopped) {
58+
return;
59+
}
60+
taskStatusFetcherStopped = true;
61+
this.taskState = taskState;
62+
cleanupIfReady();
63+
}
64+
65+
public synchronized void markTaskInfoFetcherStopped()
66+
{
67+
if (taskInfoFetcherStopped) {
68+
return;
69+
}
70+
taskInfoFetcherStopped = true;
71+
cleanupIfReady();
72+
}
73+
74+
public synchronized void markDynamidFilterFetcherStopped()
75+
{
76+
if (dynamidFilterFetcherStopped) {
77+
return;
78+
}
79+
dynamidFilterFetcherStopped = true;
80+
cleanupIfReady();
81+
}
82+
83+
@GuardedBy("this")
84+
private void cleanupIfReady()
85+
{
86+
if (taskState != TaskState.FINISHED) {
87+
// we do not perform early cleanup if task did not finish successfully.
88+
// other workers may still reach out for the results; and we have no control over that.
89+
return;
90+
}
91+
if (taskStatusFetcherStopped && taskInfoFetcherStopped && dynamidFilterFetcherStopped) {
92+
scheduleCleanupRequest();
93+
}
94+
}
95+
96+
private void scheduleCleanupRequest()
97+
{
98+
executor.schedule(
99+
() -> {
100+
Request request = preparePost()
101+
.setUri(uriBuilderFrom(taskUri)
102+
.appendPath("/cleanup")
103+
.build())
104+
.setSpanBuilder(spanBuilderFactory.get())
105+
.build();
106+
107+
StatusResponseHandler.StatusResponse response = httpClient.execute(request, StatusResponseHandler.createStatusResponseHandler());
108+
if (response.getStatusCode() != 200) {
109+
log.warn("Failed to cleanup task %s: %s", taskId, response.getStatusCode());
110+
}
111+
return null;
112+
},
113+
ThreadLocalRandom.current().nextLong(TASK_CLEANUP_DELAY_VARIANCE_MILLIS),
114+
MILLISECONDS);
115+
}
116+
}

core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class TaskInfoFetcher
6767
private final TaskId taskId;
6868
private final Consumer<Throwable> onFail;
6969
private final ContinuousTaskStatusFetcher taskStatusFetcher;
70+
private final RemoteTaskCleaner remoteTaskCleaner;
7071
private final StateMachine<TaskInfo> taskInfo;
7172
private final StateMachine<Optional<TaskInfo>> finalTaskInfo;
7273
private final JsonCodec<TaskInfo> taskInfoCodec;
@@ -100,6 +101,7 @@ public class TaskInfoFetcher
100101
public TaskInfoFetcher(
101102
Consumer<Throwable> onFail,
102103
ContinuousTaskStatusFetcher taskStatusFetcher,
104+
RemoteTaskCleaner remoteTaskCleaner,
103105
TaskInfo initialTask,
104106
HttpClient httpClient,
105107
Supplier<SpanBuilder> spanBuilderFactory,
@@ -120,6 +122,7 @@ public TaskInfoFetcher(
120122
this.taskId = initialTask.taskStatus().getTaskId();
121123
this.onFail = requireNonNull(onFail, "onFail is null");
122124
this.taskStatusFetcher = requireNonNull(taskStatusFetcher, "taskStatusFetcher is null");
125+
this.remoteTaskCleaner = requireNonNull(remoteTaskCleaner, "remoteTaskCleaner is null");
123126
this.taskInfo = new StateMachine<>("task " + taskId, executor, initialTask);
124127
this.finalTaskInfo = new StateMachine<>("task-" + taskId, executor, Optional.empty());
125128
this.taskInfoCodec = requireNonNull(taskInfoCodec, "taskInfoCodec is null");
@@ -163,6 +166,7 @@ private synchronized void stop()
163166
if (scheduledFuture != null) {
164167
scheduledFuture.cancel(true);
165168
}
169+
remoteTaskCleaner.markTaskInfoFetcherStopped();
166170
}
167171

168172
/**

0 commit comments

Comments
 (0)