Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.indexing.worker;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -83,6 +84,7 @@
*/
public class WorkerTaskManager implements IndexerTaskCountStatsProvider
{
public static final String STATE_FILE = "workerState.json";
private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class);

private final ObjectMapper jsonMapper;
Expand All @@ -105,6 +107,11 @@ public class WorkerTaskManager implements IndexerTaskCountStatsProvider

private final ScheduledExecutorService completedTasksCleanupExecutor;

/**
* Whether this worker is disabled (i.e., not accepting new tasks). Persisted to
* {@link #STATE_FILE} under {@link #storageDir} so the flag survives
* process restarts.
*/
private final AtomicBoolean disabled = new AtomicBoolean(false);

private final OverlordClient overlordClient;
Expand Down Expand Up @@ -138,6 +145,7 @@ public void start() throws Exception
try {
log.debug("Starting...");
cleanupAndMakeTmpTaskDir();
loadStateFile();
registerLocationListener();
restoreRestorableTasks();
initAssignedTasks();
Expand Down Expand Up @@ -340,6 +348,51 @@ public File getAssignedTaskDir()
return new File(storageDir, "assignedTasks");
}

/**
 * Returns the location of {@link #STATE_FILE}, resolved against {@link #storageDir}.
 * The file is only addressed here; it is neither created nor checked for existence.
 */
public File getStateFile()
{
  final File stateFile = new File(storageDir, STATE_FILE);
  return stateFile;
}

/**
 * Restores {@link #disabled} from {@link #STATE_FILE}, if that file exists.
 *
 * When the file is absent, {@link #disabled} is left untouched. When the file exists but
 * cannot be read or deserialized, a warning is logged and {@link #disabled} is likewise
 * left untouched; the failure is not propagated to the caller.
 */
private void loadStateFile()
{
  final File persisted = getStateFile();
  if (!persisted.exists()) {
    // Nothing has been persisted, so there is nothing to restore.
    return;
  }

  try {
    final WorkerState restored = jsonMapper.readValue(persisted, WorkerState.class);
    disabled.set(restored.disabled());
  }
  catch (Exception e) {
    // Best effort: an unreadable state file must not prevent startup.
    log.warn(e, "Failed to read disabled state from[%s]. Starting as enabled.", persisted);
  }
}

/**
 * Persists the current value of {@link #disabled} to {@link #STATE_FILE} through
 * {@code FileUtils.writeAtomically}.
 *
 * The flag is read at the moment the write callback runs. Any failure is logged as a
 * warning and is not propagated to the caller.
 */
private void writeStateFile()
{
  final File target = getStateFile();
  try {
    FileUtils.writeAtomically(
        target,
        stream -> {
          final WorkerState snapshot = new WorkerState(disabled.get());
          jsonMapper.writeValue(stream, snapshot);
          // The callback has no meaningful result to hand back.
          return null;
        }
    );
  }
  catch (Exception e) {
    log.warn(e, "Failed to persist state file[%s].", target);
  }
}

private void initAssignedTasks() throws IOException
{
File assignedTaskDir = getAssignedTaskDir();
Expand Down Expand Up @@ -517,18 +570,20 @@ private void scheduleCompletedTasksCleanup()
public void workerEnabled()
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), "not started");

if (disabled.compareAndSet(true, false)) {
changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(false));
}
setDisabled(false);
}

public void workerDisabled()
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), "not started");
setDisabled(true);
}

if (disabled.compareAndSet(false, true)) {
changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(true));
private void setDisabled(boolean newValue)
{
if (disabled.compareAndSet(!newValue, newValue)) {
changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(newValue));
writeStateFile();
}
}

Expand Down Expand Up @@ -656,6 +711,10 @@ public Map<String, Long> getWorkerSuccessfulTasks()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
}

/**
 * JSON-serializable form of the worker state that is written to and read from
 * {@link WorkerTaskManager#STATE_FILE}.
 *
 * @param disabled value carried by the {@code "disabled"} JSON property
 */
record WorkerState(@JsonProperty("disabled") boolean disabled)
{
}

private static class TaskDetails
{
private final Task task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -113,9 +114,14 @@ public static Collection<Object[]> getParameters()
}

/**
 * Builds a {@link WorkerTaskManager} backed by a newly created temporary directory.
 * Delegates to {@link #createWorkerTaskManager(File)}.
 */
private WorkerTaskManager createWorkerTaskManager()
{
  final File freshBaseDir = FileUtils.createTempDir();
  return createWorkerTaskManager(freshBaseDir);
}

