Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Following environment variables could be used for the configuration:
|`DIAL_LOG_PARSER_DATE`| optional | Date to process logs for (default: yesterday) |
|`DIAL_LOG_PARSER_DEBUG`| optional | Enables debug logging |
|`DIAL_LOG_PARSER_FILENAME_REGEX`| optional | Allows to override the regex to match log file names (default: `date=(\d{4}-\d{2}-\d{2})(\d+)-(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).log(.gz)?`) |
|`DIAL_LOG_PARSER_INPUT_COMPRESSION`| optional | Compression type for input log files. Possible values: 'detect' - detect compression from file extension (default), 'none' - no compression, or well known compression types [supported by pyarrow](https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.open_input_file) (like 'gzip'). |
Comment thread
adubovik marked this conversation as resolved.
Outdated

### Storage specific environment variables

Expand All @@ -68,19 +69,19 @@ Usage: python -m aidial_log_parser.parse_logs [OPTIONS]
Parse dial log files and repack it to parquet dataset.

Options:
-i, --input TEXT Path to input log directory [env var:
DIAL_LOG_PARSER_INPUT; required]
-o, --output TEXT Path to output log directory [env var:
DIAL_LOG_PARSER_OUTPUT; required]
-d, --date [%Y-%m-%d] Date to process logs for [env var:
DIAL_LOG_PARSER_DATE; default: 2024-06-09]
--debug Enable debug logging [env var:
DIAL_LOG_PARSER_DEBUG]
--filename-regex TEXT Regex to match log file names [env var:
DIAL_LOG_PARSER_FILENAME_REGEX; default: date=(\d{4}-
\d{2}-\d{2})(\d+)-(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).lo
g(.gz)?]
--help Show this message and exit.
-i, --input TEXT Path to input log directory [env var: DIAL_LOG_PARSER_INPUT; required]
-o, --output TEXT Path to output log directory [env var: DIAL_LOG_PARSER_OUTPUT; required]
-d, --date [%Y-%m-%d] Date to process logs for [env var: DIAL_LOG_PARSER_DATE; default: 2026-02-02]
--debug Enable debug logging [env var: DIAL_LOG_PARSER_DEBUG]
--filename-regex TEXT Regex to match log file names
[env var: DIAL_LOG_PARSER_FILENAME_REGEX; default: date=(\d{4}-\d{2}-\d{2})(\d+)-(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).log(.gz)?]
--input-compression TEXT Compression type for input log files.
Possible values:
'detect' - detect compression from file extension (default),
'none' - no compression,
or well known compression types supported by pyarrow (like 'gzip').
[env var: DIAL_LOG_PARSER_INPUT_COMPRESSION; default: detect]
--help Show this message and exit.
```


Expand Down
50 changes: 45 additions & 5 deletions src/aidial_log_parser/parse_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

DEFAULT_FILENAME_REGEX_COMPILED = re.compile(DEFAULT_FILENAME_REGEX)

DEFAULT_INPUT_COMPRESSION = "detect"

DEPLOYMENT_FIELD_NAME = "deployment"

OUT_PARTITIONING = ds.partitioning(
Expand Down Expand Up @@ -151,15 +153,21 @@ def from_file_path(
)


def read_data(input_files: list[InputFile], filesystem):
def read_data(
input_files: list[InputFile],
filesystem,
compression: str | None = DEFAULT_INPUT_COMPRESSION,
):
read_options = pj.ReadOptions(
use_threads=False,
block_size=64 << 20,
) # 64MiB, single field should not exceed 2 blocks
with click.progressbar(input_files, label="Reading files", show_pos=True) as files:
for input_file in files:
logging.info(f"Reading file {input_file.path}")
with filesystem.open_input_stream(input_file.path) as f:
with filesystem.open_input_stream(
input_file.path, compression=compression
) as f:
table = pj.read_json(f, read_options)
for batch in table.to_batches():
yield input_file.date, batch
Expand Down Expand Up @@ -381,6 +389,7 @@ def parse_logs(
output_dir: str,
date: pa.Date32Scalar,
filename_regex: re.Pattern = DEFAULT_FILENAME_REGEX_COMPILED,
input_compression: str | None = DEFAULT_INPUT_COMPRESSION,
):
in_fs_fsspec, input_dir_path = fsspec.url_to_fs(input_dir)
in_fs = fs.PyFileSystem(fs.FSSpecHandler(in_fs_fsspec))
Expand All @@ -396,7 +405,11 @@ def parse_logs(
if not out_fs.get_file_info(output_dir_path):
raise FileNotFoundError(f"Output directory {output_dir} does not exist")

input_batches_iter = read_data(input_files, filesystem=in_fs)
input_batches_iter = read_data(
input_files,
filesystem=in_fs,
compression=input_compression,
)
logging.debug(f"Output schema: {OUT_SCHEMA}")

output_batches_iter = process_batches(input_batches_iter, OUT_SCHEMA)
Expand All @@ -413,6 +426,16 @@ def parse_logs(
)


def parse_compression_param(
ctx: click.Context,
param: click.Parameter,
value: str,
) -> str | None:
if value.lower() == "none":
return None
return value


@click.command()
@click.option(
"-i",
Expand Down Expand Up @@ -455,7 +478,20 @@ def parse_logs(
show_default=True,
show_envvar=True,
)
def main(input, output, date, debug, filename_regex):
@click.option(
"--input-compression",
type=str,
help="""Compression type for input log files. Possible values:
'detect' - detect compression from file extension (default),
'none' - no compression,
or well known compression types supported by pyarrow (like 'gzip').
""",
callback=parse_compression_param,
default=DEFAULT_INPUT_COMPRESSION,
show_default=True,
show_envvar=True,
)
def main(input, output, date, debug, filename_regex, input_compression):
"""Parse dial log files and repack it to parquet dataset."""
if debug:
logging.getLogger().setLevel(logging.DEBUG)
Expand All @@ -465,7 +501,11 @@ def main(input, output, date, debug, filename_regex):
logging.info(f"Date: {date}")
filename_regex_compiled = re.compile(filename_regex)
parse_logs(
input, output, pa.scalar(date, type=pa.date32()), filename_regex_compiled
input,
output,
pa.scalar(date, type=pa.date32()),
filename_regex_compiled,
input_compression,
)


Expand Down
Loading