-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathCUDAEnsemble.cu
More file actions
697 lines (670 loc) · 33.3 KB
/
CUDAEnsemble.cu
File metadata and controls
697 lines (670 loc) · 33.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
#include "flamegpu/simulation/CUDAEnsemble.h"
#include <algorithm>
#include <cstdlib>
#include <memory>
#include <thread>
#include <set>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <filesystem>
#include <map>
#include <cstdio>
#include <vector>
#include <string>
#ifdef FLAMEGPU_ENABLE_MPI
#include "flamegpu/simulation/detail/MPIEnsemble.h"
#include "flamegpu/simulation/detail/MPISimRunner.h"
#endif
#include "flamegpu/version.h"
#include "flamegpu/model/ModelDescription.h"
#include "flamegpu/simulation/RunPlanVector.h"
#include "flamegpu/detail/compute_capability.cuh"
#include "flamegpu/detail/SteadyClockTimer.h"
#include "flamegpu/simulation/CUDASimulation.h"
#include "flamegpu/io/StateWriterFactory.h"
#include "flamegpu/simulation/LoggingConfig.h"
#include "flamegpu/simulation/detail/SimRunner.h"
#include "flamegpu/simulation/LogFrame.h"
#include "flamegpu/simulation/detail/SimLogger.h"
#include "flamegpu/detail/cuda.cuh"
#include "flamegpu/io/Telemetry.h"
namespace flamegpu {
CUDAEnsemble::EnsembleConfig::EnsembleConfig()
: telemetry(flamegpu::io::Telemetry::isEnabled()) {}
CUDAEnsemble::CUDAEnsemble(const ModelDescription& _model, int argc, const char** argv, bool _isSWIG)
: model(_model.model->clone())
, isSWIG(_isSWIG) {
initialise(argc, argv);
}
CUDAEnsemble::~CUDAEnsemble() {
// Call this here incase simulate() exited with an exception
#ifdef _MSC_VER
if (config.block_standby) {
// Disable prevention of standby
SetThreadExecutionState(ES_CONTINUOUS);
}
#endif
}
unsigned int CUDAEnsemble::simulate(const RunPlanVector& plans) {
#ifdef _MSC_VER
if (config.block_standby) {
// This thread requires the system continuously until it exits
SetThreadExecutionState(ES_CONTINUOUS | ES_SYSTEM_REQUIRED);
}
#endif
// Validate that RunPlan model matches CUDAEnsemble model
if (*plans.environment != this->model->environment->properties) {
THROW exception::InvalidArgument("RunPlan is for a different ModelDescription, in CUDAEnsemble::simulate()");
}
#ifdef FLAMEGPU_ENABLE_MPI
std::unique_ptr<detail::MPIEnsemble> mpi = std::make_unique<detail::MPIEnsemble>(config, static_cast<unsigned int>(plans.size()));
#endif
// Validate/init output directories
if (!config.out_directory.empty()
#ifdef FLAMEGPU_ENABLE_MPI
&& (!config.mpi || mpi->world_rank == 0)
#endif
) {
// Validate out format is right
config.out_format = io::StateWriterFactory::detectSupportedFileExt(config.out_format);
if (config.out_format.empty()) {
THROW exception::InvalidArgument("The out_directory config option also requires the out_format options to be set to a suitable type (e.g. 'json', 'xml'), in CUDAEnsemble::simulate()");
}
// Check that output files don't already exist
if (std::filesystem::exists(config.out_directory)) {
std::set<std::filesystem::path> exit_files;
for (unsigned int p = 0; p < plans.size(); ++p) {
std::filesystem::path exit_path = config.out_directory;
if (!plans[p].getOutputSubdirectory().empty())
exit_path /= std::filesystem::path(plans[p].getOutputSubdirectory());
exit_path /= std::filesystem::path("exit." + config.out_format);
exit_files.insert(exit_path);
}
if (!config.truncate_log_files) {
// Step
for (unsigned int p = 0; p < plans.size(); ++p) {
std::filesystem::path step_path = config.out_directory;
if (!plans[p].getOutputSubdirectory().empty())
step_path /= std::filesystem::path(plans[p].getOutputSubdirectory());
step_path /= std::filesystem::path(std::to_string(p) + "." + config.out_format);
if (std::filesystem::exists(step_path)) {
THROW exception::FileAlreadyExists("Step log file '%s' already exists, in CUDAEnsemble::simulate()", step_path.generic_string().c_str());
}
}
// Exit
for (const auto &exit_path : exit_files) {
if (std::filesystem::exists(exit_path)) {
THROW exception::FileAlreadyExists("Exit log file '%s' already exists, in CUDAEnsemble::simulate()", exit_path.generic_string().c_str());
}
}
} else {
// Delete pre-existing exit log files
for (const auto& exit_path : exit_files) {
std::filesystem::remove(exit_path); // Returns false if the file didn't exist
}
}
}
// Create any missing directories
try {
std::filesystem::create_directories(config.out_directory);
} catch (const std::exception &e) {
THROW exception::InvalidArgument("Unable to use output directory '%s', in CUDAEnsemble::simulate(): %s", config.out_directory.c_str(), e.what());
}
for (const auto &p : plans) {
const auto subdir = p.getOutputSubdirectory();
if (!subdir.empty()) {
std::filesystem::path sub_path = config.out_directory;
try {
sub_path.append(subdir);
std::filesystem::create_directories(sub_path);
} catch (const std::exception &e) {
THROW exception::InvalidArgument("Unable to use output subdirectory '%s', in CUDAEnsemble::simulate(): %s", sub_path.generic_string().c_str(), e.what());
}
}
}
}
// Purge run logs, and resize ready for new runs
// Resize means we can setup logs during execution out of order, without risk of list being reallocated
run_logs.clear();
// Workout how many devices and runner we will be executing
// if MPI is enabled, This will throw exceptions if any rank has 0 GPUs visible, prior to device allocation preventing issues where rank 0 would not be participating.
int device_count = -1;
cudaError_t cudaStatus = cudaGetDeviceCount(&device_count);
if (cudaStatus != cudaSuccess) {
THROW exception::InvalidCUDAdevice("Error finding CUDA devices! Do you have a CUDA-capable GPU installed?, in CUDAEnsemble::simulate()");
}
if (device_count == 0) {
THROW exception::InvalidCUDAdevice("Error no CUDA devices found!, in CUDAEnsemble::simulate()");
}
for (const int id : config.devices) {
if (id >= device_count) {
THROW exception::InvalidCUDAdevice("Requested CUDA device %d is not valid, only %d CUDA devices available!, in CUDAEnsemble::simulate()", id, device_count);
}
}
// Select the actual devices to be used, based on user provided gpus, architecture compatibility, and optionally mpi ranks per node.
// For non-mpi builds / configurations, just use all the devices provided by the user / all visible devices (then check they are valid later)
// For MPI builds with mpi enabled, load balance the gpus across mpi ranks within the shared memory system. If there are more ranks than gpus, latter ranks will not participate.
std::set<int> devices;
// initialise the local devices set to be the non-mpi behaviour, using config.devices or all visible cuda devices
if (config.devices.size()) {
devices = config.devices;
} else {
// If no devices were specified by the user, use all visible devices but load balance if MPI is in use.
for (int i = 0; i < device_count; ++i) {
devices.emplace(i);
}
}
#ifdef FLAMEGPU_ENABLE_MPI
// if MPI is enabled at compile time, use the MPIEnsemble method to assign devices balanced across ranks
devices = mpi->devicesForThisRank(devices);
#endif // ifdef FLAMEGPU_ENABLE_MPI
// Check that each device is capable, and init cuda context
for (auto d = devices.begin(); d != devices.end(); ++d) {
if (!detail::compute_capability::checkComputeCapability(*d)) {
// Emit a warning unless quiet verbosity was specified.
if (config.verbosity >= Verbosity::Default) {
fprintf(stderr, "FLAMEGPU2 has not been built with an appropriate compute capability for device %d, this device will not be used.\n", *d);
}
d = devices.erase(d);
--d;
} else {
gpuErrchk(cudaSetDevice(*d));
gpuErrchk(flamegpu::detail::cuda::cudaFree(nullptr));
}
}
// Return to device 0 (or check original device first?)
gpuErrchk(cudaSetDevice(0));
// If there are no devices left (and mpi is not being used), we need to error as the work cannot be executed.
#ifndef FLAMEGPU_ENABLE_MPI
if (devices.size() == 0) {
THROW exception::InvalidCUDAdevice("FLAMEGPU2 has not been built with an appropriate compute capability for any devices, unable to continue\n");
}
#endif // ifndef FLAMEGPU_ENABLE_MPI
#ifdef FLAMEGPU_ENABLE_MPI
// Once the number of devices per rank is known, we can create the actual communicator to be used during MPI, so we can warn/error as needed.
// This rank is participating if it has atleast one device assigned to it.
// Rank 0 will be participating at this point, otherwise InvalidCUDAdevice would have been thrown
// This also implies the participating communicator cannot have a size of 0, as atleast one thread must be participating at this point, but throw in that case just in case.
bool communicatorCreated = mpi->createParticipatingCommunicator(devices.size() > 0);
// If the communicator failed to be created or is empty for any participating threads, throw. This should never occur.
if (!communicatorCreated || mpi->getParticipatingCommSize() == 0) {
THROW exception::EnsembleError("Unable to create MPI communicator. Ensure atleast one GPU is visible.\n");
}
// If the world size is not the participating size, issue a warning.that too many threads have been used.
if (mpi->world_rank == 0 && mpi->world_size != mpi->getParticipatingCommSize() && config.verbosity >= Verbosity::Default) {
fprintf(stderr, "Warning: MPI Ensemble launched with %d MPI ranks, but only %d ranks have GPUs assigned. %d ranks are unneccesary.\n", mpi->world_size, mpi->getParticipatingCommSize(), mpi->world_size - mpi->getParticipatingCommSize());
fflush(stderr);
}
#endif
const unsigned int TOTAL_RUNNERS = static_cast<unsigned int>(devices.size()) * config.concurrent_runs;
// Log Time (We can't use CUDA events here, due to device resets)
auto ensemble_timer = detail::SteadyClockTimer();
ensemble_timer.start();
// Reset the elapsed time.
ensemble_elapsed_time = 0.;
// Logging thread-safety items
std::queue<unsigned int> log_export_queue;
std::mutex log_export_queue_mutex;
std::condition_variable log_export_queue_cdn;
#ifdef FLAMEGPU_ENABLE_MPI
// In MPI mode, Rank 0 will collect errors from all ranks
std::multimap<int, detail::AbstractSimRunner::ErrorDetail> err_detail = {};
#endif
std::vector<detail::AbstractSimRunner::ErrorDetail> err_detail_local = {};
// Init log worker
detail::SimLogger *log_worker = nullptr;
if (!config.out_directory.empty()) {
log_worker = new detail::SimLogger(run_logs, plans, config.out_directory, config.out_format, log_export_queue, log_export_queue_mutex, log_export_queue_cdn,
step_log_config.get(), exit_log_config.get(), step_log_config && step_log_config->log_timing, exit_log_config && exit_log_config->log_timing);
}
// In MPI mode, only Rank 0 increments the error counter
unsigned int err_count = 0;
if (config.mpi) {
#ifdef FLAMEGPU_ENABLE_MPI
// Setup MPISimRunners
detail::MPISimRunner** runners = static_cast<detail::MPISimRunner**>(malloc(sizeof(detail::MPISimRunner*) * TOTAL_RUNNERS));
std::vector<std::atomic<unsigned int>> err_cts(TOTAL_RUNNERS);
std::vector<std::atomic<unsigned int>> next_runs(TOTAL_RUNNERS);
for (unsigned int i = 0; i < TOTAL_RUNNERS; ++i) {
err_cts[i] = UINT_MAX;
next_runs[i] = detail::MPISimRunner::Signal::RequestJob;
}
{
unsigned int i = 0;
for (auto& d : devices) {
for (unsigned int j = 0; j < config.concurrent_runs; ++j) {
runners[i] = new detail::MPISimRunner(model, err_cts[i], next_runs[i], plans,
step_log_config, exit_log_config,
d, j,
config.verbosity,
run_logs, log_export_queue, log_export_queue_mutex, log_export_queue_cdn, err_detail_local, TOTAL_RUNNERS, isSWIG);
runners[i]->start();
++i;
}
}
}
// Wait for runners to request work, then communicate via MPI to get assignments
// If work_rank == 0, also perform the assignments
if (mpi->world_rank == 0) {
unsigned int next_run = 0;
MPI_Status status;
int flag;
int mpi_runners_fin = 1; // Start at 1 because we have always already finished
// Wait for all runs to have been assigned, and all MPI runners to have been notified of fin
while (next_run < plans.size() || mpi_runners_fin < mpi->getParticipatingCommSize()) {
// Check for errors
const int t_err_count = mpi->receiveErrors(err_detail);
err_count += t_err_count;
if (t_err_count && config.error_level == EnsembleConfig::Fast) {
// Skip to end to kill workers
next_run = plans.size();
}
// Check whether local runners require a job assignment
for (unsigned int i = 0; i < next_runs.size(); ++i) {
auto &r = next_runs[i];
unsigned int run_id = r.load();
if (run_id == detail::MPISimRunner::Signal::RunFailed) {
// Retrieve and handle local error detail
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i, devices);
++err_count;
if (config.error_level == EnsembleConfig::Fast) {
// Skip to end to kill workers
next_run = plans.size();
}
run_id = detail::MPISimRunner::Signal::RequestJob;
}
if (run_id == detail::MPISimRunner::Signal::RequestJob) {
r.store(next_run++);
// Print progress to console
if (config.verbosity >= Verbosity::Default && next_run <= plans.size()) {
fprintf(stdout, "MPI ensemble assigned run %d/%u to rank 0\n", next_run, static_cast<unsigned int>(plans.size()));
fflush(stdout);
}
}
}
// Check whether MPI runners require a job assignment
mpi_runners_fin += mpi->receiveJobRequests(next_run);
// Yield, rather than hammering the processor
std::this_thread::yield();
}
} else if (mpi->getRankIsParticipating()) {
// Wait for all runs to have been assigned, and all MPI runners to have been notified of fin. ranks without GPU(s) do not request jobs.
unsigned int next_run = 0;
MPI_Status status;
while (next_run < plans.size()) {
// Check whether local runners require a job assignment
for (unsigned int i = 0; i < TOTAL_RUNNERS; ++i) {
unsigned int runner_status = next_runs[i].load();
if (runner_status == detail::MPISimRunner::Signal::RunFailed) {
// Fetch the job id, increment local error counter
const unsigned int failed_run_id = err_cts[i].exchange(UINT_MAX);
++err_count;
// Retrieve and handle local error detail
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i, devices);
runner_status = detail::MPISimRunner::Signal::RequestJob;
}
if (runner_status == detail::MPISimRunner::Signal::RequestJob) {
next_run = mpi->requestJob();
// Pass the job to runner that requested it
next_runs[i].store(next_run);
// Break if assigned job is out of range, work is finished
if (next_run >= plans.size()) {
break;
}
}
}
std::this_thread::yield();
}
}
// Notify all local runners to exit
for (unsigned int i = 0; i < TOTAL_RUNNERS; ++i) {
auto &r = next_runs[i];
if (r.exchange(plans.size()) == detail::MPISimRunner::Signal::RunFailed) {
++err_count;
// Retrieve and handle local error detail
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i, devices);
}
}
// Wait for all runners to exit
for (unsigned int i = 0; i < TOTAL_RUNNERS; ++i) {
runners[i]->join();
delete runners[i];
if (next_runs[i].load() == detail::MPISimRunner::Signal::RunFailed) {
++err_count;
// Retrieve and handle local error detail
mpi->retrieveLocalErrorDetail(log_export_queue_mutex, err_detail, err_detail_local, i, devices);
}
}
#endif
} else {
detail::SimRunner** runners = static_cast<detail::SimRunner**>(malloc(sizeof(detail::SimRunner*) * TOTAL_RUNNERS));
std::atomic<unsigned int> err_ct = { 0u };
std::atomic<unsigned int> next_runs = { 0u };
// Setup SimRunners
{
unsigned int i = 0;
for (auto& d : devices) {
for (unsigned int j = 0; j < config.concurrent_runs; ++j) {
runners[i] = new detail::SimRunner(model, err_ct, next_runs, plans,
step_log_config, exit_log_config,
d, j,
config.verbosity, config.error_level == EnsembleConfig::Fast,
run_logs, log_export_queue, log_export_queue_mutex, log_export_queue_cdn, err_detail_local, TOTAL_RUNNERS, isSWIG);
runners[i++]->start();
}
}
}
// Wait for all runners to exit
for (unsigned int i = 0; i < TOTAL_RUNNERS; ++i) {
runners[i]->join();
delete runners[i];
}
err_count = err_ct;
}
// Notify logger to exit
if (log_worker) {
{
std::lock_guard<std::mutex> lck(log_export_queue_mutex);
log_export_queue.push(UINT_MAX);
}
log_export_queue_cdn.notify_one();
log_worker->thread.join();
delete log_worker;
log_worker = nullptr;
}
#ifdef FLAMEGPU_ENABLE_MPI
std::string remote_device_names;
if (config.mpi) {
// Ensure all workers have finished before exit
mpi->worldBarrier();
// Check whether MPI runners have reported any final errors
err_count += mpi->receiveErrors(err_detail);
if (config.telemetry) {
// All ranks should notify rank 0 of their GPU devices
remote_device_names = mpi->assembleGPUsString();
}
}
#endif
// Record and store the elapsed time
ensemble_timer.stop();
ensemble_elapsed_time = ensemble_timer.getElapsedSeconds();
// Ensemble has finished, print summary
if (config.verbosity > Verbosity::Quiet &&
#ifdef FLAMEGPU_ENABLE_MPI
(!config.mpi || mpi->world_rank == 0) &&
#endif
(config.error_level != EnsembleConfig::Fast || err_count == 0)) {
printf("\rCUDAEnsemble completed %u runs successfully!\n", static_cast<unsigned int>(plans.size() - err_count));
if (err_count)
printf("There were a total of %u errors.\n", err_count);
}
if ((config.timing || config.verbosity >= Verbosity::Verbose) &&
#ifdef FLAMEGPU_ENABLE_MPI
(!config.mpi || mpi->world_rank == 0) &&
#endif
(config.error_level != EnsembleConfig::Fast || err_count == 0)) {
printf("Ensemble time elapsed: %fs\n", ensemble_elapsed_time);
}
// Send Telemetry
if (config.telemetry
#ifdef FLAMEGPU_ENABLE_MPI
&& (!config.mpi || mpi->world_rank == 0)
#endif
) {
// Generate some payload items
std::map<std::string, std::string> payload_items;
#ifndef FLAMEGPU_ENABLE_MPI
payload_items["GPUDevices"] = flamegpu::detail::compute_capability::getDeviceNames(config.devices);
#else
payload_items["GPUDevices"] = flamegpu::detail::compute_capability::getDeviceNames(config.devices) + remote_device_names;
#endif
payload_items["SimTime(s)"] = std::to_string(ensemble_elapsed_time);
#if defined(__CUDACC_VER_MAJOR__) && defined(__CUDACC_VER_MINOR__) && defined(__CUDACC_VER_BUILD__)
payload_items["NVCCVersion"] = std::to_string(__CUDACC_VER_MAJOR__) + "." + std::to_string(__CUDACC_VER_MINOR__) + "." + std::to_string(__CUDACC_VER_BUILD__);
#endif
// Add the ensemble size to the ensemble telemetry payload
payload_items["PlansSize"] = std::to_string(plans.size());
payload_items["ConcurrentRuns"] = std::to_string(config.concurrent_runs);
// Add MPI details to the ensemble telemetry payload
payload_items["mpi"] = config.mpi ? "true" : "false";
#ifdef FLAMEGPU_ENABLE_MPI
payload_items["mpi_world_size"] = std::to_string(mpi->world_size);
#endif
// generate telemetry data
std::string telemetry_data = flamegpu::io::Telemetry::generateData("ensemble-run", payload_items, isSWIG);
// send the telemetry packet
bool telemetrySuccess = flamegpu::io::Telemetry::sendData(telemetry_data);
// If verbose, print either a successful send, or a misc warning.
if (config.verbosity >= Verbosity::Verbose) {
if (telemetrySuccess) {
fprintf(stdout, "Telemetry packet sent to '%s' json was: %s\n", flamegpu::io::Telemetry::TELEMETRY_ENDPOINT, telemetry_data.c_str());
} else {
fprintf(stderr, "Warning: Usage statistics for CUDAEnsemble failed to send with json: %s\n", telemetry_data.c_str());
}
}
} else {
// Encourage users who have opted out to opt back in, unless suppressed.
if ((config.verbosity > Verbosity::Quiet)
#ifdef FLAMEGPU_ENABLE_MPI
&& (!config.mpi || mpi->world_rank == 0)
#endif
) {
flamegpu::io::Telemetry::encourageUsage();
}
}
#ifdef FLAMEGPU_ENABLE_MPI
if (config.mpi && mpi->world_rank != 0) {
// All errors are reported via rank 0
err_count = 0;
}
#endif
if (config.error_level == EnsembleConfig::Fast && err_count) {
if (config.mpi) {
#ifdef FLAMEGPU_ENABLE_MPI
for (const auto &e : err_detail) {
THROW exception::EnsembleError("Run %u failed on rank %d, device %d, thread %u with exception: \n%s\n",
e.second.run_id, e.first, e.second.device_id, e.second.runner_id, e.second.exception_string);
}
#endif
}
THROW exception::EnsembleError("Run %u failed on device %d, thread %u with exception: \n%s\n",
err_detail_local[0].run_id, err_detail_local[0].device_id, err_detail_local[0].runner_id, err_detail_local[0].exception_string);
} else if (config.error_level == EnsembleConfig::Slow && err_count) {
THROW exception::EnsembleError("%u/%u runs failed!\n.", err_count, static_cast<unsigned int>(plans.size()));
}
#ifdef _MSC_VER
if (config.block_standby) {
// Disable prevention of standby
SetThreadExecutionState(ES_CONTINUOUS);
}
#endif
return err_count;
}
void CUDAEnsemble::initialise(int argc, const char** argv) {
if (!checkArgs(argc, argv)) {
exit(EXIT_FAILURE);
}
// If verbose, output the flamegpu version and seed.
if (config.verbosity == Verbosity::Verbose) {
fprintf(stdout, "FLAME GPU %s\n", flamegpu::VERSION_FULL);
fprintf(stdout, "Ensemble configuration:\n");
fprintf(stdout, "\tConcurrent runs: %u\n", config.concurrent_runs);
}
}
int CUDAEnsemble::checkArgs(int argc, const char** argv) {
// Parse optional args
int i = 1;
for (; i < argc; i++) {
// Get arg as lowercase
std::string arg(argv[i]);
std::transform(arg.begin(), arg.end(), arg.begin(), [](unsigned char c) { return std::use_facet< std::ctype<char>>(std::locale()).tolower(c); });
// -h/--help. Print the help output and exit.
if (arg.compare("--help") == 0 || arg.compare("-h") == 0) {
printHelp(argv[0]);
return false;
}
// --concurrent <runs>, Number of concurrent simulations to run per device
if (arg.compare("--concurrent") == 0 || arg.compare("-c") == 0) {
if (i + 1 >= argc) {
fprintf(stderr, "%s requires a trailing argument\n", arg.c_str());
return false;
}
config.concurrent_runs = static_cast<unsigned int>(strtoul(argv[++i], nullptr, 0));
continue;
}
// --devices <string>, comma separated list of uints
if (arg.compare("--devices") == 0 || arg.compare("-d") == 0) {
if (i + 1 >= argc) {
fprintf(stderr, "%s requires a trailing argument\n", arg.c_str());
return false;
}
// Split and parse string
std::string device_string = argv[++i];
device_string += ","; // Append comma, to catch final item
int max_id = 0; // Catch max device so we can validate it exists
size_t pos;
while ((pos = device_string.find(",")) != std::string::npos) {
const unsigned int id = static_cast<unsigned int>(strtoul(device_string.substr(0, pos).c_str(), nullptr, 0));
if (id == 0 && (device_string.length() < 2 || (device_string[0] != '0' || device_string[1] != ','))) {
fprintf(stderr, "'%s' is not a valid device index.\n", device_string.substr(0, pos).c_str());
printHelp(argv[0]);
return false;
}
max_id = static_cast<int>(id) > max_id ? id : max_id;
config.devices.emplace(id);
device_string.erase(0, pos + 1);
}
int ct = -1;
gpuErrchk(cudaGetDeviceCount(&ct));
if (max_id >= ct) {
fprintf(stderr, "Device id %u exceeds available CUDA devices %d\n", max_id, ct);
printHelp(argv[0]);
return false;
}
continue;
}
// -o/--out <directory> <filetype>, Quiet FLAME GPU output.
if (arg.compare("--out") == 0 || arg.compare("-o") == 0) {
if (i + 2 >= argc) {
fprintf(stderr, "%s requires two trailing arguments\n", arg.c_str());
return false;
}
// Validate output directory is valid (and recursively create it if necessary)
try {
std::filesystem::path out_directory = argv[++i];
std::filesystem::create_directories(out_directory);
config.out_directory = out_directory.generic_string();
} catch (const std::exception &e) {
// Catch any exceptions, probably std::filesystem::filesystem_error, but other implementation defined errors also possible
fprintf(stderr, "Unable to use '%s' as output directory:\n%s\n", argv[i], e.what());
return false;
}
// Validate output format is available in io module
config.out_format = io::StateWriterFactory::detectSupportedFileExt(argv[++i]);
if (config.out_format.empty()) {
fprintf(stderr, "'%s' is not a supported output file type.\n", argv[i]);
return false;
}
continue;
}
// -q/--quiet, Don't report progress to console.
if (arg.compare("--quiet") == 0 || arg.compare("-q") == 0) {
config.verbosity = Verbosity::Quiet;
continue;
}
// -v/--verbose, Report all progress to console.
if (arg.compare("--verbose") == 0 || arg.compare("-v") == 0) {
config.verbosity = Verbosity::Verbose;
continue;
}
// -t/--timing, Output timing information to stdout
if (arg.compare("--timing") == 0 || arg.compare("-t") == 0) {
config.timing = true;
continue;
}
// -u/--silence-unknown-args, Silence warning for unknown arguments
if (arg.compare("--silence-unknown-args") == 0 || arg.compare("-u") == 0) {
config.silence_unknown_args = true;
continue;
}
// -e/--error, Specify the error level
if (arg.compare("--error") == 0 || arg.compare("-e") == 0) {
if (i + 1 >= argc) {
fprintf(stderr, "%s requires a trailing argument\n", arg.c_str());
return false;
}
std::string error_level_string = argv[++i];
// Shift the trailing arg to lower
std::transform(error_level_string.begin(), error_level_string.end(), error_level_string.begin(), [](unsigned char c) { return std::use_facet< std::ctype<char>>(std::locale()).tolower(c); });
if (error_level_string.compare("off") == 0 || error_level_string.compare(std::to_string(EnsembleConfig::Off)) == 0) {
config.error_level = EnsembleConfig::Off;
} else if (error_level_string.compare("slow") == 0 || error_level_string.compare(std::to_string(EnsembleConfig::Slow)) == 0) {
config.error_level = EnsembleConfig::Slow;
} else if (error_level_string.compare("fast") == 0 || error_level_string.compare(std::to_string(EnsembleConfig::Fast)) == 0) {
config.error_level = EnsembleConfig::Fast;
} else {
fprintf(stderr, "%s is not an appropriate argument for %s\n", error_level_string.c_str(), arg.c_str());
return false;
}
continue;
}
// --truncate, Truncate output files
if (arg.compare("--truncate") == 0) {
config.truncate_log_files = true;
continue;
}
// --standby Disable the blocking of standby
if (arg.compare("--standby") == 0) {
#ifdef _MSC_VER
config.block_standby = false;
#endif
continue;
}
// Warning if not in QUIET verbosity or if silence-unknown-args is set
if (!(config.verbosity == flamegpu::Verbosity::Quiet || config.silence_unknown_args))
fprintf(stderr, "Warning: Unknown argument '%s' passed to Ensemble will be ignored\n", arg.c_str());
}
return true;
}
void CUDAEnsemble::printHelp(const char *executable) {
printf("FLAME GPU %s\n", flamegpu::VERSION_FULL);
printf("Usage: %s [optional arguments]\n", executable);
printf("Optional Arguments:\n");
const char *line_fmt = "%-18s %s\n";
printf(line_fmt, "-h, --help", "show this help message and exit");
printf(line_fmt, "-d, --devices <device ids>", "Comma separated list of device ids to be used");
printf(line_fmt, "", "By default, all available devices will be used.");
printf(line_fmt, "-c, --concurrent <runs>", "Number of concurrent simulations to run per device");
printf(line_fmt, "", "By default, 4 will be used.");
printf(line_fmt, "-o, --out <directory> <filetype>", "Directory and filetype for ensemble outputs");
printf(line_fmt, "-q, --quiet", "Do not print progress information to console");
printf(line_fmt, "-v, --verbose", "Print config, progress and timing (-t) information to console");
printf(line_fmt, "-t, --timing", "Output timing information to stdout");
printf(line_fmt, "-e, --error <error level>", "The error level 0, 1, 2, off, slow or fast");
printf(line_fmt, "", "By default, \"slow\" will be used.");
printf(line_fmt, "-u, --silence-unknown-args", "Silence warnings for unknown arguments passed after this flag.");
#ifdef _MSC_VER
printf(line_fmt, " --standby", "Allow the machine to enter standby during execution");
#endif
}
void CUDAEnsemble::setStepLog(const StepLoggingConfig &stepConfig) {
// Validate ModelDescription matches
if (*stepConfig.model != *model) {
THROW exception::InvalidArgument("Model descriptions attached to LoggingConfig and CUDAEnsemble do not match, in CUDAEnsemble::setStepLog()\n");
}
// Set internal config
step_log_config = std::make_shared<StepLoggingConfig>(stepConfig);
}
void CUDAEnsemble::setExitLog(const LoggingConfig &exitConfig) {
// Validate ModelDescription matches
if (*exitConfig.model != *model) {
THROW exception::InvalidArgument("Model descriptions attached to LoggingConfig and CUDAEnsemble do not match, in CUDAEnsemble::setExitLog()\n");
}
// Set internal config
exit_log_config = std::make_shared<LoggingConfig>(exitConfig);
}
const std::map<unsigned int, RunLog> &CUDAEnsemble::getLogs() {
return run_logs;
}
} // namespace flamegpu