Skip to content

Commit 317ec57

Browse files
authored
Merge pull request #40 from SciCatProject/system-helper
Refactoring background ingestor.
2 parents 963eb32 + 2e4c889 commit 317ec57

9 files changed

+253
-125
lines changed

README.md

+37-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,45 @@
22

33
# Scicat Filewriter Ingest
44

5-
## About
6-
75
A daemon that creates a raw dataset using the SciCat interface whenever a new file is written by a file-writer.
86

7+
## How to INSTALL
8+
```bash
9+
git clone https://github.com/SciCatProject/scicat-filewriter-ingest.git
10+
cd scicat-filewriter-ingest
11+
pip install -e . # It will allow you to use entry-points of the scripts,
12+
# defined in ``pyproject.toml``, under the ``[project.scripts]`` section.
13+
```
14+
15+
## How to RUN
16+
17+
All scripts parse the system arguments and configuration in the same way.
18+
19+
### Online ingestor (Highest level interface)
20+
You can start the ingestor daemon with certain configurations.
21+
22+
It will continuously process `wrdn` messages and ingest the nexus files.
23+
24+
```bash
25+
scicat_ingestor --verbose -c PATH_TO_CONFIGURATION_FILE.json
26+
```
27+
28+
See [configuration](#configuration) for how to use configuration files.
29+
30+
### Background ingestor (Lower level interface)
31+
You can also run the ingestor file by file.
32+
33+
You need to know the path to the nexus file you want to ingest
34+
and also the path to the ``done_writing_message_file`` as a json file.
35+
36+
```bash
37+
background_ingestor \
38+
--verbose \
39+
-c PATH_TO_CONFIGURATION_FILE.json \
40+
--nexus-file PATH_TO_THE_NEXUS_FILE.nxs \
41+
--done-writing-message-file PATH_TO_THE_MESSAGE_FILE.json
42+
```
43+
944
## Configuration
1045

1146
You can use a json file to configure options.

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ dynamic = ["version"]
4545

4646
[project.scripts]
4747
scicat_ingestor = "scicat_ingestor:main"
48+
background_ingestor = "background_ingestor:main"
4849

4950
[project.entry-points."scicat_ingestor.metadata_extractor"]
5051
max = "numpy:max"

src/background-ingestor.py src/background_ingestor.py

+7-34
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,22 @@
11
# SPDX-License-Identifier: BSD-3-Clause
22
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
3+
# import scippnexus as snx
34
import json
4-
import logging
55
import pathlib
6-
from collections.abc import Generator
7-
from contextlib import contextmanager
86

97
from scicat_configuration import (
108
build_background_ingestor_arg_parser,
11-
build_scicat_config,
9+
build_scicat_background_ingester_config,
1210
)
1311
from scicat_logging import build_logger
14-
15-
# import scippnexus as snx
16-
17-
18-
def quit(logger: logging.Logger, unexpected: bool = True) -> None:
19-
"""Log the message and exit the program."""
20-
import sys
21-
22-
logger.info("Exiting ingestor")
23-
sys.exit(1 if unexpected else 0)
24-
25-
26-
@contextmanager
27-
def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]:
28-
"""Exit the program if an exception is raised."""
29-
try:
30-
yield
31-
except KeyboardInterrupt:
32-
logger.info("Received keyboard interrupt.")
33-
quit(logger, unexpected=False)
34-
except Exception as e:
35-
logger.error("An exception occurred: %s", e)
36-
quit(logger, unexpected=True)
37-
else:
38-
logger.error("Loop finished unexpectedly.")
39-
quit(logger, unexpected=True)
12+
from system_helpers import exit_at_exceptions
4013

4114

4215
def main() -> None:
4316
"""Main entry point of the app."""
4417
arg_parser = build_background_ingestor_arg_parser()
4518
arg_namespace = arg_parser.parse_args()
46-
config = build_scicat_config(arg_namespace)
19+
config = build_scicat_background_ingester_config(arg_namespace)
4720
logger = build_logger(config)
4821

4922
# Log the configuration as dictionary so that it is easier to read from the logs
@@ -52,13 +25,13 @@ def main() -> None:
5225
)
5326
logger.info(config.to_dict())
5427

55-
with exit_at_exceptions(logger):
56-
nexus_file = arg_namespace.nexus_file
28+
with exit_at_exceptions(logger, daemon=False):
29+
nexus_file = pathlib.Path(config.single_run_options.nexus_file)
5730
logger.info("Nexus file to be ingested : ")
5831
logger.info(nexus_file)
5932

6033
done_writing_message_file = pathlib.Path(
61-
arg_namespace.arg_namespace.done_writing_message_file
34+
config.single_run_options.done_writing_message_file
6235
)
6336
logger.info("Done writing message file linked to nexus file : ")
6437
logger.info(done_writing_message_file)

