Skip to content

Commit e939941

Browse files
committed
Streaming task progress.
Signed-off-by: Govind Kamat <govkamat@amazon.com>
1 parent 317cdc3 commit e939941

File tree

1 file changed

+15
-17
lines changed

1 file changed

+15
-17
lines changed

osbenchmark/worker_coordinator/worker_coordinator.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1297,27 +1297,25 @@ def update_profile_samples(self, profile_samples):
12971297

12981298
def update_progress_message(self, task_finished=False):
12991299
if not self.quiet and self.current_step >= 0:
1300-
is_bulk = False
13011300
tasks = ",".join([t.name for t in self.tasks_per_join_point[self.current_step]])
1302-
if len(self.tasks_per_join_point[self.current_step]) == 1:
1303-
task = set(self.tasks_per_join_point[self.current_step]).pop()
1304-
is_bulk = task.operation.type == 'bulk'
13051301

1306-
if task_finished and not is_bulk:
1307-
total_progress = 1.0
1308-
else:
1309-
# we only count clients which actually contribute to progress. If clients are executing tasks eternally in a parallel
1310-
# structure, we should not count them. The reason is that progress depends entirely on the client(s) that execute the
1311-
# task that is completing the parallel structure.
1312-
progress_per_client = [s.task_progress
1313-
for s in self.most_recent_sample_per_client.values() if s.task_progress is not None]
1314-
1315-
num_clients = max(len(progress_per_client), 1)
1316-
progress_per_client = [p[0] for p in progress_per_client]
1317-
total_progress = sum(progress_per_client) / num_clients
1318-
if is_bulk:
1302+
# we only count clients which actually contribute to progress. If clients are executing tasks eternally in a parallel
1303+
# structure, we should not count them. The reason is that progress depends entirely on the client(s) that execute the
1304+
# task that is completing the parallel structure.
1305+
progress_per_client = [s.task_progress
1306+
for s in self.most_recent_sample_per_client.values() if s.task_progress is not None]
1307+
1308+
num_clients = len(progress_per_client)
1309+
assert num_clients > 0, "Number of clients is 0"
1310+
total_progress = sum([p[0] for p in progress_per_client]) / num_clients
1311+
units = set(progress_per_client)
1312+
assert len(units) == 1, "Encountered mix of disparate units while tracking task progress"
1313+
unit = units.pop()
1314+
if unit != '%':
13191315
self.progress_publisher.print("Running %s" % tasks, "[%4.1f GB]" % total_progress)
13201316
else:
1317+
if task_finished:
1318+
total_progress = 1.0
13211319
self.progress_publisher.print("Running %s" % tasks, "[%3d%% done]" % (round(total_progress * 100)))
13221320
if task_finished:
13231321
self.progress_publisher.finish()

0 commit comments

Comments
 (0)