private WorkerTaskManager createWorkerTaskManager(File baseDir)
{
TaskConfig taskConfig = new TaskConfigBuilder()
.setBaseDir(FileUtils.createTempDir().toString())
.setBaseDir(baseDir.toString())
.setRestoreTasksOnRestart(restoreTasksOnRestart)
.build();

Expand Down Expand Up @@ -550,4 +556,74 @@ public void getWorkerTaskStatsTest() throws Exception
));
Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L);
}

/**
 * Disables a worker, stops it, then starts a second manager over the same base directory
 * and checks that the second manager comes up disabled and reports that in its history.
 */
@Test
public void test_disabledState_persistsAcrossRestart() throws Exception
{
  EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject()))
          .andReturn(overlordClient)
          .anyTimes();
  EasyMock.replay(overlordClient);

  // Both manager instances are built over this one directory.
  final File sharedBaseDir = FileUtils.createTempDir();

  // First run: starts enabled, is disabled, and the state file must then exist.
  workerTaskManager = createWorkerTaskManager(sharedBaseDir);
  workerTaskManager.start();
  Assert.assertTrue(workerTaskManager.isWorkerEnabled());
  workerTaskManager.workerDisabled();
  Assert.assertFalse(workerTaskManager.isWorkerEnabled());
  final File stateFile = workerTaskManager.getStateFile();
  Assert.assertTrue(stateFile.exists());
  workerTaskManager.stop();

  // Second run over the same directory: must start out disabled.
  workerTaskManager = createWorkerTaskManager(sharedBaseDir);
  workerTaskManager.start();
  Assert.assertFalse(workerTaskManager.isWorkerEnabled());

  final ChangeRequestHistory.Counter counter = new ChangeRequestHistory.Counter(-1, 0);
  final ChangeRequestsSnapshot<WorkerHistoryItem> snapshot =
      workerTaskManager.getChangesSince(counter).get();
  final WorkerHistoryItem.Metadata firstItem =
      (WorkerHistoryItem.Metadata) snapshot.getRequests().get(0);
  Assert.assertTrue(firstItem.isDisabled());
}

/**
 * Disables and then re-enables a worker, stops it, and checks that a second manager
 * started over the same base directory comes up enabled.
 */
@Test
public void test_disabledState_reEnablePersistsAcrossRestart() throws Exception
{
  EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject()))
          .andReturn(overlordClient)
          .anyTimes();
  EasyMock.replay(overlordClient);

  // Both manager instances are built over this one directory.
  final File sharedBaseDir = FileUtils.createTempDir();

  // First run: toggle to disabled and back to enabled before stopping.
  workerTaskManager = createWorkerTaskManager(sharedBaseDir);
  workerTaskManager.start();
  workerTaskManager.workerDisabled();
  workerTaskManager.workerEnabled();
  Assert.assertTrue(workerTaskManager.isWorkerEnabled());
  workerTaskManager.stop();

  // Second run over the same directory: must start out enabled.
  workerTaskManager = createWorkerTaskManager(sharedBaseDir);
  workerTaskManager.start();
  Assert.assertTrue(workerTaskManager.isWorkerEnabled());
}

/**
 * Starts the manager held in {@code workerTaskManager} without disabling it first and
 * checks that it reports itself as enabled.
 */
@Test
public void test_disabledState_defaultsToEnabledWhenNoFile() throws Exception
{
  EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject()))
          .andReturn(overlordClient)
          .anyTimes();
  EasyMock.replay(overlordClient);

  workerTaskManager.start();

  final boolean enabledAfterStart = workerTaskManager.isWorkerEnabled();
  Assert.assertTrue(enabledAfterStart);
}

/**
 * Writes non-JSON content to the state file before startup and checks that the manager
 * still starts and reports itself as enabled.
 */
@Test
public void test_disabledState_malformedFileToleratedAndStartsEnabled() throws Exception
{
  EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject()))
          .andReturn(overlordClient)
          .anyTimes();
  EasyMock.replay(overlordClient);

  workerTaskManager = createWorkerTaskManager();

  // Plant unparseable content at the state file location before the manager starts.
  final File corruptStateFile = workerTaskManager.getStateFile();
  final File stateFileParent = corruptStateFile.getParentFile();
  FileUtils.mkdirp(stateFileParent);
  final byte[] garbage = "not valid json".getBytes(StandardCharsets.UTF_8);
  Files.write(corruptStateFile.toPath(), garbage);

  workerTaskManager.start();
  Assert.assertTrue(workerTaskManager.isWorkerEnabled());
}
}
Loading