src/scicat_configuration.py

+115-36
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,58 @@
22
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
33
import argparse
44
from collections.abc import Mapping
5-
from dataclasses import dataclass
5+
from dataclasses import asdict, dataclass
6+
from types import MappingProxyType
7+
from typing import Any
8+
9+
10+
def _load_config(config_file: Any) -> dict:
11+
"""Load configuration from the configuration file path."""
12+
import json
13+
import pathlib
14+
15+
if (
16+
isinstance(config_file, str | pathlib.Path)
17+
and (config_file_path := pathlib.Path(config_file)).is_file()
18+
):
19+
return json.loads(config_file_path.read_text())
20+
return {}
21+
22+
23+
def _merge_run_options(config_dict: dict, input_args_dict: dict) -> dict:
24+
"""Merge configuration from the configuration file and input arguments."""
25+
import copy
26+
27+
# Overwrite deep-copied options with command line arguments
28+
run_option_dict: dict = copy.deepcopy(config_dict.setdefault("options", {}))
29+
for arg_name, arg_value in input_args_dict.items():
30+
if arg_value is not None:
31+
run_option_dict[arg_name] = arg_value
32+
33+
return run_option_dict
34+
35+
36+
def _freeze_dict_items(d: dict) -> MappingProxyType:
37+
"""Freeze the dictionary to make it read-only."""
38+
return MappingProxyType(
39+
{
40+
key: MappingProxyType(value) if isinstance(value, dict) else value
41+
for key, value in d.items()
42+
}
43+
)
44+
45+
46+
def _recursive_deepcopy(obj: Any) -> dict:
47+
"""Recursively deep copy a dictionary."""
48+
if not isinstance(obj, dict | MappingProxyType):
49+
return obj
50+
51+
copied = dict(obj)
52+
for key, value in copied.items():
53+
if isinstance(value, Mapping | MappingProxyType):
54+
copied[key] = _recursive_deepcopy(value)
55+
56+
return copied
657

758

859
def build_main_arg_parser() -> argparse.ArgumentParser:
@@ -96,7 +147,6 @@ def build_main_arg_parser() -> argparse.ArgumentParser:
96147

97148
def build_background_ingestor_arg_parser() -> argparse.ArgumentParser:
98149
parser = build_main_arg_parser()
99-
100150
group = parser.add_argument_group('Scicat Background Ingestor Options')
101151

102152
group.add_argument(
@@ -180,7 +230,7 @@ class kafkaOptions:
180230

181231

182232
@dataclass
183-
class ScicatConfig:
233+
class IngesterConfig:
184234
original_dict: Mapping
185235
"""Original configuration dictionary in the json file."""
186236
run_options: RunOptions
@@ -192,50 +242,79 @@ class ScicatConfig:
192242

193243
def to_dict(self) -> dict:
194244
"""Return the configuration as a dictionary."""
195-
from dataclasses import asdict
196-
197-
# Deep copy the original dictionary recursively
198-
original_dict = dict(self.original_dict)
199-
for key, value in original_dict.items():
200-
if isinstance(value, Mapping):
201-
original_dict[key] = dict(value)
202245

203-
copied = ScicatConfig(
204-
original_dict, self.run_options, self.kafka_options, self.graylog_options
246+
return asdict(
247+
IngesterConfig(
248+
_recursive_deepcopy(
249+
self.original_dict
250+
), # asdict does not support MappingProxyType
251+
self.run_options,
252+
self.kafka_options,
253+
self.graylog_options,
254+
)
205255
)
206-
return asdict(copied)
207256

208257

209-
def build_scicat_config(input_args: argparse.Namespace) -> ScicatConfig:
258+
def build_scicat_ingester_config(input_args: argparse.Namespace) -> IngesterConfig:
210259
"""Merge configuration from the configuration file and input arguments."""
211-
import copy
212-
import json
213-
import pathlib
214-
from types import MappingProxyType
260+
config_dict = _load_config(input_args.config_file)
261+
run_option_dict = _merge_run_options(config_dict, vars(input_args))
215262

216-
# Read configuration file
217-
if (
218-
input_args.config_file
219-
and (config_file_path := pathlib.Path(input_args.config_file)).is_file()
220-
):
221-
config_dict = json.loads(config_file_path.read_text())
222-
else:
223-
config_dict = {}
263+
# Wrap configuration in a dataclass
264+
return IngesterConfig(
265+
original_dict=_freeze_dict_items(config_dict),
266+
run_options=RunOptions(**run_option_dict),
267+
kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})),
268+
graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})),
269+
)
270+
271+
272+
@dataclass
273+
class SingleRunOptions:
274+
nexus_file: str
275+
"""Full path of the input nexus file to be ingested."""
276+
done_writing_message_file: str
277+
"""Full path of the done writing message file that matches the ``nexus_file``."""
224278

