Commit 9125347

more realtime log fetching

1 parent dd65882

File tree

metaflow/metaflow_environment.py
metaflow/mflog/__init__.py
metaflow/plugins/pypi/conda_environment.py

3 files changed: +9 -4 lines changed

metaflow/metaflow_environment.py

+3 -1
@@ -6,7 +6,7 @@
 from . import metaflow_version
 from metaflow.exception import MetaflowException
 from metaflow.extension_support import dump_module_info
-from metaflow.mflog import BASH_MFLOG
+from metaflow.mflog import BASH_MFLOG, BASH_FLUSH_LOGS
 from . import R
 
 
@@ -159,6 +159,7 @@ def _get_install_dependencies_cmd(self, datastore_type):
     def get_package_commands(self, code_package_url, datastore_type):
         cmds = [
             BASH_MFLOG,
+            BASH_FLUSH_LOGS,
             "mflog 'Setting up task environment.'",
             self._get_install_dependencies_cmd(datastore_type),
             "mkdir metaflow",
@@ -176,6 +177,7 @@ def get_package_commands(self, code_package_url, datastore_type):
             "fi" % code_package_url,
             "TAR_OPTIONS='--warning=no-timestamp' tar xf job.tar",
             "mflog 'Task is starting.'",
+            "flush_mflogs",
         ]
         return cmds
 

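A note on what this change does at runtime (not part of the diff): BASH_FLUSH_LOGS defines a flush_mflogs shell function at the top of the package commands, and the new "flush_mflogs" entry invokes it right after "Task is starting." so the setup logs are saved immediately instead of waiting for the next periodic upload. A minimal sketch of the resulting prologue, assuming the launcher joins the commands with " && " (that glue is not shown in this commit):

# Illustrative only -- BASH_MFLOG's body is elided and the " && " join is assumed.
cmds = [
    "flush_mflogs(){ python -m metaflow.mflog.save_logs; }",  # BASH_FLUSH_LOGS
    "mflog 'Setting up task environment.'",
    "mflog 'Task is starting.'",
    "flush_mflogs",  # push everything logged so far before user code starts
]
print(" && ".join(cmds))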
metaflow/mflog/__init__.py

+4 -3
@@ -44,6 +44,8 @@
 BASH_SAVE_LOGS_ARGS = ["python", "-m", "metaflow.mflog.save_logs"]
 BASH_SAVE_LOGS = " ".join(BASH_SAVE_LOGS_ARGS)
 
+BASH_FLUSH_LOGS = "flush_mflogs(){ " f"{BASH_SAVE_LOGS}; " "}"
+
 
 # this function returns a bash expression that redirects stdout
 # and stderr of the given bash expression to mflog.tee
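For reference (not part of the diff), evaluating the new constant shows the shell function it injects:

# Reconstructing the constant added above to show the bash it emits.
BASH_SAVE_LOGS_ARGS = ["python", "-m", "metaflow.mflog.save_logs"]
BASH_SAVE_LOGS = " ".join(BASH_SAVE_LOGS_ARGS)
BASH_FLUSH_LOGS = "flush_mflogs(){ " f"{BASH_SAVE_LOGS}; " "}"
print(BASH_FLUSH_LOGS)  # flush_mflogs(){ python -m metaflow.mflog.save_logs; }

Calling flush_mflogs in the task shell therefore runs metaflow.mflog.save_logs, which saves whatever has been logged so far.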
@@ -63,7 +65,7 @@ def bash_capture_logs(bash_expr, var_transform=None):
 # update_delay determines how often logs should be uploaded to S3
 # as a function of the task execution time
 
-MIN_UPDATE_DELAY = 1.0  # the most frequent update interval
+MIN_UPDATE_DELAY = 0.25  # the most frequent update interval
 MAX_UPDATE_DELAY = 30.0  # the least frequent update interval
 
 
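Lowering MIN_UPDATE_DELAY to 0.25s lets fresh tasks upload logs up to four times per second. The update_delay() function itself is not part of this diff; the sketch below only illustrates the kind of backoff the comment describes, growing the interval from MIN_UPDATE_DELAY toward MAX_UPDATE_DELAY as the task ages (the real formula may differ):

# Illustrative only -- the real update_delay() lives elsewhere in metaflow.mflog.
MIN_UPDATE_DELAY = 0.25  # the most frequent update interval
MAX_UPDATE_DELAY = 30.0  # the least frequent update interval

def update_delay(secs_since_start):
    # Poll fast for young tasks, back off for long-running ones (assumed shape).
    return min(MAX_UPDATE_DELAY, max(MIN_UPDATE_DELAY, secs_since_start / 10.0))

print(update_delay(0))    # 0.25 -> frequent updates right after start
print(update_delay(600))  # 30.0 -> infrequent updates for long-running tasks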
@@ -110,7 +112,6 @@ def export_mflog_env_vars(
 
 def tail_logs(prefix, stdout_tail, stderr_tail, echo, has_log_updates):
     def _available_logs(tail, stream, echo, should_persist=False):
-        # print the latest batch of lines
         try:
             for line in tail:
                 if should_persist:
@@ -128,7 +129,7 @@ def _available_logs(tail, stream, echo, should_persist=False):
 
     start_time = time.time()
     next_log_update = start_time
-    log_update_delay = 1
+    log_update_delay = update_delay(0)
     while has_log_updates():
         if time.time() > next_log_update:
             _available_logs(stdout_tail, "stdout", echo)
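With log_update_delay seeded from update_delay(0), the very first poll honors the new 0.25s minimum instead of a hard-coded 1 second. A condensed sketch of the loop this hunk touches, assuming the illustrative update_delay() above (the real loop also tails stderr and flushes remaining logs on exit):

import time

def tail_logs_sketch(stdout_tail, echo, has_log_updates):
    # Simplified from tail_logs(); error handling and stderr tailing omitted.
    start_time = time.time()
    next_log_update = start_time
    log_update_delay = update_delay(0)  # was: log_update_delay = 1
    while has_log_updates():
        now = time.time()
        if now > next_log_update:
            _available_logs(stdout_tail, "stdout", echo)
            # Back off as the task ages (recomputation point assumed).
            log_update_delay = update_delay(now - start_time)
            next_log_update = now + log_update_delay
        time.sleep(0.1)  # small sleep between polls (assumed)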

metaflow/plugins/pypi/conda_environment.py

+2 -0
@@ -421,13 +421,15 @@ def bootstrap_commands(self, step_name, datastore_type):
         if id_:
             return [
                 "echo 'Bootstrapping virtual environment...'",
+                "flush_mflogs",
                 # We have to prevent the tracing module from loading,
                 # as the bootstrapping process uses the internal S3 client which would fail to import tracing
                 # due to the required dependencies being bundled into the conda environment,
                 # which is yet to be initialized at this point.
                 'DISABLE_TRACING=True python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s"'
                 % (self.flow.name, id_, self.datastore_type),
                 "echo 'Environment bootstrapped.'",
+                "flush_mflogs",
                 # To avoid having to install micromamba in the PATH in micromamba.py, we add it to the PATH here.
                 "export PATH=$PATH:$(pwd)/micromamba/bin",
                 "export MF_ARCH=$(case $(uname)/$(uname -m) in Darwin/arm64)echo osx-arm64;;Darwin/*)echo osx-64;;Linux/aarch64)echo linux-aarch64;;*)echo linux-64;;esac)",

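Here the flushes bracket the bootstrap step so that the "Bootstrapping virtual environment..." line becomes visible while the potentially slow environment build is still running, and the completion message shows up as soon as it finishes. For illustration only, the returned list then has this shape (the flow name, id, and datastore values below are hypothetical placeholders for the %s arguments):

# Hypothetical rendering of bootstrap_commands()'s return value after this change.
bootstrap_cmds = [
    "echo 'Bootstrapping virtual environment...'",
    "flush_mflogs",  # upload the message above before the slow bootstrap runs
    'DISABLE_TRACING=True python -m metaflow.plugins.pypi.bootstrap "MyFlow" abc123 "s3"',
    "echo 'Environment bootstrapped.'",
    "flush_mflogs",  # make the completion message visible right away
]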