Skip to content

Commit 2b798b6

Browse files
committed
Explicitly cleanup SqlTask on worker when no longer needed
Currently SqlTask objects are removed from SqlTaskManager.tasks map (cache) after timeout (15 minutes by default). Even though the object is not huge, we observed increased memory pressure up to OOM on busy clusters. With this PR entries are dropped form SqlTaskManager as soon as they are no longer needed, when coordinator will no longer query for the information
1 parent af4e200 commit 2b798b6

File tree

8 files changed

+222
-7
lines changed

8 files changed

+222
-7
lines changed

core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java

+12
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import java.util.function.Predicate;
8383

8484
import static com.google.common.base.Preconditions.checkArgument;
85+
import static com.google.common.base.Preconditions.checkState;
8586
import static com.google.common.base.Throwables.throwIfUnchecked;
8687
import static com.google.common.collect.ImmutableList.toImmutableList;
8788
import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -656,6 +657,17 @@ public TaskInfo failTask(TaskId taskId, Throwable failure)
656657
return tasks.getUnchecked(taskId).failed(failure);
657658
}
658659

660+
public void cleanupTask(TaskId taskId)
661+
{
662+
requireNonNull(taskId, "taskId is null");
663+
SqlTask sqlTask = tasks.getIfPresent(taskId);
664+
if (sqlTask == null) {
665+
return;
666+
}
667+
checkState(sqlTask.getTaskState() == TaskState.FINISHED, "cleanup called for task %s which is in state %s", taskId, sqlTask.getTaskState());
668+
tasks.unsafeInvalidate(taskId);
669+
}
670+
659671
@VisibleForTesting
660672
void removeOldTasks()
661673
{

core/trino-main/src/main/java/io/trino/server/TaskResource.java

+8
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,14 @@ public TaskInfo failTask(
328328
return taskManager.failTask(taskId, failTaskRequest.getFailureInfo().toException());
329329
}
330330

331+
@POST
332+
@Path("{taskId}/cleanup")
333+
public void cleanupTask(@PathParam("taskId") TaskId taskId)
334+
{
335+
requireNonNull(taskId, "taskId is null");
336+
taskManager.cleanupTask(taskId);
337+
}
338+
331339
@GET
332340
@Path("{taskId}/results/{bufferId}/{token}")
333341
@Produces(TRINO_PAGES)

core/trino-main/src/main/java/io/trino/server/remotetask/ContinuousTaskStatusFetcher.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class ContinuousTaskStatusFetcher
6666
private final Supplier<SpanBuilder> spanBuilderFactory;
6767
private final RequestErrorTracker errorTracker;
6868
private final RemoteTaskStats stats;
69+
private final RemoteTaskCleaner remoteTaskCleaner;
6970

7071
@GuardedBy("this")
7172
private boolean running;
@@ -84,7 +85,8 @@ public ContinuousTaskStatusFetcher(
8485
Supplier<SpanBuilder> spanBuilderFactory,
8586
Duration maxErrorDuration,
8687
ScheduledExecutorService errorScheduledExecutor,
87-
RemoteTaskStats stats)
88+
RemoteTaskStats stats,
89+
RemoteTaskCleaner remoteTaskCleaner)
8890
{
8991
requireNonNull(initialTaskStatus, "initialTaskStatus is null");
9092

@@ -102,6 +104,7 @@ public ContinuousTaskStatusFetcher(
102104

103105
this.errorTracker = new RequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, errorScheduledExecutor, "getting task status");
104106
this.stats = requireNonNull(stats, "stats is null");
107+
this.remoteTaskCleaner = requireNonNull(remoteTaskCleaner, "remoteTaskCleaner is null");
105108
}
106109

107110
public synchronized void start()
@@ -121,6 +124,8 @@ public synchronized void stop()
121124
future.cancel(true);
122125
future = null;
123126
}
127+
remoteTaskCleaner.markTaskStatusFetcherStopped(taskStatus.get().getState());
128+
dynamicFiltersFetcher.noMoreUpdateRequests();
124129
}
125130