225-
# Overwrite deep-copied options with command line arguments
226-
run_option_dict: dict = copy.deepcopy(config_dict.setdefault("options", {}))
227-
for arg_name, arg_value in vars(input_args).items():
228-
if arg_value is not None:
229-
run_option_dict[arg_name] = arg_value
230279

231-
# Protect original configuration by making it read-only
232-
for key, value in config_dict.items():
233-
config_dict[key] = MappingProxyType(value)
280+
@dataclass
281+
class BackgroundIngestorConfig(IngesterConfig):
282+
single_run_options: SingleRunOptions
283+
"""Single run configuration options for background ingestor."""
284+
285+
def to_dict(self) -> dict:
286+
"""Return the configuration as a dictionary."""
287+
288+
return asdict(
289+
BackgroundIngestorConfig(
290+
_recursive_deepcopy(
291+
self.original_dict
292+
), # asdict does not support MappingProxyType
293+
self.run_options,
294+
self.kafka_options,
295+
self.graylog_options,
296+
self.single_run_options,
297+
)
298+
)
299+
300+
301+
def build_scicat_background_ingester_config(
302+
input_args: argparse.Namespace,
303+
) -> BackgroundIngestorConfig:
304+
"""Merge configuration from the configuration file and input arguments."""
305+
config_dict = _load_config(input_args.config_file)
306+
input_args_dict = vars(input_args)
307+
single_run_option_dict = {
308+
"nexus_file": input_args_dict.pop("nexus_file"),
309+
"done_writing_message_file": input_args_dict.pop("done_writing_message_file"),
310+
}
311+
run_option_dict = _merge_run_options(config_dict, input_args_dict)
234312

235313
# Wrap configuration in a dataclass
236-
return ScicatConfig(
237-
original_dict=MappingProxyType(config_dict),
314+
return BackgroundIngestorConfig(
315+
original_dict=_freeze_dict_items(config_dict),
238316
run_options=RunOptions(**run_option_dict),
239317
kafka_options=kafkaOptions(**config_dict.setdefault("kafka", {})),
318+
single_run_options=SingleRunOptions(**single_run_option_dict),
240319
graylog_options=GraylogOptions(**config_dict.setdefault("graylog", {})),
241320
)

src/scicat_ingestor.py

+3-30
Original file line numberDiff line numberDiff line change
@@ -11,44 +11,17 @@
1111

1212
del importlib
1313

14-
import logging
15-
from collections.abc import Generator
16-
from contextlib import contextmanager
17-
18-
from scicat_configuration import build_main_arg_parser, build_scicat_config
14+
from scicat_configuration import build_main_arg_parser, build_scicat_ingester_config
1915
from scicat_kafka import build_consumer, wrdn_messages
2016
from scicat_logging import build_logger
21-
22-
23-
def quit(logger: logging.Logger, unexpected: bool = True) -> None:
24-
"""Log the message and exit the program."""
25-
import sys
26-
27-
logger.info("Exiting ingestor")
28-
sys.exit(1 if unexpected else 0)
29-
30-
31-
@contextmanager
32-
def exit_at_exceptions(logger: logging.Logger) -> Generator[None, None, None]:
33-
"""Exit the program if an exception is raised."""
34-
try:
35-
yield
36-
except KeyboardInterrupt:
37-
logger.info("Received keyboard interrupt.")
38-
quit(logger, unexpected=False)
39-
except Exception as e:
40-
logger.error("An exception occurred: %s", e)
41-
quit(logger, unexpected=True)
42-
else:
43-
logger.error("Loop finished unexpectedly.")
44-
quit(logger, unexpected=True)
17+
from system_helpers import exit_at_exceptions
4518

4619

4720
def main() -> None:
4821
"""Main entry point of the app."""
4922
arg_parser = build_main_arg_parser()
5023
arg_namespace = arg_parser.parse_args()
51-
config = build_scicat_config(arg_namespace)
24+
config = build_scicat_ingester_config(arg_namespace)
5225
logger = build_logger(config)
5326

5427
# Log the configuration as dictionary so that it is easier to read from the logs

src/scicat_logging.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
import logging.handlers
66

77
import graypy
8-
from scicat_configuration import ScicatConfig
8+
from scicat_configuration import IngesterConfig
99

1010

11-
def build_logger(config: ScicatConfig) -> logging.Logger:
11+
def build_logger(config: IngesterConfig) -> logging.Logger:
1212
"""Build a logger and configure it according to the ``config``."""
1313
run_options = config.run_options
1414

0 commit comments

Comments
 (0)