Skip to content

Commit f53df35

Browse files
Refactor the results assemblers
Refactor the multi-origin assembler and the individual result assembler classes to allow them to be created during the regional analysis creation process so that the Broker and MultiOriginAssembler can be simpler and depend on fewer components.
1 parent c6bcd28 commit f53df35

16 files changed: +251 additions, −406 deletions

src/main/java/com/conveyal/analysis/components/LocalBackendComponents.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ public LocalBackendComponents () {
3434
authentication = new LocalAuthentication();
3535
// TODO add nested LocalWorkerComponents here, to reuse some components, and pass it into the LocalWorkerLauncher?
3636
workerLauncher = new LocalWorkerLauncher(config, fileStorage, gtfsCache, osmCache);
37-
broker = new Broker(config, fileStorage, eventBus, workerLauncher);
37+
broker = new Broker(config, eventBus, workerLauncher);
3838
censusExtractor = new SeamlessCensusGridExtractor(config);
3939
// Instantiate the HttpControllers last, when all the components except the HttpApi are already created.
4040
List<HttpController> httpControllers = standardHttpControllers();

src/main/java/com/conveyal/analysis/components/broker/Broker.java

+9-74
Original file line number | Diff line number | Diff line change
@@ -6,17 +6,11 @@
66
import com.conveyal.analysis.components.eventbus.EventBus;
77
import com.conveyal.analysis.components.eventbus.RegionalAnalysisEvent;
88
import com.conveyal.analysis.components.eventbus.WorkerEvent;
9-
import com.conveyal.analysis.models.RegionalAnalysis;
109
import com.conveyal.analysis.results.MultiOriginAssembler;
11-
import com.conveyal.analysis.util.JsonUtil;
12-
import com.conveyal.file.FileStorage;
13-
import com.conveyal.file.FileStorageKey;
14-
import com.conveyal.file.FileUtils;
1510
import com.conveyal.r5.analyst.WorkerCategory;
1611
import com.conveyal.r5.analyst.cluster.RegionalTask;
1712
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
1813
import com.conveyal.r5.analyst.cluster.WorkerStatus;
19-
import com.conveyal.r5.analyst.scenario.Scenario;
2014
import com.conveyal.r5.util.ExceptionUtils;
2115
import com.google.common.collect.ListMultimap;
2216
import com.google.common.collect.MultimapBuilder;
@@ -27,8 +21,6 @@
2721
import org.slf4j.Logger;
2822
import org.slf4j.LoggerFactory;
2923

30-
import java.io.File;
31-
import java.io.IOException;
3224
import java.util.ArrayList;
3325
import java.util.Collection;
3426
import java.util.Collections;
@@ -42,7 +34,6 @@
4234
import static com.conveyal.analysis.components.eventbus.WorkerEvent.Action.REQUESTED;
4335
import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.REGIONAL;
4436
import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.SINGLE_POINT;
45-
import static com.conveyal.file.FileCategory.BUNDLES;
4637
import static com.google.common.base.Preconditions.checkNotNull;
4738

4839
/**
@@ -93,7 +84,6 @@ public interface Config {
9384
private Config config;
9485

9586
// Component Dependencies
96-
private final FileStorage fileStorage;
9787
private final EventBus eventBus;
9888
private final WorkerLauncher workerLauncher;
9989

@@ -143,9 +133,8 @@ public interface Config {
143133
public TObjectLongMap<WorkerCategory> recentlyRequestedWorkers =
144134
TCollections.synchronizedMap(new TObjectLongHashMap<>());
145135

146-
public Broker (Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) {
136+
public Broker (Config config, EventBus eventBus, WorkerLauncher workerLauncher) {
147137
this.config = config;
148-
this.fileStorage = fileStorage;
149138
this.eventBus = eventBus;
150139
this.workerLauncher = workerLauncher;
151140
}
@@ -154,83 +143,29 @@ public Broker (Config config, FileStorage fileStorage, EventBus eventBus, Worker
154143
* Enqueue a set of tasks for a regional analysis.
155144
* Only a single task is passed in, which the broker will expand into all the individual tasks for a regional job.
156145
*/
157-
public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAnalysis) {
158-
159-
// Make a copy of the regional task inside the RegionalAnalysis, replacing the scenario with a scenario ID.
160-
RegionalTask templateTask = templateTaskFromRegionalAnalysis(regionalAnalysis);
161-
162-
LOG.info("Enqueuing tasks for job {} using template task.", templateTask.jobId);
163-
if (findJob(templateTask.jobId) != null) {
164-
LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId);
165-
throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId);
146+
public synchronized void enqueueTasksForRegionalJob (Job job, MultiOriginAssembler assembler) {
147+
LOG.info("Enqueuing tasks for job {} using template task.", job.jobId);
148+
if (findJob(job.jobId) != null) {
149+
LOG.error("Someone tried to enqueue job {} but it already exists.", job.jobId);
150+
throw new RuntimeException("Enqueued duplicate job " + job.jobId);
166151
}
167-
WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis);
168-
Job job = new Job(templateTask, workerTags);
169152
jobs.put(job.workerCategory, job);
170153

