Skip to content

Commit 67b56c4

Browse files
committed
feat: enhance job info handling and normalize file paths
- Added a new method to normalize job file paths based on the current execution directory. - Updated the `emit` method to handle `job_info` events, ensuring paths for input, output, log, and benchmark are correctly prefixed. - Refactored the effective work directory handling to improve path resolution and context management.
1 parent e8bb640 commit 67b56c4

2 files changed

Lines changed: 65 additions & 38 deletions

File tree

src/snakemake_logger_plugin_flowo/plugin/client/log_handler.py

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ def emit(self, record: LogRecord) -> None:
188188
configfiles, config = self._get_configfiles()
189189
self.context["configfiles"] = configfiles
190190
self._merge_workflow_config_context(config)
191-
self._set_effective_workdir_from_snakemake()
191+
elif event_name == "job_info":
192+
self._normalize_job_file_paths(data)
192193

193194
self._send_to_api(event_name, data)
194195
except Exception as e:
@@ -286,24 +287,41 @@ def _merge_workflow_config_context(self, config: dict) -> None:
286287
if tags:
287288
self.context["flowo_tags"] = tags
288289

289-
def _set_effective_workdir_from_snakemake(self) -> None:
290-
"""Use Snakemake's effective workdir, including ``--directory``."""
290+
def _current_execution_prefix(self) -> Path | None:
291+
"""Relative cwd prefix for paths emitted from Snakemake's effective workdir."""
291292
try:
292-
import snakemake.workflow
293+
workflow_root = Path(str(self.context.get("workdir") or "")).resolve()
294+
current = Path.cwd().resolve()
295+
if current == workflow_root:
296+
return None
297+
return current.relative_to(workflow_root)
298+
except ValueError:
299+
return None
300+
except Exception as e:
301+
logger.debug(f"Failed to derive snakemake execution prefix: {e}")
302+
return None
293303

294-
wf = getattr(snakemake.workflow, "workflow", None)
295-
if not wf:
296-
return
304+
def _normalize_job_file_paths(self, data: dict) -> None:
305+
prefix = self._current_execution_prefix()
306+
if prefix is None or str(prefix) == ".":
307+
return
297308

298-
workdir = (
299-
getattr(wf, "overwrite_workdir", None)
300-
or getattr(wf, "workdir_init", None)
301-
or getattr(wf, "_workdir_init", None)
302-
)
303-
if workdir:
304-
self.context["workdir"] = str(Path(workdir).resolve())
305-
except Exception as e:
306-
logger.debug(f"Failed to access snakemake workflow workdir: {e}")
309+
def normalize_one(raw: object) -> str:
310+
path = Path(str(raw))
311+
if path.is_absolute():
312+
return str(path)
313+
if path.parts[: len(prefix.parts)] == prefix.parts:
314+
return path.as_posix()
315+
return (prefix / path).as_posix()
316+
317+
for key in ("input", "output", "log", "benchmark"):
318+
value = data.get(key)
319+
if value is None:
320+
continue
321+
if isinstance(value, list):
322+
data[key] = [normalize_one(v) for v in value]
323+
else:
324+
data[key] = [normalize_one(value)]
307325

308326
def close(self) -> None:
309327
self.file_handler.close()

tests/snakemake_compat/test_log_handler_compat.py

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77
import importlib.util
88
import inspect
99
import logging
10-
import sys
11-
from pathlib import Path
12-
from types import ModuleType, SimpleNamespace
10+
from types import SimpleNamespace
1311
from unittest.mock import MagicMock, patch
1412

1513
import pytest
@@ -542,19 +540,12 @@ def test_emit_workflow_started_preserves_cli_name_tags_when_config_is_empty():
542540
assert handler.context["flowo_tags"] == ["rna-seq", "demo"]
543541

544542

545-
def test_emit_workflow_started_records_snakemake_effective_workdir(
546-
tmp_path, monkeypatch
547-
):
548-
import snakemake
549-
543+
def test_emit_job_info_prefixes_paths_from_current_execution_dir(tmp_path, monkeypatch):
550544
from snakemake_logger_plugin_flowo.plugin.client.log_handler import FlowoLogHandler
551545

552-
effective_workdir = tmp_path / ".test"
553-
effective_workdir.mkdir()
554-
fake_workflow_module = ModuleType("snakemake.workflow")
555-
fake_workflow_module.workflow = SimpleNamespace(overwrite_workdir=effective_workdir)
556-
monkeypatch.setitem(sys.modules, "snakemake.workflow", fake_workflow_module)
557-
monkeypatch.setattr(snakemake, "workflow", fake_workflow_module, raising=False)
546+
workflow_root = tmp_path / "rna-seq-star-deseq2"
547+
execution_dir = workflow_root / ".test"
548+
execution_dir.mkdir(parents=True)
558549

559550
with (
560551
patch.object(FlowoLogHandler, "_init_file_handler", return_value=MagicMock()),
@@ -564,24 +555,42 @@ def test_emit_workflow_started_records_snakemake_effective_workdir(
564555
):
565556
mock_client_cls.return_value = MagicMock(is_closed=False)
566557
handler = FlowoLogHandler(_make_common_settings())
558+
handler.context["workdir"] = str(workflow_root)
567559
handler.file_handler.emit = MagicMock()
568560
handler._parsers = {
569-
"workflow_started": lambda record: SimpleNamespace(
561+
"job_info": lambda record: SimpleNamespace(
570562
model_dump=lambda mode="json": {
571-
"workflow_id": "wf-1",
572-
"snakefile": "Snakefile",
573-
"rules": [],
563+
"job_id": 1,
564+
"rule_name": "deseq2",
565+
"threads": 1,
566+
"input": ["data/in.tsv"],
567+
"output": [".test/results/out.tsv"],
568+
"log": ["logs/deseq2/init.log"],
569+
"benchmark": None,
574570
}
575571
)
576572
}
577573

578574
with (
579-
patch.object(handler, "_get_configfiles", return_value=([], {})),
580-
patch.object(handler, "_send_to_api"),
575+
monkeypatch.context() as m,
576+
patch.object(handler, "_send_to_api") as send_to_api,
581577
):
582-
handler.emit(SimpleNamespace(event="workflow_started"))
578+
m.chdir(execution_dir)
579+
handler.emit(SimpleNamespace(event="job_info"))
583580

584-
assert handler.context["workdir"] == str(Path(effective_workdir).resolve())
581+
send_to_api.assert_called_once_with(
582+
"job_info",
583+
{
584+
"job_id": 1,
585+
"rule_name": "deseq2",
586+
"threads": 1,
587+
"input": [".test/data/in.tsv"],
588+
"output": [".test/results/out.tsv"],
589+
"log": [".test/logs/deseq2/init.log"],
590+
"benchmark": None,
591+
},
592+
)
593+
assert handler.context["workdir"] == str(workflow_root)
585594

586595

587596
def test_emit_workflow_started_uses_non_empty_config_name_tags():

0 commit comments

Comments
 (0)