Skip to content

Commit c48849e

Browse files
committed
feat: add input cache parameter
1 parent 103f0a4 commit c48849e

2 files changed

Lines changed: 79 additions & 19 deletions

File tree

README.md

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,27 @@ Following environment variables could be used for the configuration:
5050
|`DIAL_LOG_PARSER_DATE`| optional | Date to process logs for (default: yesterday) |
5151
|`DIAL_LOG_PARSER_DEBUG`| optional | Enables debug logging |
5252
|`DIAL_LOG_PARSER_FILENAME_REGEX`| optional | Allows to override the regex to match log file names (default: `date=(\d{4}-\d{2}-\d{2})(\d+)-(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).log(.gz)?`) |
53-
|`DIAL_LOG_PARSER_INPUT_COMPRESSION`| optional | Compression type for input log files. Possible values: 'detect' - detect compression from file extension (default), 'none' - no compression, or well known compression types [supported by pyarrow](https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.open_input_stream) (like 'gzip'). |
53+
|`DIAL_LOG_PARSER_INPUT_COMPRESSION`| optional | Compression type for input log files. Possible values: <br/> `infer` - infer compression from file extension (default), <br/> `none` - no compression, <br/> or well known compression types [supported by fsspec](https://filesystem-spec.readthedocs.io/en/latest/features.html#transparent-text-mode-and-compression) (like `gzip`). |
54+
|`DIAL_LOG_PARSER_INPUT_CACHE`| optional | Cache type for input filesystem. Possible values: <br/> `default` - use default caching behavior (default), <br/> `none` - disable caching, <br/> or cache types supported by fsspec (like `readahead`, `bytes`, etc.). <br/> See https://filesystem-spec.readthedocs.io/en/latest/api.html#read-buffering and specific filesystem documentation for details. |
5455

5556
### Storage specific environment variables
5657

5758
Specific storage implementations may require additional environment variables to be set.
5859

59-
For example, for S3, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY may be required. See https://s3fs.readthedocs.io/en/latest/#credentials
60+
For example, for S3, `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` may be required. See https://s3fs.readthedocs.io/en/latest/#credentials
6061

6162
Fsspec-compatible implementations should be supported (this may require installing extra packages into the Docker image).
6263
Check the list [Built-in Fsspec Implementations](https://filesystem-spec.readthedocs.io/en/latest/api.html#implementations) and [Other Known Fsspec Implementations](https://filesystem-spec.readthedocs.io/en/latest/api.html#external-implementations) for more details.
6364

65+
#### Azure Blob Storage
66+
67+
For Azure Blob Storage, see [adlfs documentation](https://github.com/fsspec/adlfs?tab=readme-ov-file#setting-credentials) for the list of required environment variables.
68+
69+
**Note**: `AZURE_STORAGE_ANON` should be explicitly set to `false` to use authenticated access. The default value in the adlfs library is `true`, which may lead to authentication issues when trying to access private blobs.
70+
71+
If you store the logs compressed as `.log.gz` and the `Content-Encoding` header for the blob is set to `gzip`, you may encounter an issue where adlfs returns the decompressed file content but reports the size of the compressed file. This confuses the caching and decompression logic in fsspec and may lead to an error when the parser tries to read the file content.
72+
73+
To work around this issue, set `DIAL_LOG_PARSER_INPUT_COMPRESSION=none` to explicitly disable decompression in the parser even if the file name ends with `.gz`, and set `DIAL_LOG_PARSER_INPUT_CACHE=none` to disable caching and avoid the file size mismatch. This way the parser reads the file content as is, without trying to decompress or cache it.
6474

6575
### Command-line arguments
6676
```
@@ -77,10 +87,17 @@ Options:
7787
[env var: DIAL_LOG_PARSER_FILENAME_REGEX; default: date=(\d{4}-\d{2}-\d{2})(\d+)-(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}).log(.gz)?]
7888
--input-compression TEXT Compression type for input log files.
7989
Possible values:
80-
'detect' - detect compression from file extension (default),
90+
'infer' - infer compression from file extension (default),
8191
'none' - no compression,
82-
or well known compression types supported by pyarrow (like 'gzip').
83-
[env var: DIAL_LOG_PARSER_INPUT_COMPRESSION; default: detect]
92+
or well known compression types supported by fsspec (like 'gzip').
93+
[env var: DIAL_LOG_PARSER_INPUT_COMPRESSION; default: infer]
94+
--input-cache TEXT Cache type for input filesystem. Possible values:
95+
'default' - use default caching behavior (default),
96+
'none' - disable caching,
97+
or cache types supported by fsspec (like 'readahead', 'bytes', etc.).
98+
See https://filesystem-spec.readthedocs.io/en/latest/api.html#read-buffering and specific filesystem documentation
99+
for details.
100+
[env var: DIAL_LOG_PARSER_INPUT_CACHE; default: default]
84101
--help Show this message and exit.
85102
```
86103

src/aidial_log_parser/parse_logs.py

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@
2727

2828
DEFAULT_FILENAME_REGEX_COMPILED = re.compile(DEFAULT_FILENAME_REGEX)
2929

30-
DEFAULT_INPUT_COMPRESSION = "detect"
30+
DEFAULT_INPUT_COMPRESSION = "infer"
31+
32+
DEFAULT_READ_KWARGS = {
33+
"compression": DEFAULT_INPUT_COMPRESSION,
34+
}
3135

3236
DEPLOYMENT_FIELD_NAME = "deployment"
3337

@@ -155,8 +159,8 @@ def from_file_path(
155159

156160
def read_data(
157161
input_files: list[InputFile],
158-
filesystem,
159-
compression: str | None = DEFAULT_INPUT_COMPRESSION,
162+
filesystem: fsspec.AbstractFileSystem,
163+
**kwargs,
160164
):
161165
read_options = pj.ReadOptions(
162166
use_threads=False,
@@ -165,17 +169,15 @@ def read_data(
165169
with click.progressbar(input_files, label="Reading files", show_pos=True) as files:
166170
for input_file in files:
167171
logging.info(f"Reading file {input_file.path}")
168-
with filesystem.open_input_stream(
169-
input_file.path, compression=compression
170-
) as f:
172+
with filesystem.open(input_file.path, mode="rb", **kwargs) as f:
171173
table = pj.read_json(f, read_options)
172174
for batch in table.to_batches():
173175
yield input_file.date, batch
174176

175177

176178
def list_files(
177179
input_dir: str,
178-
filesystem,
180+
filesystem: fs.PyFileSystem,
179181
log_date: pa.Date32Scalar,
180182
filename_regex: re.Pattern,
181183
) -> list[InputFile]:
@@ -389,7 +391,7 @@ def parse_logs(
389391
output_dir: str,
390392
date: pa.Date32Scalar,
391393
filename_regex: re.Pattern = DEFAULT_FILENAME_REGEX_COMPILED,
392-
input_compression: str | None = DEFAULT_INPUT_COMPRESSION,
394+
read_kwargs: dict = DEFAULT_READ_KWARGS,
393395
):
394396
in_fs_fsspec, input_dir_path = fsspec.url_to_fs(input_dir)
395397
in_fs = fs.PyFileSystem(fs.FSSpecHandler(in_fs_fsspec))
@@ -407,8 +409,8 @@ def parse_logs(
407409

408410
input_batches_iter = read_data(
409411
input_files,
410-
filesystem=in_fs,
411-
compression=input_compression,
412+
filesystem=in_fs_fsspec,
413+
**read_kwargs,
412414
)
413415
logging.debug(f"Output schema: {OUT_SCHEMA}")
414416

@@ -436,6 +438,21 @@ def parse_compression_param(
436438
return value
437439

438440

441+
def parse_cache_param(
    ctx: click.Context,
    param: click.Parameter,
    value: str,
) -> dict:
    """Convert the ``--input-cache`` option value into file-open keyword arguments.

    Used as a click option callback. The result is a dict rather than a plain
    value so that "use the filesystem's own default" (no ``cache_type`` key at
    all) can be told apart from an explicitly requested cache type.

    Args:
        ctx: Click context (unused, required by the callback signature).
        param: Click parameter being processed (unused).
        value: Raw option value, e.g. ``default``, ``none`` or ``readahead``.

    Returns:
        ``{}`` for ``default`` (or an empty value), otherwise
        ``{"cache_type": <name>}``.
    """
    cache_type = value.strip()
    keyword = cache_type.lower()

    # Not passing cache_type at all lets the filesystem implementation
    # apply its own default caching strategy.
    if keyword in ("", "default"):
        return {}

    # Match the "none" keyword case-insensitively, the same way "default" is
    # matched, so that "None"/"NONE" also disable caching instead of being
    # forwarded as an unknown cache name.
    # NOTE(review): relies on fsspec registering its no-op cache under the
    # name "none" - confirm for the pinned fsspec version.
    if keyword == "none":
        return {"cache_type": "none"}

    # Any other value is forwarded as given (minus surrounding whitespace);
    # fsspec validates it when the file is opened.
    return {"cache_type": cache_type}
454+
455+
439456
@click.command()
440457
@click.option(
441458
"-i",
@@ -482,16 +499,39 @@ def parse_compression_param(
482499
"--input-compression",
483500
type=str,
484501
help="""Compression type for input log files. Possible values:
485-
'detect' - detect compression from file extension (default),
502+
'infer' - infer compression from file extension (default),
486503
'none' - no compression,
487-
or well known compression types supported by pyarrow (like 'gzip').
504+
or well known compression types supported by fsspec (like 'gzip').
488505
""",
489506
callback=parse_compression_param,
490507
default=DEFAULT_INPUT_COMPRESSION,
491508
show_default=True,
492509
show_envvar=True,
493510
)
494-
def main(input, output, date, debug, filename_regex, input_compression):
511+
@click.option(
512+
"--input-cache",
513+
type=str,
514+
help="""Cache type for input filesystem. Possible values:
515+
'default' - use default caching behavior (default),
516+
'none' - disable caching,
517+
or cache types supported by fsspec (like 'readahead', 'bytes', etc.).
518+
See https://filesystem-spec.readthedocs.io/en/latest/api.html#read-buffering
519+
and specific filesystem documentation for details.
520+
""",
521+
default="default",
522+
show_default=True,
523+
show_envvar=True,
524+
callback=parse_cache_param,
525+
)
526+
def main(
527+
input: str,
528+
output: str,
529+
date: datetime.datetime,
530+
debug: bool,
531+
filename_regex: str,
532+
input_compression: str,
533+
input_cache: dict,
534+
):
495535
"""Parse dial log files and repack it to parquet dataset."""
496536
if debug:
497537
logging.getLogger().setLevel(logging.DEBUG)
@@ -505,7 +545,10 @@ def main(input, output, date, debug, filename_regex, input_compression):
505545
output,
506546
pa.scalar(date, type=pa.date32()),
507547
filename_regex_compiled,
508-
input_compression,
548+
read_kwargs={
549+
"compression": input_compression,
550+
**input_cache,
551+
},
509552
)
510553

511554

0 commit comments

Comments
 (0)