Commit 2eda03c

Write container logs to separate text files as runs complete (#69)
Why these changes are being introduced:

* Improve observability of 'run_ab_transforms' while running by creating
  individual log files per container as a run is completed.

How this addresses that need:

* Write logs to a persistent folder in the run directory
* Add 'write_log_file' sub-function to 'run_ab_transforms'
* Deprecate 'aggregate_logs' sub-function
* Simplify logs in 'run_ab_transforms' to improve readability
* Remove list of containers from return value of 'collect_container_results'
* Update unit tests

Side effects of this change:

* This introduces a new 'logs' folder in the run directory, which is now
  created as part of the 'init_run' core function.

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-382
1 parent d150081 commit 2eda03c
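At a glance, the run directory this commit produces is laid out roughly as sketched below; the job name, timestamp, source, and container ids are illustrative placeholders, not values from the repository:

```
output/example-job/runs/2024-10-01_12-00-00/
├── logs/                                        <- new; created by init_run
│   ├── alma-2024-01-01-daily-abc123d-logs.txt   <- one log file per container
│   └── alma-2024-01-01-daily-def456e-logs.txt
└── transformed/
    ├── a/
    └── b/
```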

File tree: 6 files changed, +195 -158 lines changed

README.md (6 additions, 1 deletion)

````diff
@@ -188,10 +188,15 @@ Usage: -c run-diff [OPTIONS]
 
 Options:
   -d, --job-directory TEXT  Job directory to create.  [required]
-  -i, --input-files TEXT    Input files to transform. This may be a comma
+  -i, --input-files TEXT    Input files to transform. This may be a comma
                             separated list of input files, or a local CSV file
                             that provides a list of files.  [required]
   -m, --message TEXT        Message to describe Run.
+  --download-files          Pass to download input files (or use previously
+                            downloaded input files) from AWS S3. The
+                            downloaded files are stored in a local MinIO S3
+                            server and made available for Transmogrifier to
+                            use.
   -h, --help                Show this message and exit.
 ```
````
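For orientation, a hypothetical invocation of the command documented above might look like the following; the job directory and input filename are illustrative only, not taken from the repository:

```
-c run-diff \
  -d output/example-job \
  -i alma-2024-01-01-daily-extracted-records-to-index.xml \
  --download-files
```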

abdiff/cli.py (4 additions, 3 deletions)

```diff
@@ -141,7 +141,7 @@ def init_job(
     type=str,
     required=True,
     help=(
-        "Input files to transform. This may be a comma separated list of input files, "
+        "Input files to transform. This may be a comma separated list of input files, "
        "or a local CSV file that provides a list of files."
     ),
 )
@@ -157,8 +157,9 @@ def init_job(
     "--download-files",
     is_flag=True,
     help=(
-        "Pass to download input files from AWS S3 to a local Minio S3 server "
-        "for Transmogrifier to use."
+        "Pass to download input files (or use previously downloaded input files) "
+        "from AWS S3. The downloaded files are stored in a local MinIO S3 server "
+        "and made available for Transmogrifier to use."
     ),
 )
 def run_diff(
```

abdiff/core/init_run.py (3 additions, 0 deletions)

```diff
@@ -20,8 +20,11 @@ def init_run(
     """Function to initialize a new Run as part of a parent Job."""
     run_timestamp = datetime.datetime.now(tz=datetime.UTC).strftime("%Y-%m-%d_%H-%M-%S")
     run_directory = str(Path(job_directory) / "runs" / run_timestamp)
+    logs_directory = str(Path(run_directory) / "logs")
     os.makedirs(run_directory)
+    os.makedirs(logs_directory)
     logger.info(f"Run directory created: {run_directory}")
+    logger.info(f"Logs directory created: {logs_directory}")
 
     # clone job data and update with run information
     run_data = read_job_json(job_directory)
```
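The scaffolding added above can be sketched in isolation; a minimal, self-contained approximation of the diff (with a hypothetical job directory, omitting the surrounding init_run logic) is:

```python
# Minimal sketch of the new directory scaffolding: the timestamped run
# directory now receives a "logs" subfolder at creation time.
# "output/example-job" is a hypothetical job directory, not a repository value.
import datetime
import os
from pathlib import Path

job_directory = "output/example-job"

run_timestamp = datetime.datetime.now(tz=datetime.UTC).strftime("%Y-%m-%d_%H-%M-%S")
run_directory = str(Path(job_directory) / "runs" / run_timestamp)
logs_directory = str(Path(run_directory) / "logs")

os.makedirs(run_directory)  # raises FileExistsError if the run already exists
os.makedirs(logs_directory)

print(logs_directory)  # e.g. output/example-job/runs/2024-10-01_12-00-00/logs
```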

abdiff/core/run_ab_transforms.py (67 additions, 48 deletions)

```diff
@@ -40,18 +40,11 @@ def run_ab_transforms(
 ) -> tuple[list[str], ...]:
     """Run Docker containers with versioned images of Transmogrifier.
 
-    The following steps are performed in sequential order:
-        1. Directories are created to capture the transformed files.
-        2. For all input files, an A and B version of Transmogrifier is run.
-        3. Wait for all containers to complete.
-        4. Aggregate logs from all containers.
-        5. Validate output.
-        6. Update run.json with lists describing input and transformed files.
-        7. Return a tuple containing two lists representing all A and B transformed files.
-
     Parallelization is handled by invoking the Docker containers via threads, limited by
     the ThreadPoolExecutor.max_workers argument. Each thread invokes a detached Docker
-    container and manages its lifecycle until completion.
+    container and manages its lifecycle until completion. As each container exits, logs
+    are written to a text file in the run directory; log filenames follow the format:
+    "{source}-{run-date}-{run-type}-{container-short-id}-logs.txt".
 
     Args:
         run_directory (str): Run directory.
@@ -88,8 +81,10 @@ def run_ab_transforms(
     # initialize environment
     if not docker_client:
         docker_client = docker.from_env()
+
     transformed_directory_a, transformed_directory_b = create_subdirectories(
-        base_directory=run_directory, subdirectories=["transformed/a", "transformed/b"]
+        base_directory=run_directory,
+        subdirectories=["transformed/a", "transformed/b"],
     )
     logger.info(
         "Transformed directories created: "
@@ -102,19 +97,17 @@ def run_ab_transforms(
 
     # run containers and collect results
     futures = run_all_docker_containers(
-        docker_client, input_files, run_configs, use_local_s3=use_local_s3
+        docker_client, input_files, run_configs, run_directory, use_local_s3=use_local_s3
     )
-    containers, exceptions = collect_container_results(futures)
+    exceptions = collect_container_results(futures)
     logger.info(
-        f"Successful containers: {len(containers)}, failed containers: {len(exceptions)}"
+        f"Successful containers: {len(futures)}, failed containers: {len(exceptions)}"
     )
 
     # process results
-    log_file = aggregate_logs(run_directory, containers)
-    logger.info(f"Log file created: {log_file}")
     if not CONFIG.allow_failed_transmogrifier_containers and exceptions:
         raise RuntimeError(  # noqa: TRY003
-            f"{len(exceptions)} / {len(containers)} containers failed "
+            f"{len(exceptions)} / {len(futures)} containers failed "
             "to complete successfully."
         )
     ab_transformed_file_lists = get_transformed_files(run_directory)
@@ -137,6 +130,7 @@ def run_all_docker_containers(
     docker_client: docker.client.DockerClient,
     input_files: list[str],
     run_configs: list[tuple],
+    run_directory: str,
     *,
     use_local_s3: bool = False,
 ) -> list[Future]:
@@ -157,6 +151,7 @@
         for docker_image, transformed_directory in run_configs:
             args = (
                 docker_image,
+                run_directory,
                 transformed_directory,
                 str(filename_details["source"]),
                 input_file,
@@ -175,6 +170,7 @@
 
 def run_docker_container(
     docker_image: str,
+    run_directory: str,
     transformed_directory: str,
     source: str,
     input_file: str,
@@ -218,41 +214,52 @@ def run_docker_container(
         },
         volumes=[f"{os.path.abspath(transformed_directory)}:/tmp"],
     )
-    logger.info(
-        f"Container '{container.id}' (Docker image: {docker_image}) "
-        f"RUNNING transform for '{source}' input_file: {input_file}."
-    )
+    logger.info(f"Transmogrifier container ({container.short_id}) STARTED: {input_file}")
 
     exception = None
     try:
         start_time = perf_counter()
         while True:
             time.sleep(0.5)
             container.reload()
+            elapsed_time = time.perf_counter() - start_time
             if container.status == "exited":
-                logger.info(f"Container {container.id} exited.")
+                logger.info(
+                    f"Transmogrifier container ({container.short_id}) EXITED, "
+                    f"elapsed {timedelta(seconds=elapsed_time)}: {input_file}"
+                )
+                write_log_file(run_directory, input_file, container)
                 break
 
             if time.perf_counter() - start_time > timeout:
                 logger.error(
-                    f"Container {container.id} timed out after {timeout} seconds"
+                    f"Transmogrifier container ({container.short_id}) TIMED OUT, "
+                    f"elapsed {timedelta(seconds=elapsed_time)}: {input_file}"
                 )
                 container.stop()
                 exception = DockerContainerTimeoutError(
                     container_id=container.id, timeout=timeout
                 )
+                write_log_file(
+                    run_directory,
+                    input_file,
+                    container,
+                    extra_messages=[
+                        f"Transmogrifier container ({container.short_id}) TIMED OUT"
+                    ],
+                )
                 break
 
     except Exception as e:
         exception = e  # type: ignore[assignment]
-        logger.exception("Unhandled exception while waiting for container to complete.")
+        logger.exception(
+            f"Transmogrifier container ({container.short_id}) UNHANDLED EXCEPTION: {input_file}"  # noqa: E501
+        )
 
     return container, exception
 
 
-def collect_container_results(
-    futures: list[Future],
-) -> tuple[list[Container], list[Exception]]:
+def collect_container_results(futures: list[Future]) -> list[Exception]:
     """Collect results of container executions.
 
     Each future will contain a tuple of (Container, Exception) where the exception may
@@ -261,39 +268,51 @@ def collect_container_results(
 
     Returns a tuple of (Containers (success), Exceptions (failure)) from all executions.
     """
-    containers = []
     exceptions = []
     for future in futures:
         container, exception = future.result()
-        containers.append(container)
 
         if exception:
             exceptions.append(exception)
         if container.attrs["State"]["ExitCode"] != 0:
             exceptions.append(DockerContainerRuntimeError(container.id))
 
     logger.info(
-        f"Container results collected: {len(containers) - len(exceptions)} successes, "
+        f"Container results collected: {len(futures) - len(exceptions)} successes, "
         f"{len(exceptions)} failures"
     )
-    return containers, exceptions
-
-
-def aggregate_logs(run_directory: str, containers: list[Container]) -> str:
-    """Retrieve logs for containers in a list, aggregating to a single log file."""
-    log_file = str(Path(run_directory) / "transformed/logs.txt")
-    with open(log_file, "w") as file:
-        for container in containers:
-            file.write(f"container: {container.id}\n")
-            descriptors = (
-                f"docker_image: {container.labels['docker_image']} | "
-                f"source: {container.labels['source']} | "
-                f"input_file: {container.labels['input_file']}\n"
-            )
-            file.write(descriptors)
-            file.write(container.logs().decode())
-            file.write("\n\n")
-    return log_file
+    return exceptions
+
+
+def write_log_file(
+    run_directory: str,
+    input_file: str,
+    container: Container,
+    extra_messages: list[str] | None = None,
+) -> None:
+    """Write logs for a given container to a text file."""
+    filename_details = parse_timdex_filename(input_file)
+    log_filename = "{source}-{run_date}-{run_type}-{container_id}-logs.txt".format(
+        source=filename_details["source"],
+        run_date=filename_details["run-date"],
+        run_type=filename_details["run-type"],
+        container_id=container.short_id,
+    )
+    log_filepath = str(Path(run_directory) / "logs" / log_filename)
+    header = (
+        f"docker_image: {container.labels['docker_image']} | "
+        f"source: {container.labels['source']} | "
+        f"input_file: {container.labels['input_file']}"
+    )
+    container_desc = f"container: {container.id}"
+    with open(log_filepath, "w") as file:
+        file.write(header + "\n")
+        file.write(container_desc + "\n")
+        for log in container.logs(stream=True):
+            file.write(log.decode())
+
+        if extra_messages:
+            file.write("\n".join(extra_messages))
 
 
 def get_transformed_files(run_directory: str) -> tuple[list[str], ...]:
```
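The per-container log filename scheme introduced by write_log_file can be illustrated with a self-contained sketch. The keys "source", "run-date", and "run-type" mirror what the diff reads from parse_timdex_filename; the values and the container short id below are hypothetical:

```python
# Sketch of write_log_file's filename construction. In the real code,
# filename_details comes from parse_timdex_filename(input_file); this dict
# literal is a hypothetical stand-in for its output.
filename_details = {
    "source": "alma",
    "run-date": "2024-01-01",
    "run-type": "daily",
}
container_short_id = "abc123d"  # hypothetical Docker container short id

log_filename = "{source}-{run_date}-{run_type}-{container_id}-logs.txt".format(
    source=filename_details["source"],
    run_date=filename_details["run-date"],
    run_type=filename_details["run-type"],
    container_id=container_short_id,
)

print(log_filename)  # alma-2024-01-01-daily-abc123d-logs.txt
```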

tests/conftest.py (9 additions, 2 deletions)

```diff
@@ -32,8 +32,9 @@
 class Container:
     """Stub for docker.models.container.Container object."""
 
-    def __init__(self, id, labels, attrs: dict | None = None):  # noqa: A002
+    def __init__(self, id, short_id, labels, attrs: dict | None = None):  # noqa: A002
         self.id = id
+        self.short_id = short_id
         self.labels = labels
         self.status = "created"
         self.run_duration = 0.5
@@ -56,8 +57,10 @@ def reload(self):
         if time.time() - self.start_time >= self.run_duration:
             self.status = "exited"
 
-    def logs(self):
+    def logs(self, *, stream: bool = True):
         with open("tests/fixtures/transmogrifier-logs.txt", "rb") as file:
+            if stream:
+                yield from file
             return file.read()
 
     def stop(self):
@@ -110,6 +113,7 @@ def create_transformed_files(
         if self.errors:
             return Container(
                 id=container_id,
+                short_id=container_id[:3],
                 labels={
                     "docker_image": image_name,
                     "source": "source",
@@ -119,6 +123,7 @@
             )
         return Container(
             id=container_id,
+            short_id=container_id[:3],
             labels={
                 "docker_image": image_name,
                 "source": "source",
@@ -277,6 +282,7 @@ def mocked_docker_container_and_image(
 def mocked_docker_container_a():
     return Container(
         id="abc123",
+        short_id="abc",
         labels={
             "docker_image": "transmogrifier-example-job-1-abc123:latest",
             "source": "source",
@@ -289,6 +295,7 @@ def mocked_docker_container_b():
 def mocked_docker_container_b():
     return Container(
         id="def456",
+        short_id="def",
         labels={
             "docker_image": "transmogrifier-example-job-1-def456:latest",
             "source": "source",
```