126131
private synchronized void scheduleNextRequest()
@@ -253,7 +258,11 @@ void updateTaskStatus(TaskStatus newValue)
253258
onFail.accept(new TrinoException(REMOTE_TASK_MISMATCH, format("%s (%s)", REMOTE_TASK_MISMATCH_ERROR, HostAddress.fromUri(getTaskStatus().getSelf()))));
254259
}
255260

256-
dynamicFiltersFetcher.updateDynamicFiltersVersionAndFetchIfNecessary(newValue.getDynamicFiltersVersion());
261+
synchronized (this) {
262+
if (running) {
263+
dynamicFiltersFetcher.updateDynamicFiltersVersionAndFetchIfNecessary(newValue.getDynamicFiltersVersion());
264+
}
265+
}
257266
}
258267

259268
/**

core/trino-main/src/main/java/io/trino/server/remotetask/DynamicFiltersFetcher.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class DynamicFiltersFetcher
6161
private final RequestErrorTracker errorTracker;
6262
private final RemoteTaskStats stats;
6363
private final DynamicFilterService dynamicFilterService;
64+
private final RemoteTaskCleaner remoteTaskCleaner;
6465

6566
@GuardedBy("this")
6667
private long dynamicFiltersVersion = INITIAL_DYNAMIC_FILTERS_VERSION;
@@ -70,6 +71,7 @@ class DynamicFiltersFetcher
7071
private boolean running;
7172
@GuardedBy("this")
7273
private ListenableFuture<JsonResponse<VersionedDynamicFilterDomains>> future;
74+
private boolean noMoreUpdateRequests;
7375

7476
public DynamicFiltersFetcher(
7577
Consumer<Throwable> onFail,
@@ -83,7 +85,8 @@ public DynamicFiltersFetcher(
8385
Duration maxErrorDuration,
8486
ScheduledExecutorService errorScheduledExecutor,
8587
RemoteTaskStats stats,
86-
DynamicFilterService dynamicFilterService)
88+
DynamicFilterService dynamicFilterService,
89+
RemoteTaskCleaner remoteTaskCleaner)
8790
{
8891
this.taskId = requireNonNull(taskId, "taskId is null");
8992
this.taskUri = requireNonNull(taskUri, "taskUri is null");
@@ -99,6 +102,8 @@ public DynamicFiltersFetcher(
99102
this.errorTracker = new RequestErrorTracker(taskId, taskUri, maxErrorDuration, errorScheduledExecutor, "getting dynamic filter domains");
100103
this.stats = requireNonNull(stats, "stats is null");
101104
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
105+
106+
this.remoteTaskCleaner = requireNonNull(remoteTaskCleaner, "remoteTaskCleaner is null");
102107
}
103108

104109
public synchronized void start()
@@ -113,17 +118,38 @@ public synchronized void start()
113118

114119
public synchronized void updateDynamicFiltersVersionAndFetchIfNecessary(long newDynamicFiltersVersion)
115120
{
121+
if (noMoreUpdateRequests) {
122+
throw new IllegalStateException("No more update requests expected");
123+
}
124+
116125
if (dynamicFiltersVersion >= newDynamicFiltersVersion) {
126+
stopIfAllDone();
117127
return;
118128
}
119129

120130
dynamicFiltersVersion = newDynamicFiltersVersion;
121131
fetchDynamicFiltersIfNecessary();
122132
}
123133

134+
public synchronized void noMoreUpdateRequests()
135+
{
136+
this.noMoreUpdateRequests = true;
137+
stopIfAllDone();
138+
}
139+
140+
@GuardedBy("this")
141+
private void stopIfAllDone()
142+
{
143+
if (localDynamicFiltersVersion >= dynamicFiltersVersion && noMoreUpdateRequests) {
144+
// we are up to date
145+
stop();
146+
}
147+
}
148+
124149
private synchronized void stop()
125150
{
126151
running = false;
152+
remoteTaskCleaner.markDynamicFilterFetcherStopped();
127153
}
128154

129155
@VisibleForTesting
@@ -141,6 +167,7 @@ private synchronized void fetchDynamicFiltersIfNecessary()
141167

142168
// local dynamic filters are up to date
143169
if (localDynamicFiltersVersion >= dynamicFiltersVersion) {
170+
stopIfAllDone();
144171
return;
145172
}
146173

core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,13 @@ public HttpRemoteTask(
322322

323323
TaskInfo initialTask = createInitialTask(taskId, location, nodeId, this.speculative.get(), pipelinedBufferStates, new TaskStats(DateTime.now(), null));
324324

325+
RemoteTaskCleaner remoteTaskCleaner = new RemoteTaskCleaner(
326+
taskId,
327+
location,
328+
httpClient,
329+
errorScheduledExecutor,
330+
() -> createSpanBuilder("remote-task-cleaner", span));
331+
325332
this.dynamicFiltersFetcher = new DynamicFiltersFetcher(
326333
this::fatalUnacknowledgedFailure,
327334
taskId,
@@ -334,7 +341,8 @@ public HttpRemoteTask(
334341
maxErrorDuration,
335342
errorScheduledExecutor,
336343
stats,
337-
dynamicFilterService);
344+
dynamicFilterService,
345+
remoteTaskCleaner);
338346

339347
this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
340348
this::fatalUnacknowledgedFailure,
@@ -347,12 +355,14 @@ public HttpRemoteTask(
347355
() -> createSpanBuilder("task-status", span),
348356
maxErrorDuration,
349357
errorScheduledExecutor,
350-
stats);
358+
stats,
359+
remoteTaskCleaner);
351360

352361
RetryPolicy retryPolicy = getRetryPolicy(session);
353362
this.taskInfoFetcher = new TaskInfoFetcher(
354363
this::fatalUnacknowledgedFailure,
355364
taskStatusFetcher,
365+
remoteTaskCleaner,
356366
initialTask,
357367
httpClient,
358368
() -> createSpanBuilder("task-info", span),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.server.remotetask;
15+
16+
import com.google.errorprone.annotations.concurrent.GuardedBy;
17+
import io.airlift.http.client.HttpClient;
18+
import io.airlift.http.client.Request;
19+
import io.airlift.http.client.StatusResponseHandler.StatusResponse;
20+
import io.airlift.log.Logger;
21+
import io.opentelemetry.api.trace.SpanBuilder;
22+
import io.trino.execution.TaskId;
23+
import io.trino.execution.TaskState;
24+
25+
import java.net.URI;
26+
import java.util.concurrent.Executor;
27+
import java.util.function.Supplier;
28+
29+
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
30+
import static io.airlift.http.client.Request.Builder.preparePost;
31+
import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
32+
import static java.util.Objects.requireNonNull;
33+
34+
public class RemoteTaskCleaner
35+
{
36+
private static final Logger log = Logger.get(RemoteTaskCleaner.class);
37+
38+
private final TaskId taskId;
39+
private final URI taskUri;
40+
private final HttpClient httpClient;
41+
private final Executor executor;
42+
private final Supplier<SpanBuilder> spanBuilderFactory;
43+
44+
@GuardedBy("this")
45+
private boolean taskStatusFetcherStopped;
46+
47+
@GuardedBy("this")
48+
private boolean taskInfoFetcherStopped;
49+
50+
@GuardedBy("this")
51+
private boolean dynamicFilterFetcherStopped;
52+
53+
@GuardedBy("this")
54+
private TaskState taskState;
55+
56+
public RemoteTaskCleaner(TaskId taskId, URI taskUri, HttpClient httpClient, Executor executor, Supplier<SpanBuilder> spanBuilderFactory)
57+
{
58+
this.taskId = requireNonNull(taskId, "taskId is null");
59+
this.taskUri = requireNonNull(taskUri, "taskUri is null");
60+
this.httpClient = requireNonNull(httpClient, "httpClient is null");
61+
this.executor = requireNonNull(executor, "executor is null");
62+
this.spanBuilderFactory = requireNonNull(spanBuilderFactory, "spanBuilderFactory is null");
63+
}
64+
65+
public synchronized void markTaskStatusFetcherStopped(TaskState taskState)
66+
{
67+
if (taskStatusFetcherStopped) {
68+
return;
69+
}
70+
taskStatusFetcherStopped = true;
71+
this.taskState = taskState;
72+
cleanupIfReady();
73+
}
74+
75+
public synchronized void markTaskInfoFetcherStopped()
76+
{
77+
if (taskInfoFetcherStopped) {
78+
return;
79+
}
80+
taskInfoFetcherStopped = true;
81+
cleanupIfReady();
82+
}
83+
84+
public synchronized void markDynamicFilterFetcherStopped()
85+
{
86+
if (dynamicFilterFetcherStopped) {
87+
return;
88+
}
89+
dynamicFilterFetcherStopped = true;
90+
cleanupIfReady();
91+
}
92+
93+
@GuardedBy("this")
94+
private void cleanupIfReady()
95+
{
96+
if (taskState != TaskState.FINISHED) {
97+
// we do not perform early cleanup if task did not finish successfully.
98+
// other workers may still reach out for the results; and we have no control over that.
99+
return;
100+
}
101+
if (taskStatusFetcherStopped && taskInfoFetcherStopped && dynamicFilterFetcherStopped) {
102+
scheduleCleanupRequest();
103+
}
104+
}
105+
106+
private void scheduleCleanupRequest()
107+
{
108+
executor.execute(
109+
() -> {
110+
Request request = preparePost()
111+
.setUri(uriBuilderFrom(taskUri)
112+
.appendPath("/cleanup")
113+
.build())
114+
.setSpanBuilder(spanBuilderFactory.get())
115+
.build();
116+
117+
StatusResponse response = httpClient.execute(request, createStatusResponseHandler());
118+
if (response.getStatusCode() != 200) {
119+
log.warn("Failed to cleanup task %s: %s", taskId, response.getStatusCode());
120+
}
121+
});
122+
}
123+
}

core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class TaskInfoFetcher
6767
private final TaskId taskId;
6868
private final Consumer<Throwable> onFail;
6969
private final ContinuousTaskStatusFetcher taskStatusFetcher;
70+
private final RemoteTaskCleaner remoteTaskCleaner;
7071
private final StateMachine<TaskInfo> taskInfo;
7172
private final StateMachine<Optional<TaskInfo>> finalTaskInfo;
7273
private final JsonCodec<TaskInfo> taskInfoCodec;
@@ -100,6 +101,7 @@ public class TaskInfoFetcher
100101
public TaskInfoFetcher(
101102
Consumer<Throwable> onFail,
102103
ContinuousTaskStatusFetcher taskStatusFetcher,
104+
RemoteTaskCleaner remoteTaskCleaner,
103105
TaskInfo initialTask,
104106
HttpClient httpClient,
105107
Supplier<SpanBuilder> spanBuilderFactory,
@@ -120,6 +122,7 @@ public TaskInfoFetcher(
120122
this.taskId = initialTask.taskStatus().getTaskId();
121123
this.onFail = requireNonNull(onFail, "onFail is null");
122124
this.taskStatusFetcher = requireNonNull(taskStatusFetcher, "taskStatusFetcher is null");
125+
this.remoteTaskCleaner = requireNonNull(remoteTaskCleaner, "remoteTaskCleaner is null");
123126
this.taskInfo = new StateMachine<>("task " + taskId, executor, initialTask);
124127
this.finalTaskInfo = new StateMachine<>("task-" + taskId, executor, Optional.empty());
125128
this.taskInfoCodec = requireNonNull(taskInfoCodec, "taskInfoCodec is null");
@@ -163,6 +166,7 @@ private synchronized void stop()
163166
if (scheduledFuture != null) {
164167
scheduledFuture.cancel(true);
165168
}
169+
remoteTaskCleaner.markTaskInfoFetcherStopped();
166170
}
167171

168172
/**

0 commit comments

Comments
 (0)