171154
// Register the regional job so results received from multiple workers can be assembled into one file.
172-
// TODO encapsulate MultiOriginAssemblers in a new Component
173-
// Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
174-
// That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
175-
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
176-
resultAssemblers.put(templateTask.jobId, assembler);
155+
resultAssemblers.put(job.jobId, assembler);
177156

178157
if (config.testTaskRedelivery()) {
179158
// This is a fake job for testing, don't confuse the worker startup code below with null graph ID.
180159
return;
181160
}
182161

183162
if (workerCatalog.noWorkersAvailable(job.workerCategory, config.offline())) {
184-
createOnDemandWorkerInCategory(job.workerCategory, workerTags);
163+
createOnDemandWorkerInCategory(job.workerCategory, job.workerTags);
185164
} else {
186165
// Workers exist in this category, clear out any record that we're waiting for one to start up.
187166
recentlyRequestedWorkers.remove(job.workerCategory);
188167
}
189-
eventBus.send(new RegionalAnalysisEvent(templateTask.jobId, STARTED).forUser(workerTags.user, workerTags.group));
190-
}
191-
192-
/**
193-
* The single RegionalTask object represents a lot of individual accessibility tasks at many different origin
194-
* points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to
195-
* workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID
196-
* to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task,
197-
* as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on
198-
* its ID only. We protectively clone this task because we're going to null out its scenario field, and don't
199-
* want to affect the original object which contains all the scenario details.
200-
* TODO Why is all this detail added after the Persistence call?
201-
* We don't want to store all the details added below in Mongo?
202-
*/
203-
private RegionalTask templateTaskFromRegionalAnalysis (RegionalAnalysis regionalAnalysis) {
204-
RegionalTask templateTask = regionalAnalysis.request.clone();
205-
// First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers.
206-
Scenario scenario = templateTask.scenario;
207-
templateTask.scenarioId = scenario.id;
208-
// Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON.
209-
templateTask.scenario = null;
210-
String fileName = String.format("%s_%s.json", regionalAnalysis.bundleId, scenario.id);
211-
FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName);
212-
try {
213-
File localScenario = FileUtils.createScratchFile("json");
214-
JsonUtil.objectMapper.writeValue(localScenario, scenario);
215-
// FIXME this is using a network service in a method called from a synchronized broker method.
216-
// Move file into storage before entering the synchronized block.
217-
fileStorage.moveIntoStorage(fileStorageKey, localScenario);
218-
} catch (IOException e) {
219-
LOG.error("Error storing scenario for retrieval by workers.", e);
220-
}
221-
// Fill in all the fields in the template task that will remain the same across all tasks in a job.
222-
// I am not sure why we are re-setting all these fields, it seems like they are already set when the task is
223-
// initialized by AnalysisRequest.populateTask. But we'd want to thoroughly check that assumption before
224-
// eliminating or moving these lines.
225-
templateTask.jobId = regionalAnalysis._id;
226-
templateTask.graphId = regionalAnalysis.bundleId;
227-
templateTask.workerVersion = regionalAnalysis.workerVersion;
228-
templateTask.height = regionalAnalysis.height;
229-
templateTask.width = regionalAnalysis.width;
230-
templateTask.north = regionalAnalysis.north;
231-
templateTask.west = regionalAnalysis.west;
232-
templateTask.zoom = regionalAnalysis.zoom;
233-
return templateTask;
168+
eventBus.send(new RegionalAnalysisEvent(job.jobId, STARTED).forUser(job.workerTags.user, job.workerTags.group));
234169
}
235170

