Skip to content

Commit dda5853

Browse files
authored
apache/master->master: 31c15b7 (#1262)
2 parents b157889 + 31c15b7 commit dda5853

7 files changed

Lines changed: 114 additions & 11 deletions

File tree

docs/configuration/index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1362,6 +1362,7 @@ Middle Managers pass their configurations down to their child peons. The Middle
13621362
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. For example, `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
13631363
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by tasks on any single task dir. This value is treated symmetrically across all directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow for 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks. The actual amount of memory assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`|
13641364
|`druid.worker.category`|A string to name the category that the Middle Manager node belongs to.|`_default_worker_category`|
1365+
|`druid.worker.startAlwaysEnabled`|If true, the Middle Manager always starts in the enabled state. If false, a disabled state set via the worker disable API is persisted and restored across restarts.|`false`|
13651366
|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when [Centralized Datasource Schema](#centralized-datasource-schema-experimental) feature is enabled. |false|
13661367

13671368
#### Peon processing
@@ -1478,6 +1479,7 @@ For most types of tasks, `SegmentWriteOutMediumFactory` can be configured per-ta
14781479
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by tasks on any single task dir. This value is treated symmetrically across all directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow for 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks. The actual amount of memory assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`|
14791480
|`druid.worker.globalIngestionHeapLimitBytes`|Total amount of heap available for ingestion processing. This is applied by automatically setting the `maxBytesInMemory` property on tasks.|Configured max JVM heap size / 6|
14801481
|`druid.worker.numConcurrentMerges`|Maximum number of segment persist or merge operations that can run concurrently across all tasks.|`druid.worker.capacity` / 2, rounded down|
1482+
|`druid.worker.startAlwaysEnabled`|If true, the Indexer always starts in the enabled state. If false, a disabled state set via the worker disable API is persisted and restored across restarts.|`false`|
14811483
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
14821484
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
14831485
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer restart for restorable tasks to gracefully exit.|`PT5M`|

indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.druid.indexing.common.task.Task;
4040
import org.apache.druid.indexing.overlord.TaskRunner;
4141
import org.apache.druid.indexing.overlord.TaskRunnerListener;
42+
import org.apache.druid.indexing.worker.config.WorkerConfig;
4243
import org.apache.druid.java.util.common.Either;
4344
import org.apache.druid.java.util.common.FileUtils;
4445
import org.apache.druid.java.util.common.ISE;
@@ -110,10 +111,16 @@ public class WorkerTaskManager implements IndexerTaskCountStatsProvider
110111
/**
111112
* Whether this worker is disabled (i.e., not accepting new tasks). Persisted to
112113
* {@link #STATE_FILE} under {@link #storageDir} so the flag survives
113-
* process restarts.
114+
* process restarts, unless {@link #startAlwaysEnabled} is true.
114115
*/
115116
private final AtomicBoolean disabled = new AtomicBoolean(false);
116117

118+
/**
119+
* When true, {@link #STATE_FILE} is deleted (rather than read) on startup,
120+
* and the worker starts enabled. Controlled by {@code druid.worker.startAlwaysEnabled}.
121+
*/
122+
private final boolean startAlwaysEnabled;
123+
117124
private final OverlordClient overlordClient;
118125
private final File storageDir;
119126

@@ -122,6 +129,7 @@ public WorkerTaskManager(
122129
ObjectMapper jsonMapper,
123130
TaskRunner taskRunner,
124131
TaskConfig taskConfig,
132+
WorkerConfig workerConfig,
125133
OverlordClient overlordClient
126134
)
127135
{
@@ -130,6 +138,7 @@ public WorkerTaskManager(
130138
this.exec = Execs.singleThreaded("WorkerTaskManager-NoticeHandler");
131139
this.completedTasksCleanupExecutor = Execs.scheduledSingleThreaded("WorkerTaskManager-CompletedTasksCleaner");
132140
this.overlordClient = overlordClient;
141+
this.startAlwaysEnabled = workerConfig.isStartAlwaysEnabled();
133142

134143
storageDir = taskConfig.getBaseTaskDir();
135144
}
@@ -357,18 +366,28 @@ public File getStateFile()
357366
}
358367

359368
/**
360-
* Read {@link #STATE_FILE} and initialize {@link #disabled}.
369+
* Read {@link #STATE_FILE} and initialize {@link #disabled}. When {@link #startAlwaysEnabled}
370+
* is true, delete the file (if present) instead and leave {@link #disabled} at its default.
361371
*/
362372
private void loadStateFile()
363373
{
364374
final File stateFile = getStateFile();
365375
if (stateFile.exists()) {
366-
try {
367-
final WorkerState state = jsonMapper.readValue(stateFile, WorkerState.class);
368-
disabled.set(state.disabled());
369-
}
370-
catch (Exception e) {
371-
log.warn(e, "Failed to read disabled state from[%s]. Starting as enabled.", stateFile);
376+
if (startAlwaysEnabled) {
377+
try {
378+
Files.delete(stateFile.toPath());
379+
}
380+
catch (IOException e) {
381+
log.warn(e, "Failed to delete state file[%s].", stateFile);
382+
}
383+
} else {
384+
try {
385+
final WorkerState state = jsonMapper.readValue(stateFile, WorkerState.class);
386+
disabled.set(state.disabled());
387+
}
388+
catch (Exception e) {
389+
log.warn(e, "Failed to read state file[%s]. Starting as enabled.", stateFile);
390+
}
372391
}
373392
}
374393
}

indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.druid.indexing.common.config.TaskConfig;
3333
import org.apache.druid.indexing.common.task.Task;
3434
import org.apache.druid.indexing.overlord.TaskRunner;
35+
import org.apache.druid.indexing.worker.config.WorkerConfig;
3536
import org.apache.druid.java.util.common.concurrent.Execs;
3637
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
3738
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -63,12 +64,13 @@ public WorkerTaskMonitor(
6364
ObjectMapper jsonMapper,
6465
TaskRunner taskRunner,
6566
TaskConfig taskConfig,
67+
WorkerConfig workerConfig,
6668
CuratorFramework cf,
6769
WorkerCuratorCoordinator workerCuratorCoordinator,
6870
OverlordClient overlordClient
6971
)
7072
{
71-
super(jsonMapper, taskRunner, taskConfig, overlordClient);
73+
super(jsonMapper, taskRunner, taskConfig, workerConfig, overlordClient);
7274

7375
this.jsonMapper = jsonMapper;
7476
this.pathChildrenCache = new PathChildrenCache(

indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.druid.indexing.common.task.Tasks;
4343
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
4444
import org.apache.druid.indexing.overlord.TestTaskRunner;
45+
import org.apache.druid.indexing.worker.config.WorkerConfig;
4546
import org.apache.druid.java.util.common.FileUtils;
4647
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
4748
import org.apache.druid.query.policy.NoopPolicyEnforcer;
@@ -115,10 +116,15 @@ public static Collection<Object[]> getParameters()
115116

116117
private WorkerTaskManager createWorkerTaskManager()
117118
{
118-
return createWorkerTaskManager(FileUtils.createTempDir());
119+
return createWorkerTaskManager(FileUtils.createTempDir(), new WorkerConfig());
119120
}
120121

121122
private WorkerTaskManager createWorkerTaskManager(File baseDir)
123+
{
124+
return createWorkerTaskManager(baseDir, new WorkerConfig());
125+
}
126+
127+
private WorkerTaskManager createWorkerTaskManager(File baseDir, WorkerConfig workerConfig)
122128
{
123129
TaskConfig taskConfig = new TaskConfigBuilder()
124130
.setBaseDir(baseDir.toString())
@@ -185,6 +191,7 @@ private WorkerTaskManager createWorkerTaskManager(File baseDir)
185191
location
186192
),
187193
taskConfig,
194+
workerConfig,
188195
overlordClient
189196
)
190197
{
@@ -626,4 +633,59 @@ public void test_disabledState_malformedFileToleratedAndStartsEnabled() throws E
626633
workerTaskManager.start();
627634
Assert.assertTrue(workerTaskManager.isWorkerEnabled());
628635
}
636+
637+
@Test
638+
public void test_startAlwaysEnabled_ignoresAndDeletesPersistedDisabledState() throws Exception
639+
{
640+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
641+
EasyMock.replay(overlordClient);
642+
643+
final File baseTaskDir = FileUtils.createTempDir();
644+
645+
workerTaskManager = createWorkerTaskManager(baseTaskDir);
646+
workerTaskManager.start();
647+
workerTaskManager.workerDisabled();
648+
Assert.assertFalse(workerTaskManager.isWorkerEnabled());
649+
Assert.assertTrue(workerTaskManager.getStateFile().exists());
650+
workerTaskManager.stop();
651+
652+
final WorkerConfig workerConfig = new WorkerConfig().cloneBuilder()
653+
.setStartAlwaysEnabled(true)
654+
.build();
655+
workerTaskManager = createWorkerTaskManager(baseTaskDir, workerConfig);
656+
workerTaskManager.start();
657+
Assert.assertTrue(workerTaskManager.isWorkerEnabled());
658+
Assert.assertFalse(workerTaskManager.getStateFile().exists());
659+
}
660+
661+
@Test
662+
public void test_startAlwaysEnabled_doesNotCreateStateFileWhenAbsent() throws Exception
663+
{
664+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
665+
EasyMock.replay(overlordClient);
666+
667+
final WorkerConfig workerConfig = new WorkerConfig().cloneBuilder()
668+
.setStartAlwaysEnabled(true)
669+
.build();
670+
workerTaskManager = createWorkerTaskManager(FileUtils.createTempDir(), workerConfig);
671+
workerTaskManager.start();
672+
Assert.assertTrue(workerTaskManager.isWorkerEnabled());
673+
Assert.assertFalse(workerTaskManager.getStateFile().exists());
674+
}
675+
676+
@Test
677+
public void test_startAlwaysEnabled_runtimeDisableStillPersistsToStateFile() throws Exception
678+
{
679+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
680+
EasyMock.replay(overlordClient);
681+
682+
final WorkerConfig workerConfig = new WorkerConfig().cloneBuilder()
683+
.setStartAlwaysEnabled(true)
684+
.build();
685+
workerTaskManager = createWorkerTaskManager(FileUtils.createTempDir(), workerConfig);
686+
workerTaskManager.start();
687+
workerTaskManager.workerDisabled();
688+
Assert.assertFalse(workerTaskManager.isWorkerEnabled());
689+
Assert.assertTrue(workerTaskManager.getStateFile().exists());
690+
}
629691
}

indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ private WorkerTaskMonitor createTaskMonitor()
226226
new ServerConfig()
227227
),
228228
taskConfig,
229+
new WorkerConfig(),
229230
cf,
230231
workerCuratorCoordinator,
231232
EasyMock.createNiceMock(OverlordClient.class)

multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void postResultsComplete(
8181
) throws IOException;
8282

8383
/**
84-
* Client side method to inform the controller that the error has occured in the given worker.
84+
* Client side method to inform the controller that the error has occurred in the given worker.
8585
*/
8686
void postWorkerError(MSQErrorReport errorWrapper) throws IOException;
8787

server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public class WorkerConfig
6868
@JsonProperty
6969
private int numConcurrentMerges = Math.max(1, capacity / 2);
7070

71+
@JsonProperty
72+
private boolean startAlwaysEnabled = false;
73+
7174
public String getIp()
7275
{
7376
return ip;
@@ -130,6 +133,11 @@ public int getNumConcurrentMerges()
130133
return numConcurrentMerges;
131134
}
132135

136+
public boolean isStartAlwaysEnabled()
137+
{
138+
return startAlwaysEnabled;
139+
}
140+
133141
public Builder cloneBuilder()
134142
{
135143
return new Builder(this);
@@ -148,6 +156,7 @@ public static class Builder
148156
private Period intermediaryPartitionTimeout;
149157
private long globalIngestionHeapLimitBytes;
150158
private int numConcurrentMerges;
159+
private boolean startAlwaysEnabled;
151160

152161
private Builder(WorkerConfig input)
153162
{
@@ -162,6 +171,7 @@ private Builder(WorkerConfig input)
162171
this.intermediaryPartitionTimeout = input.intermediaryPartitionTimeout;
163172
this.globalIngestionHeapLimitBytes = input.globalIngestionHeapLimitBytes;
164173
this.numConcurrentMerges = input.numConcurrentMerges;
174+
this.startAlwaysEnabled = input.startAlwaysEnabled;
165175
}
166176

167177
public Builder setIp(String ip)
@@ -230,6 +240,12 @@ public Builder setNumConcurrentMerges(int numConcurrentMerges)
230240
return this;
231241
}
232242

243+
public Builder setStartAlwaysEnabled(boolean startAlwaysEnabled)
244+
{
245+
this.startAlwaysEnabled = startAlwaysEnabled;
246+
return this;
247+
}
248+
233249
public WorkerConfig build()
234250
{
235251
final WorkerConfig retVal = new WorkerConfig();
@@ -244,6 +260,7 @@ public WorkerConfig build()
244260
retVal.intermediaryPartitionTimeout = this.intermediaryPartitionTimeout;
245261
retVal.globalIngestionHeapLimitBytes = this.globalIngestionHeapLimitBytes;
246262
retVal.numConcurrentMerges = this.numConcurrentMerges;
263+
retVal.startAlwaysEnabled = this.startAlwaysEnabled;
247264
return retVal;
248265
}
249266
}

0 commit comments

Comments
 (0)