Skip to content

Commit 84464b0

Browse files
committed
Explicitly cleanup SqlTask on worker when no longer needed
Currently SqlTask objects are removed from SqlTaskManager.tasks map (cache) after timeout (15 minutes by default). Even though the object is not huge, we observed increased memory pressure up to OOM on busy clusters. With this PR entries are dropped form SqlTaskManager as soon as they are no longer needed, when coordinator will no longer query for the information
1 parent 9ae7942 commit 84464b0

File tree

8 files changed

+218
-9
lines changed

8 files changed

+218
-9
lines changed

core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java

+25
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.opentelemetry.api.trace.Span;
3131
import io.opentelemetry.api.trace.Tracer;
3232
import io.trino.Session;
33+
import io.trino.cache.NonEvictableCache;
3334
import io.trino.cache.NonEvictableLoadingCache;
3435
import io.trino.connector.ConnectorServicesProvider;
3536
import io.trino.event.SplitMonitor;
@@ -82,6 +83,7 @@
8283
import java.util.function.Predicate;
8384

8485
import static com.google.common.base.Preconditions.checkArgument;
86+
import static com.google.common.base.Preconditions.checkState;
8587
import static com.google.common.base.Throwables.throwIfUnchecked;
8688
import static com.google.common.collect.ImmutableList.toImmutableList;
8789
import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -109,6 +111,8 @@
109111
public class SqlTaskManager
110112
implements Closeable
111113
{
114+
private static final Object TOMBSTONE = new Object();
115+
private static final java.time.Duration TOMBSTONE_CLEANUP_INTERVAL = java.time.Duration.ofMinutes(5);
112116
private static final Logger log = Logger.get(SqlTaskManager.class);
113117
private static final Set<String> JONI_REGEXP_FUNCTION_CLASS_NAMES = ImmutableSet.of(
114118
JoniRegexpFunctions.class.getName(),
@@ -130,6 +134,7 @@ public class SqlTaskManager
130134

131135
private final NonEvictableLoadingCache<QueryId, QueryContext> queryContexts;
132136
private final NonEvictableLoadingCache<TaskId, SqlTask> tasks;
137+
private final NonEvictableCache<TaskId, Object> taskTombstones;
133138

134139
private final SqlTaskIoStats cachedStats = new SqlTaskIoStats();
135140
private final SqlTaskIoStats finishedTaskStats = new SqlTaskIoStats();
@@ -230,8 +235,14 @@ public SqlTaskManager(
230235
queryContexts = buildNonEvictableCache(CacheBuilder.newBuilder().weakValues(), CacheLoader.from(
231236
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, maxQuerySpillPerNode)));
232237

238+
taskTombstones = buildNonEvictableCache(CacheBuilder.newBuilder()
239+
.expireAfterWrite(TOMBSTONE_CLEANUP_INTERVAL));
240+
233241
tasks = buildNonEvictableCache(CacheBuilder.newBuilder(), CacheLoader.from(
234242
taskId -> {
243+
if (taskTombstones.asMap().containsKey(taskId)) {
244+
throw new TrinoException(GENERIC_USER_ERROR, "Task is already destroyed");
245+
}
235246
createdTasks.update(1);
236247
return createSqlTask(
237248
taskId,
@@ -656,6 +667,20 @@ public TaskInfo failTask(TaskId taskId, Throwable failure)
656667
return tasks.getUnchecked(taskId).failed(failure);
657668
}
658669

670+
public void cleanupTask(TaskId taskId)
671+
{
672+
requireNonNull(taskId, "taskId is null");
673+
SqlTask sqlTask = tasks.getIfPresent(taskId);
674+
if (sqlTask == null) {
675+
return;
676+
}
677+
checkState(sqlTask.getTaskState() == TaskState.FINISHED, "cleanup called for task %s which is in state %s", taskId, sqlTask.getTaskState());
678+
taskTombstones.put(taskId, TOMBSTONE); // prevent task reincarnation in tasks cache in case of race.
679+
// Races are possible as e.g. do to aborted network requests from workers/coordinators
680+
// which got delivered after task was already completed.
681+
tasks.unsafeInvalidate(taskId);
682+
}
683+
659684
@VisibleForTesting
660685
void removeOldTasks()
661686
{

core/trino-main/src/main/java/io/trino/server/TaskResource.java

+8
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,14 @@ public TaskInfo failTask(
328328
return taskManager.failTask(taskId, failTaskRequest.getFailureInfo().toException());
329329
}
330330

331+
@POST
332+
@Path("{taskId}/cleanup")
333+
public void cleanupTask(@PathParam("taskId") TaskId taskId)
334+
{
335+
requireNonNull(taskId, "taskId is null");
336+
taskManager.cleanupTask(taskId);
337+
}
338+
331339
@GET
332340
@Path("{taskId}/results/{bufferId}/{token}")
333341
@Produces(TRINO_PAGES)

core/trino-main/src/main/java/io/trino/server/remotetask/ContinuousTaskStatusFetcher.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class ContinuousTaskStatusFetcher
6666
private final Supplier<SpanBuilder> spanBuilderFactory;
6767
private final RequestErrorTracker errorTracker;
6868
private final RemoteTaskStats stats;
69+
private final RemoteTaskCleaner remoteTaskCleaner;
6970

7071
@GuardedBy("this")
7172
private boolean running;
@@ -84,7 +85,8 @@ public ContinuousTaskStatusFetcher(
8485
Supplier<SpanBuilder> spanBuilderFactory,
8586
Duration maxErrorDuration,
8687
ScheduledExecutorService errorScheduledExecutor,
87-
RemoteTaskStats stats)
88+
RemoteTaskStats stats,
89+
RemoteTaskCleaner remoteTaskCleaner)
8890
{
8991
requireNonNull(initialTaskStatus, "initialTaskStatus is null");
9092

@@ -102,6 +104,7 @@ public ContinuousTaskStatusFetcher(
102104

103105
this.errorTracker = new RequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, errorScheduledExecutor, "getting task status");
104106
this.stats = requireNonNull(stats, "stats is null");
107+
this.remoteTaskCleaner = requireNonNull(remoteTaskCleaner, "remoteTaskCleaner is null");
105108
}
106109

107110
public synchronized void start()
@@ -121,6 +124,7 @@ public synchronized void stop()
121124
future.cancel(true);
122125
future = null;
123126
}
127+
remoteTaskCleaner.markTaskStatusFetcherStopped(taskStatus.get().getState());
124128
}
125129

126130
private synchronized void scheduleNextRequest()
@@ -253,7 +257,7 @@ void updateTaskStatus(TaskStatus newValue)
253257
onFail.accept(new TrinoException(REMOTE_TASK_MISMATCH, format("%s (%s)", REMOTE_TASK_MISMATCH_ERROR, HostAddress.fromUri(getTaskStatus().getSelf()))));
254258
}
255259

256-
dynamicFiltersFetcher.updateDynamicFiltersVersionAndFetchIfNecessary(newValue.getDynamicFiltersVersion());
260+
dynamicFiltersFetcher.updateDynamicFiltersVersionAndFetchIfNecessary(newValue.getDynamicFiltersVersion(), taskStatus.get().getState().isDone());
257261
}
258262

259263
/**

core/trino-main/src/main/java/io/trino/server/remotetask/DynamicFiltersFetcher.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class DynamicFiltersFetcher
6161
private final RequestErrorTracker errorTracker;
6262
private final RemoteTaskStats stats;
6363
private final DynamicFilterService dynamicFilterService;
64+
private final RemoteTaskCleaner remoteTaskCleaner;
6465

6566
@GuardedBy("this")
6667
private long dynamicFiltersVersion = INITIAL_DYNAMIC_FILTERS_VERSION;
@@ -70,6 +71,7 @@ class DynamicFiltersFetcher
7071
private boolean running;
7172
@GuardedBy("this")
7273
private ListenableFuture<JsonResponse<VersionedDynamicFilterDomains>> future;
74+
private boolean finalRequest;
7375

7476
public DynamicFiltersFetcher(
7577
Consumer<Throwable> onFail,
@@ -83,7 +85,8 @@ public DynamicFiltersFetcher(
8385
Duration maxErrorDuration,
8486
ScheduledExecutorService errorScheduledExecutor,
8587
RemoteTaskStats stats,
86-
DynamicFilterService dynamicFilterService)
88+
DynamicFilterService dynamicFilterService,
89+
RemoteTaskCleaner remoteTaskCleaner)
8790
{
8891
this.taskId = requireNonNull(taskId, "taskId is null");
8992
this.taskUri = requireNonNull(taskUri, "taskUri is null");
@@ -99,6 +102,8 @@ public DynamicFiltersFetcher(
99102
this.errorTracker = new RequestErrorTracker(taskId, taskUri, maxErrorDuration, errorScheduledExecutor, "getting dynamic filter domains");
100103
this.stats = requireNonNull(stats, "stats is null");
101104
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
105+
106+
this.remoteTaskCleaner = requireNonNull(remoteTaskCleaner, "remoteTaskCleaner is null");
102107
}
103108

104109
public synchronized void start()
@@ -111,19 +116,23 @@ public synchronized void start()
111116
fetchDynamicFiltersIfNecessary();
112117
}
113118

114-
public synchronized void updateDynamicFiltersVersionAndFetchIfNecessary(long newDynamicFiltersVersion)
119+
public synchronized void updateDynamicFiltersVersionAndFetchIfNecessary(long newDynamicFiltersVersion, boolean finalRequest)
115120
{
116121
if (dynamicFiltersVersion >= newDynamicFiltersVersion) {
122+
if (finalRequest) {
123+
remoteTaskCleaner.markDynamicFilterFetcherStopped();
124+
}
117125
return;
118126
}
119-
127+
this.finalRequest = finalRequest;
120128
dynamicFiltersVersion = newDynamicFiltersVersion;
121129
fetchDynamicFiltersIfNecessary();
122130
}
123131

124132
private synchronized void stop()
125133
{
126134
running = false;
135+
remoteTaskCleaner.markDynamicFilterFetcherStopped();
127136
}
128137

129138
@VisibleForTesting
@@ -141,6 +150,9 @@ private synchronized void fetchDynamicFiltersIfNecessary()
141150

142151
// local dynamic filters are up to date
143152
if (localDynamicFiltersVersion >= dynamicFiltersVersion) {
153+
if (finalRequest) {
154+
remoteTaskCleaner.markDynamicFilterFetcherStopped();
155+
}
144156
return;
145157
}
146158

core/trino-main/src/main/java/io/trino/server/remotetask/HttpRemoteTask.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,13 @@ public HttpRemoteTask(
322322

323323
TaskInfo initialTask = createInitialTask(taskId, location, nodeId, this.speculative.get(), pipelinedBufferStates, new TaskStats(DateTime.now(), null));
324324

325+
RemoteTaskCleaner remoteTaskCleaner = new RemoteTaskCleaner(
326+
taskId,
327+
location,
328+
httpClient,
329+
errorScheduledExecutor,
330+
() -> createSpanBuilder("remote-task-cleaner", span));
331+
325332
this.dynamicFiltersFetcher = new DynamicFiltersFetcher(
326333
this::fatalUnacknowledgedFailure,
327334
taskId,
@@ -334,7 +341,8 @@ public HttpRemoteTask(
334341
maxErrorDuration,
335342
errorScheduledExecutor,
336343
stats,
337-
dynamicFilterService);
344+
dynamicFilterService,
345+
remoteTaskCleaner);
338346

339347
this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
340348
this::fatalUnacknowledgedFailure,
@@ -347,12 +355,14 @@ public HttpRemoteTask(
347355
() -> createSpanBuilder("task-status", span),
348356
maxErrorDuration,
349357
errorScheduledExecutor,
350-
stats);
358+
stats,
359+
remoteTaskCleaner);
351360

352361
RetryPolicy retryPolicy = getRetryPolicy(session);
353362
this.taskInfoFetcher = new TaskInfoFetcher(
354363
this::fatalUnacknowledgedFailure,
355364
taskStatusFetcher,
365+
remoteTaskCleaner,
356366
initialTask,
357367
httpClient,
358368
() -> createSpanBuilder("task-info", span),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.server.remotetask;
15+
16+
import com.google.errorprone.annotations.concurrent.GuardedBy;
17+
import io.airlift.http.client.HttpClient;
18+
import io.airlift.http.client.Request;
19+
import io.airlift.http.client.StatusResponseHandler.StatusResponse;
20+
import io.airlift.log.Logger;
21+
import io.opentelemetry.api.trace.SpanBuilder;
22+
import io.trino.execution.TaskId;
23+
import io.trino.execution.TaskState;
24+
25+
import java.net.URI;
26+
import java.util.concurrent.Executor;
27+
import java.util.function.Supplier;
28+
29+
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
30+
import static io.airlift.http.client.Request.Builder.preparePost;
31+
import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
32+
import static java.util.Objects.requireNonNull;
33+
34+
public class RemoteTaskCleaner
35+
{
36+
private static final Logger log = Logger.get(RemoteTaskCleaner.class);
37+
38+
private final TaskId taskId;
39+
private final URI taskUri;
40+
private final HttpClient httpClient;
41+
private final Executor executor;
42+
private final Supplier<SpanBuilder> spanBuilderFactory;
43+
44+
@GuardedBy("this")
45+
private boolean taskStatusFetcherStopped;
46+
47+
@GuardedBy("this")
48+
private boolean taskInfoFetcherStopped;
49+
50+
@GuardedBy("this")
51+
private boolean dynamicFilterFetcherStopped;
52+
53+
@GuardedBy("this")
54+
private TaskState taskState;
55+
56+
public RemoteTaskCleaner(TaskId taskId, URI taskUri, HttpClient httpClient, Executor executor, Supplier<SpanBuilder> spanBuilderFactory)
57+
{
58+
this.taskId = requireNonNull(taskId, "taskId is null");
59+
this.taskUri = requireNonNull(taskUri, "taskUri is null");
60+
this.httpClient = requireNonNull(httpClient, "httpClient is null");
61+
this.executor = requireNonNull(executor, "executor is null");
62+
this.spanBuilderFactory = requireNonNull(spanBuilderFactory, "spanBuilderFactory is null");
63+
}
64+
65+
public synchronized void markTaskStatusFetcherStopped(TaskState taskState)
66+
{
67+
if (taskStatusFetcherStopped) {
68+
return;
69+
}
70+
taskStatusFetcherStopped = true;
71+
this.taskState = taskState;
72+
cleanupIfReady();
73+
}
74+
75+
public synchronized void markTaskInfoFetcherStopped()
76+
{
77+
if (taskInfoFetcherStopped) {
78+
return;
79+
}
80+
taskInfoFetcherStopped = true;
81+
cleanupIfReady();
82+
}
83+
84+
public synchronized void markDynamicFilterFetcherStopped()
85+
{
86+
if (dynamicFilterFetcherStopped) {
87+
return;
88+
}
89+
dynamicFilterFetcherStopped = true;
90+
cleanupIfReady();
91+
}
92+
93+
@GuardedBy("this")
94+
private void cleanupIfReady()
95+
{
96+
if (taskState != TaskState.FINISHED) {
97+
// we do not perform early cleanup if task did not finish successfully.
98+
// other workers may still reach out for the results; and we have no control over that.
99+
return;
100+
}
101+
if (taskStatusFetcherStopped && taskInfoFetcherStopped && dynamicFilterFetcherStopped) {
102+
scheduleCleanupRequest();
103+
}
104+
}
105+
106+
private void scheduleCleanupRequest()
107+
{
108+
executor.execute(
109+
() -> {
110+
Request request = preparePost()
111+
.setUri(uriBuilderFrom(taskUri)
112+
.appendPath("/cleanup")
113+
.build())
114+
.setSpanBuilder(spanBuilderFactory.get())
115+
.build();
116+
117+
StatusResponse response = httpClient.execute(request, createStatusResponseHandler());
118+
if (response.getStatusCode() / 100 != 2) {
119+
log.warn("Failed to cleanup task %s: %s", taskId, response.getStatusCode());
120+
}
121+
});
122+
}
123+
}

core/trino-main/src/main/java/io/trino/server/remotetask/TaskInfoFetcher.java

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class TaskInfoFetcher
6767
private final TaskId taskId;
6868
private final Consumer<Throwable> onFail;
6969
private final ContinuousTaskStatusFetcher taskStatusFetcher;
70+
private final RemoteTaskCleaner remoteTaskCleaner;
7071
private final StateMachine<TaskInfo> taskInfo;
7172
private final StateMachine<Optional<TaskInfo>> finalTaskInfo;
7273
private final JsonCodec<TaskInfo> taskInfoCodec;
@@ -100,6 +101,7 @@ public class TaskInfoFetcher
100101
public TaskInfoFetcher(
101102
Consumer<Throwable> onFail,
102103
ContinuousTaskStatusFetcher taskStatusFetcher,
104+
RemoteTaskCleaner remoteTaskCleaner,
103105
TaskInfo initialTask,
104106
HttpClient httpClient,
105107
Supplier<SpanBuilder> spanBuilderFactory,
@@ -120,6 +122,7 @@ public TaskInfoFetcher(
120122
this.taskId = initialTask.taskStatus().getTaskId();
121123
this.onFail = requireNonNull(onFail, "onFail is null");
122124
this.taskStatusFetcher = requireNonNull(taskStatusFetcher, "taskStatusFetcher is null");
125+
this.remoteTaskCleaner = requireNonNull(remoteTaskCleaner, "remoteTaskCleaner is null");
123126
this.taskInfo = new StateMachine<>("task " + taskId, executor, initialTask);
124127
this.finalTaskInfo = new StateMachine<>("task-" + taskId, executor, Optional.empty());
125128
this.taskInfoCodec = requireNonNull(taskInfoCodec, "taskInfoCodec is null");
@@ -163,6 +166,7 @@ private synchronized void stop()
163166
if (scheduledFuture != null) {
164167
scheduledFuture.cancel(true);
165168
}
169+
remoteTaskCleaner.markTaskInfoFetcherStopped();
166170
}
167171

168172
/**

0 commit comments

Comments
 (0)