236171
/**

src/main/java/com/conveyal/analysis/components/broker/Job.java

+25-18
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@
1313
import java.util.Set;
1414

1515
import static com.conveyal.r5.common.Util.notNullOrEmpty;
16-
import static com.google.common.base.Preconditions.checkNotNull;
1716

1817
/**
1918
* A Job is a collection of tasks that represent all the origins in a regional analysis. All the
@@ -61,13 +60,13 @@ public class Job {
6160
* The number of remaining tasks can be derived from the deliveredTasks BitSet, but as an
6261
* optimization we keep a separate counter to avoid constantly scanning over that whole bitset.
6362
*/
64-
protected int nTasksCompleted;
63+
protected int nTasksCompleted = 0;
6564

6665
/**
6766
* The total number of task deliveries that have occurred. A task may be counted more than
6867
* once if it is redelivered.
6968
*/
70-
protected int nTasksDelivered;
69+
protected int nTasksDelivered = 0;
7170

7271
/** Every task in this job will be based on this template task, but have its origin coordinates changed. */
7372
public final RegionalTask templateTask;
@@ -128,23 +127,31 @@ private RegionalTask makeOneTask (int taskNumber) {
128127
*/
129128
public final Set<String> errors = new HashSet();
130129

131-
public Job (RegionalTask templateTask, WorkerTags workerTags) {
132-
this.jobId = templateTask.jobId;
133-
this.templateTask = templateTask;
134-
this.workerCategory = new WorkerCategory(templateTask.graphId, templateTask.workerVersion);
135-
this.nTasksCompleted = 0;
136-
this.nextTaskToDeliver = 0;
137-
138-
if (templateTask.originPointSetKey != null) {
139-
checkNotNull(templateTask.originPointSet);
140-
this.nTasksTotal = templateTask.originPointSet.featureCount();
141-
} else {
142-
this.nTasksTotal = templateTask.width * templateTask.height;
143-
}
144-
145-
this.completedTasks = new BitSet(nTasksTotal);
130+
public Job (RegionalTask task, WorkerTags workerTags) {
131+
templateTask = templateTaskFromRegionalTask(task);
132+
jobId = task.jobId;
133+
workerCategory = new WorkerCategory(task.graphId, task.workerVersion);
134+
nTasksTotal = task.getTasksTotal();
135+
completedTasks = new BitSet(nTasksTotal);
146136
this.workerTags = workerTags;
137+
}
147138

139+
/**
140+
* The single RegionalTask object represents a lot of individual accessibility tasks at many different origin
141+
* points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to
142+
* workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID
143+
* to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task,
144+
* as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on
145+
* its ID only. We protectively clone this task because we're going to null out its scenario field, and don't
146+
* want to affect the original object which contains all the scenario details.
147+
*/
148+
private static RegionalTask templateTaskFromRegionalTask(RegionalTask task) {
149+
RegionalTask templateTask = task.clone();
150+
// First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers.
151+
templateTask.scenarioId = templateTask.scenario.id;
152+
// Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON.
153+
templateTask.scenario = null;
154+
return templateTask;
148155
}
149156

150157
public boolean markTaskCompleted(int taskId) {

src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java

+5-1
Original file line number | Diff line number | Diff line change
@@ -3,11 +3,13 @@
33
import com.conveyal.analysis.components.BackendComponents;
44
import com.conveyal.analysis.components.LocalBackendComponents;
55
import com.conveyal.analysis.models.RegionalAnalysis;
6+
import com.conveyal.analysis.results.MultiOriginAssembler;
67
import com.conveyal.r5.analyst.cluster.RegionalTask;
78
import org.slf4j.Logger;
89
import org.slf4j.LoggerFactory;
910

1011
import java.nio.ByteBuffer;
12+
import java.util.ArrayList;
1113
import java.util.UUID;
1214

1315
/**
@@ -66,7 +68,9 @@ private static void sendFakeJob(Broker broker) {
6668
templateTask.scenarioId = "FAKE";
6769
RegionalAnalysis regionalAnalysis = new RegionalAnalysis();
6870
regionalAnalysis.request = templateTask;
69-
broker.enqueueTasksForRegionalJob(regionalAnalysis);
71+
var job = new Job(templateTask, WorkerTags.fromRegionalAnalysis(regionalAnalysis));
72+
var assembler = new MultiOriginAssembler(job, new ArrayList<>());
73+
broker.enqueueTasksForRegionalJob(job, assembler);
7074
}
7175

7276
public static String compactUUID() {

src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java

+60-3
Original file line number | Diff line number | Diff line change
@@ -4,12 +4,20 @@
44
import com.conveyal.analysis.SelectingGridReducer;
55
import com.conveyal.analysis.UserPermissions;
66
import com.conveyal.analysis.components.broker.Broker;
7+
import com.conveyal.analysis.components.broker.Job;
78
import com.conveyal.analysis.components.broker.JobStatus;
9+
import com.conveyal.analysis.components.broker.WorkerTags;
810
import com.conveyal.analysis.models.AnalysisRequest;
911
import com.conveyal.analysis.models.OpportunityDataset;
1012
import com.conveyal.analysis.models.RegionalAnalysis;
1113
import com.conveyal.analysis.persistence.Persistence;
14+
import com.conveyal.analysis.results.AccessCsvResultWriter;
1215
import com.conveyal.analysis.results.CsvResultType;
16+
import com.conveyal.analysis.results.GridResultWriter;
17+
import com.conveyal.analysis.results.MultiOriginAssembler;
18+
import com.conveyal.analysis.results.PathCsvResultWriter;
19+
import com.conveyal.analysis.results.RegionalResultWriter;
20+
import com.conveyal.analysis.results.TimeCsvResultWriter;
1321
import com.conveyal.analysis.util.JsonUtil;
1422
import com.conveyal.file.FileStorage;
1523
import com.conveyal.file.FileStorageFormat;
@@ -20,6 +28,7 @@
2028
import com.conveyal.r5.analyst.PointSet;
2129
import com.conveyal.r5.analyst.PointSetCache;
2230
import com.conveyal.r5.analyst.cluster.RegionalTask;
31+
import com.conveyal.r5.analyst.scenario.Scenario;
2332
import com.fasterxml.jackson.databind.JsonNode;
2433
import com.google.common.primitives.Ints;
2534
import com.mongodb.QueryBuilder;
@@ -44,6 +53,7 @@
4453
import static com.conveyal.analysis.util.JsonUtil.toJson;
4554
import static com.conveyal.file.FileCategory.BUNDLES;
4655
import static com.conveyal.file.FileCategory.RESULTS;
56+
import static com.conveyal.r5.common.Util.notNullOrEmpty;
4757
import static com.conveyal.r5.transit.TransportNetworkCache.getScenarioFilename;
4858
import static com.google.common.base.Preconditions.checkArgument;
4959
import static com.google.common.base.Preconditions.checkNotNull;
@@ -506,17 +516,64 @@ private RegionalAnalysis createRegionalAnalysis (Request req, Response res) thro
506516
// This assigns it creation/update time stamps and an ID, which is needed to name any output CSV files.
507517
regionalAnalysis = Persistence.regionalAnalyses.create(regionalAnalysis);
508518

519+
// Create the regional job
520+
var regionalJob = new Job(task, WorkerTags.fromRegionalAnalysis(regionalAnalysis));
521+
522+
// Create the result writers. Store their result file paths in the database.
523+
var resultWriters = new ArrayList<RegionalResultWriter>();
524+
if (!task.makeTauiSite) {
525+
if (task.recordAccessibility) {
526+
if (task.originPointSet != null) {
527+
var accessWriter = new AccessCsvResultWriter(task, fileStorage);
528+
resultWriters.add(accessWriter);
529+
regionalAnalysis.resultStorage.put(accessWriter.resultType(), accessWriter.getFileName());
530+
} else {
531+
resultWriters.addAll(GridResultWriter.createWritersFromTask(regionalAnalysis, task, fileStorage));
532+
}
533+
}
534+
535+
if (task.recordTimes) {
536+
var timesWriter = new TimeCsvResultWriter(task, fileStorage);
537+
resultWriters.add(timesWriter);
538+
regionalAnalysis.resultStorage.put(timesWriter.resultType(), timesWriter.getFileName());
539+
}
540+
541+
if (task.includePathResults) {
542+
var pathsWriter = new PathCsvResultWriter(task, fileStorage);
543+
resultWriters.add(pathsWriter);
544+
regionalAnalysis.resultStorage.put(pathsWriter.resultType(), pathsWriter.getFileName());
545+
}
546+
checkArgument(notNullOrEmpty(resultWriters), "A regional analysis should always create at least one grid or CSV file.");
547+
}
548+
549+
// Create the multi-origin assembler with the writers.
550+
var assembler = new MultiOriginAssembler(regionalJob, resultWriters);
551+
552+
// Stored scenario is needed by workers. Must be done ahead of enqueueing the job.
553+
storeScenarioJson(task.graphId, task.scenario);
554+
509555
// Register the regional job with the broker, which will distribute individual tasks to workers and track progress.
510-
broker.enqueueTasksForRegionalJob(regionalAnalysis);
556+
broker.enqueueTasksForRegionalJob(regionalJob, assembler);
511557

512558
// Flush to the database any information added to the RegionalAnalysis object when it was enqueued.
513-
// This includes the paths of any CSV files that will be produced by this analysis.
514-
// TODO verify whether there is a reason to use regionalAnalyses.modifyWithoutUpdatingLock() or put().
559+
// This includes the paths of any CSV files that will be produced by this analysis. The regional analysis was
560+
// created in this method and therefore we can bypass the nonce / permission checking.
515561
Persistence.regionalAnalyses.modifiyWithoutUpdatingLock(regionalAnalysis);
516562

517563
return regionalAnalysis;
518564
}
519565

566+
/**
567+
* Store the regional analysis scenario as JSON for retrieval by the workers.
568+
*/
569+
private void storeScenarioJson(String graphId, Scenario scenario) throws IOException {
570+
String fileName = getScenarioFilename(graphId, scenario.id);
571+
FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName);
572+
File localScenario = FileUtils.createScratchFile("json");
573+
JsonUtil.objectMapper.writeValue(localScenario, scenario);
574+
fileStorage.moveIntoStorage(fileStorageKey, localScenario);
575+
}
576+
520577
private RegionalAnalysis updateRegionalAnalysis (Request request, Response response) throws IOException {
521578
RegionalAnalysis regionalAnalysis = JsonUtil.objectMapper.readValue(request.body(), RegionalAnalysis.class);
522579
return Persistence.regionalAnalyses.updateByUserIfPermitted(regionalAnalysis, UserPermissions.from(request));

0 commit comments

Comments (0)