Skip to content

Commit c273f29

Browse files
authored
fix: Add log retrieval after job finalization (#61)
This PR adds log retrieval for finished Google Batch jobs. Log tailing is not available via the Cloud Logging API, so we can only retrieve logs all at once. A simple solution is to retrieve them only after the job has finished. There was a discussion in #14 about how the API only allows retrieving a single line per request and 60 requests per minute. However, I have discovered that we can retrieve 1,000 lines (stdout/stderr) per request, so 60,000 lines per minute. In this implementation, if a request fails, it retries after a 60-second delay. If it fails again, the logfile is left as-is - remains incomplete. This implementation, while not perfect, improves the current situation, where an empty `.snakemake/googlebatch_logs/` directory is created. Downloading these logs is highly beneficial, as browsing them in the Google Cloud Console's logging utility is highly painful. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Automated log collection for completed jobs is now available. This update ensures that logs from both successful and failed jobs are reliably captured with built-in handling of temporary service limitations to improve overall stability. - **Bug Fixes** - Improved error handling for log retrieval to manage service request limitations effectively. - **Tests** - Enhanced mock functionality in tests to accommodate new attributes and methods, improving test coverage and reliability. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 3a37591 commit c273f29

File tree

3 files changed

+70
-7
lines changed

3 files changed

+70
-7
lines changed

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ google-cloud-storage = "^2.12.0"
2222
snakemake-interface-common = "^1.14.0"
2323
snakemake-interface-executor-plugins = "^9.0.0"
2424
jinja2 = "^3.1.2"
25-
google-cloud-logging = "^3.8.0"
25+
google-cloud-logging = "^3.11.4"
2626

2727
[tool.poetry.group.dev.dependencies]
2828
black = "^24.4.0"
2929
flake8 = "^6.1.0"
3030
coverage = "^7.3.1"
3131
pytest = "^7.4.2"
32-
snakemake = "^8.18.0"
33-
snakemake-storage-plugin-s3 = "^0.2.10"
32+
snakemake = "^8.30.0"
33+
snakemake-storage-plugin-s3 = "^0.3.1"
3434

3535
[tool.coverage.run]
3636
omit = [".*", "*/site-packages/*", "Snakefile"]

snakemake_executor_plugin_googlebatch/executor.py

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import time
23
import uuid
34

45
from typing import List
@@ -11,8 +12,8 @@
1112
import snakemake_executor_plugin_googlebatch.utils as utils
1213
import snakemake_executor_plugin_googlebatch.command as cmdutil
1314

14-
from google.api_core.exceptions import DeadlineExceeded
15-
from google.cloud import batch_v1
15+
from google.api_core.exceptions import DeadlineExceeded, ResourceExhausted
16+
from google.cloud import batch_v1, logging
1617

1718

1819
class GoogleBatchExecutor(RemoteExecutor):
@@ -508,6 +509,9 @@ async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]):
508509
# SUCCEEDED
509510
# FAILED
510511
# DELETION_IN_PROGRESS
512+
if response.status.state.name in ["FAILED", "SUCCEEDED"]:
513+
self.save_finished_job_logs(j)
514+
511515
if response.status.state.name == "FAILED":
512516
msg = f"Google Batch job '{j.external_jobid}' failed. "
513517
self.report_job_error(j, msg=msg, aux_logs=aux_logs)
@@ -519,9 +523,61 @@ async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]):
519523
else:
520524
yield j
521525

526+
def save_finished_job_logs(
527+
self,
528+
job_info: SubmittedJobInfo,
529+
sleeps=60,
530+
page_size=1000,
531+
):
532+
"""
533+
Download logs using Google Cloud Logging API and save
534+
them locally. Since tail logging does not work, this function
535+
is run only at the end of the job.
536+
"""
537+
job_uid = job_info.aux["batch_job"].uid
538+
filter_query = f"labels.job_uid={job_uid}"
539+
logfname = job_info.aux["logfile"]
540+
541+
log_client = logging.Client(project=self.executor_settings.project)
542+
logger = log_client.logger("batch_task_logs")
543+
544+
def attempt_log_save(fname, logger, query, page_size):
545+
with open(fname, "w", encoding="utf-8") as logfile:
546+
for log_entry in logger.list_entries(
547+
filter_=query,
548+
page_size=page_size,
549+
):
550+
logfile.write(str(log_entry.payload) + "\n")
551+
552+
self.logger.info(f"Saving logs for Batch job {job_uid} to {logfname}.")
553+
554+
try:
555+
attempt_log_save(logfname, logger, filter_query, page_size)
556+
except ResourceExhausted:
557+
self.logger.warning(
558+
"Too many requests to Google Logging API.\n"
559+
+ f"Skipping logs for job {job_uid} and sleeping for {sleeps}s."
560+
)
561+
time.sleep(sleeps)
562+
563+
self.logger.warning(
564+
f"Trying to retrieve logs for Batch job {job_uid} once more."
565+
)
566+
try:
567+
attempt_log_save(logfname, logger, filter_query, page_size)
568+
except ResourceExhausted:
569+
self.logger.warning(
570+
"Retry to retrieve logs failed, "
571+
+ f"the log file {logfname} might be incomplete."
572+
)
573+
except Exception as e:
574+
self.logger.warning(
575+
f"Failed to retrieve logs for Batch job {job_uid}: {str(e)}"
576+
)
577+
522578
def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
523579
"""
524-
cancel all active jobs. This method is called when snakemake is interrupted.
580+
Cancel all active jobs. This method is called when snakemake is interrupted.
525581
"""
526582
for job in active_jobs:
527583
jobid = job.external_jobid

tests/tests_mocked_api.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class TestWorkflowsMockedApi(TestWorkflowsBase):
1111
@patch(
1212
"google.cloud.batch_v1.BatchServiceClient.create_job",
1313
new=MagicMock(
14-
return_value=Job(name="foo"),
14+
return_value=Job(name="foo", uid="bar"),
1515
autospec=True,
1616
),
1717
)
@@ -22,6 +22,13 @@ class TestWorkflowsMockedApi(TestWorkflowsBase):
2222
autospec=True,
2323
),
2424
)
25+
@patch(
26+
"google.cloud.logging.Client.logger",
27+
new=MagicMock(
28+
return_value=MagicMock(list_entries=lambda filter_, page_size: []),
29+
autospec=True,
30+
),
31+
)
2532
@patch(
2633
"snakemake.dag.DAG.check_and_touch_output",
2734
new=AsyncMock(autospec=True),

0 commit comments

Comments
 (0)