 package com.conveyal.analysis.components.broker;
 
-import com.conveyal.analysis.AnalysisServerException;
 import com.conveyal.analysis.components.Component;
 import com.conveyal.analysis.components.WorkerLauncher;
 import com.conveyal.analysis.components.eventbus.ErrorEvent;
@@ -108,7 +107,8 @@ public interface Config { |
      * Used when auto-starting spot instances. Set to a smaller value to increase the number of
      * workers requested automatically.
      */
-    public final int TARGET_TASKS_PER_WORKER = 800;
+    public final int TARGET_TASKS_PER_WORKER_TRANSIT = 800;
+    public final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000;
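+    // For example, a transit job with 16_000 origins targets 16_000 / 800 = 20 workers (before zoom-based scaling).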
 
     /**
      * We want to request spot instances to "boost" regional analyses after a few regional task
@@ -243,28 +243,54 @@ public void createOnDemandWorkerInCategory(WorkerCategory category, WorkerTags w |
     /**
      * Create on-demand/spot workers for a given job, after sanity and capacity checks.
      * @param nOnDemand EC2 on-demand instances to request
-     * @param nSpot EC2 spot instances to request
+     * @param nSpot Target number of EC2 spot instances to request. The actual number requested may be lower if the
+     *              total number of workers running is approaching the maximum specified in the Broker config.
      */
     public void createWorkersInCategory (WorkerCategory category, WorkerTags workerTags, int nOnDemand, int nSpot) {
 
+        // Log error messages rather than throwing exceptions, as this code often runs in worker poll handlers.
+        // Throwing an exception there would not report any useful information to anyone.
+
         if (config.offline()) {
-            LOG.info("Work offline enabled, not creating workers for {}", category);
+            LOG.info("Work offline enabled, not creating workers for {}.", category);
+            return;
+        }
+
+        if (nOnDemand < 0 || nSpot < 0) {
+            LOG.error("Negative number of workers requested, not starting any.");
+            return;
+        }
+
+        final int nRequested = nOnDemand + nSpot;
+        if (nRequested <= 0) {
+            LOG.error("No workers requested, not starting any.");
             return;
         }
 
-        if (nOnDemand < 0 || nSpot < 0){
-            LOG.info("Negative number of workers requested, not starting any");
+        // Zeno's worker pool management: never start more than half the remaining capacity.
+        final int remainingCapacity = config.maxWorkers() - workerCatalog.totalWorkerCount();
+        final int maxToStart = remainingCapacity / 2;
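+        // Illustrative example: with maxWorkers = 100 and 40 workers already running, remainingCapacity = 60
+        // and maxToStart = 30; repeated requests approach the cap but never overshoot it.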
+        if (maxToStart <= 0) {
+            LOG.error("Due to capacity limiting, not starting any workers.");
             return;
         }
 
-        if (workerCatalog.totalWorkerCount() + nOnDemand + nSpot >= config.maxWorkers()) {
-            String message = String.format(
-                "Maximum of %d workers already started, not starting more;" +
-                "jobs will not complete on %s",
-                config.maxWorkers(),
-                category
+        if (nRequested > maxToStart) {
+            LOG.warn("Request for {} workers is more than half the remaining worker pool capacity.", nRequested);
+            nSpot = maxToStart;
+            nOnDemand = 0;
+            LOG.warn("Lowered to {} on-demand and {} spot workers.", nOnDemand, nSpot);
+        }
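+        // Continuing the example above: a request for 40 on-demand + 20 spot workers (nRequested = 60)
+        // exceeds maxToStart = 30, so it is lowered to 0 on-demand + 30 spot workers.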
+
+        // This is just an assertion of consistent state; it should never fail.
+        // Re-sum nOnDemand + nSpot here instead of using nRequested, as they may have been revised above.
+        if (workerCatalog.totalWorkerCount() + nOnDemand + nSpot > config.maxWorkers()) {
+            LOG.error(
+                "Starting workers would exceed the maximum capacity of {}. Jobs may stall on {}.",
+                config.maxWorkers(),
+                category
             );
-            throw AnalysisServerException.forbidden(message);
+            return;
         }
 
         // If workers have already been started up, don't repeat the operation.
@@ -483,9 +509,27 @@ private void requestExtraWorkersIfAppropriate(Job job) { |
         WorkerCategory workerCategory = job.workerCategory;
         int categoryWorkersAlreadyRunning = workerCatalog.countWorkersInCategory(workerCategory);
         if (categoryWorkersAlreadyRunning < MAX_WORKERS_PER_CATEGORY) {
-            // Start a number of workers that scales with the number of total tasks, up to a fixed number.
-            // TODO more refined determination of number of workers to start (e.g. using tasks per minute)
-            int targetWorkerTotal = Math.min(MAX_WORKERS_PER_CATEGORY, job.nTasksTotal / TARGET_TASKS_PER_WORKER);
+            // TODO more refined determination of number of workers to start (e.g. using observed tasks per minute
+            // for recently completed tasks -- but what about when initial origins are in a desert/ocean?)
+            int targetWorkerTotal;
+            if (job.templateTask.hasTransit()) {
+                // Total computation for a task with transit depends on the number of stops and whether the
+                // network has frequency-based routes. The total computation for the job depends on these
+                // factors as well as the number of tasks (origins). Zoom levels add a complication: the number of
+                // origins becomes an even poorer proxy for the number of stops. We use a scale factor to compensate
+                // -- all else equal, high zoom levels imply fewer stops per origin (task) and a lower ideal target
+                // for the number of workers. TODO reduce scale factor further when there are no frequency routes.
+                // But is this worth adding a field to Job or RegionalTask?
+                float transitScaleFactor = (9f / job.templateTask.zoom);
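+                // Illustrative values: zoom 9 gives a factor of 9/9 = 1.0, while zoom 12 gives 9/12 = 0.75,
+                // shrinking the worker target at higher zoom levels.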
+                targetWorkerTotal = (int) ((job.nTasksTotal / TARGET_TASKS_PER_WORKER_TRANSIT) * transitScaleFactor);
+            } else {
+                // Tasks without transit are simpler. They complete relatively quickly, and the total computation
+                // for the job increases roughly linearly with the number of origins.
+                targetWorkerTotal = job.nTasksTotal / TARGET_TASKS_PER_WORKER_NONTRANSIT;
+            }
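+            // Illustrative example: a 40_000-origin job at zoom 9 targets 40_000 / 800 = 50 workers with transit,
+            // but only 40_000 / 4_000 = 10 workers without transit.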
+
+            // Do not exceed the limit on workers per category. TODO add a similar limit per accessGroup or user.
+            targetWorkerTotal = Math.min(targetWorkerTotal, MAX_WORKERS_PER_CATEGORY);
             // Guardrail until freeform pointsets are tested more thoroughly
             if (job.templateTask.originPointSet != null) targetWorkerTotal = Math.min(targetWorkerTotal, 5);
             int nSpot = targetWorkerTotal - categoryWorkersAlreadyRunning;