
Commit 40f72cf

Merge pull request #235 from aodn/ProcessInputParquetHive
Feat: improve parquet parser to read hived parquet dataset
2 parents 1b40153 + 52df45f commit 40f72cf
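
The change below teaches the tooling to read the schema of a Hive-partitioned Parquet dataset with pyarrow.dataset rather than pyarrow.parquet.read_schema on a single file. As background, a minimal standalone sketch of that pattern (the bucket, key and anonymous access below are assumptions, not taken from this repository):

import pyarrow.dataset as ds
import s3fs

# Hypothetical public bucket holding a Hive layout such as
# .../example_dataset.parquet/year=2024/month=01/part-0.parquet
fs = s3fs.S3FileSystem(anon=True)

dataset = ds.dataset(
    "example-bucket/products/example_dataset.parquet",  # bucket/key, no "s3://" prefix
    format="parquet",
    partitioning="hive",  # partition columns are inferred from directory names
    filesystem=fs,
)

# The unified schema includes both file columns and inferred partition columns.
for field in dataset.schema:
    print(field.name, field.type, field.nullable)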

File tree

4 files changed, +260 -78 lines changed


aodn_cloud_optimised/bin/create_dataset_config.py

Lines changed: 78 additions & 24 deletions
@@ -43,10 +43,12 @@
 import uuid
 from collections import OrderedDict
 from importlib.resources import files
+from urllib.parse import urlparse
 
 import nbformat
 import pandas as pd
 import polars as pl
+import pyarrow.dataset as ds
 import pyarrow.parquet as pq
 import s3fs
 from s3path import PureS3Path
@@ -813,28 +815,54 @@ def main():
             regex_filter = [".*\\.csv$"]
 
         case ".parquet":
-            with fs.open(fp, "rb") as f:
-                schema = pq.read_schema(f)
-            dataset_config_schema = dict()
-
-            for field in schema:
-
-                # Extract core schema information
-                dataset_config_schema[field.name] = {
-                    "type": str(field.type),
-                    "nullable": str(field.nullable),
-                }
+            dataset_config_schema = dict()
 
-                # Extract additional metadata if it exists
-                if isinstance(field.metadata, dict):
-                    dataset_config_schema[field.name].update(
-                        {
-                            key.decode(): value.decode()
-                            for key, value in field.metadata.items()
-                        }
-                    )
+            # TODO: at this stage, we don't know yet if it's a hive or single parquet file. Could add another option in the create_dataset_config script for parquet only.
+            if fs.isfile(fp):
+                # Try reading as a single Parquet file
+                with fs.open(fp, "rb") as f:
+                    schema = pq.read_schema(f)
+
+            parquet_partitioning = None
+            if fs.isdir(fp):
+                # If that fails, assume it's a Hive-partitioned dataset
+
+                # Strip "s3://" if present, since s3fs expects only the key
+                parsed = urlparse(fp)
+                # TODO: this works but seems very ugly. Need to improve
+                dataset_path = (
+                    f"{parsed.netloc}{parsed.path}"  # ✅ keep the leading slash
+                )
+                parquet_partitioning = "hive"
+                dataset = ds.dataset(
+                    dataset_path,
+                    format="parquet",
+                    partitioning=parquet_partitioning,
+                    filesystem=fs,
+                )
+                schema = dataset.schema
+
+            for field in schema:
+                # Extract core schema information
+                dataset_config_schema[field.name] = {
+                    "type": str(field.type),
+                    "nullable": str(field.nullable),
+                }
+
+                # Extract additional metadata if it exists
+                if isinstance(field.metadata, dict):
+                    dataset_config_schema[field.name].update(
+                        {
+                            key.decode(): value.decode()
+                            for key, value in field.metadata.items()
+                        }
+                    )
+        case ".zarr":
+            # TODO: implement a zarr reader
 
-            regex_filter = [".*\\.parquet$"]
+            raise NotImplementedError(
+                f"input file type `{obj_key_suffix}` not yet implemented"
+            )
 
         # Default: Raise NotImplemented
         case _:
@@ -881,10 +909,36 @@ def main():
         "mode": f"{TO_REPLACE_PLACEHOLDER}",
         "restart_every_path": False,
     }
-    parent_s3_path = PureS3Path.from_uri(fp).parent.as_uri()
-    dataset_config["run_settings"]["paths"] = [
-        {"s3_uri": parent_s3_path, "filter": regex_filter, "year_range": []}
-    ]
+
+    match obj_key_suffix:
+        case ".nc" | ".csv":
+            parent_s3_path = PureS3Path.from_uri(fp).parent.as_uri()
+            dataset_config["run_settings"]["paths"] = [
+                {
+                    "type": "files",
+                    "s3_uri": parent_s3_path,
+                    "filter": regex_filter,
+                    "year_range": [],
+                }
+            ]
+        case ".zarr":
+            # TODO: partially implemented
+            parent_s3_path = PureS3Path.from_uri(fp).as_uri()
+            dataset_config["run_settings"]["paths"] = [
+                {
+                    "type": "zarr",
+                    "s3_uri": parent_s3_path,
+                }
+            ]
+        case ".parquet":
+            parent_s3_path = PureS3Path.from_uri(fp).as_uri()
+            dataset_config["run_settings"]["paths"] = [
+                {
+                    "type": "parquet",
+                    "partitioning": parquet_partitioning,
+                    "s3_uri": parent_s3_path,
+                }
+            ]
 
     if args.s3fs_opts:
         dataset_config.setdefault("run_settings", {})["s3_bucket_opts"] = {
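
With the new match on obj_key_suffix above, the generated run_settings no longer uses a regex filter for Parquet inputs; it records the dataset root plus the partitioning scheme instead. A hedged sketch of the emitted structure (the URI is hypothetical; partitioning would be None for a single .parquet file and "hive" for a partitioned dataset):

# Sketch of the "paths" entry written into the dataset config for a parquet input.
run_settings_paths = [
    {
        "type": "parquet",
        "partitioning": "hive",
        "s3_uri": "s3://example-bucket/products/example_dataset.parquet",
    }
]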

aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py

Lines changed: 140 additions & 47 deletions
@@ -72,11 +72,22 @@ class PathConfig(BaseModel):
 
     Attributes:
         s3_uri: S3 URI as a POSIX path string.
-        filter: List of regex patterns to filter files.
-        year_range: Year filter: None, one year, or a two-year inclusive range, or a list of exclusive years to process.
+        type: Type of dataset. Can be "files", "parquet", or "zarr".
+        partitioning: Optional, used only for Parquet datasets (e.g., "hive").
+        filter: List of regex patterns to filter files (only valid for type="files").
+        year_range: Optional Year filter: None, one year, or a two-year inclusive range, or a list of exclusive years to process. (only valid for type="files")
+
     """
 
     s3_uri: str
+    type: Optional[Literal["files", "parquet", "zarr"]] = Field(
+        default=None,
+        description="Dataset type. One of 'files', 'parquet', or 'zarr'. Defaults to 'files' if not specified.",
+    )
+    partitioning: Optional[str] = Field(
+        default=None,
+        description="Partitioning scheme, only valid when type='parquet'. Currently supports 'hive'.",
+    )
     filter: List[str] = Field(
         default_factory=list,
         description="List of regular expression patterns used to filter matching files.",
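
Assuming PathConfig can be imported from this module, the new type and partitioning fields would be used roughly as follows (a sketch based only on the fields shown in this diff; the import path and URIs are assumptions):

from aodn_cloud_optimised.bin.generic_cloud_optimised_creation import PathConfig

# Hive-partitioned Parquet dataset: no regex filter, no year_range
parquet_path = PathConfig(
    type="parquet",
    partitioning="hive",
    s3_uri="s3://example-bucket/products/example_dataset.parquet",
)

# Regular file collection keeps the existing filter/year_range behaviour
files_path = PathConfig(
    type="files",
    s3_uri="s3://example-bucket/raw/netcdf",
    filter=[".*\\.nc$"],
)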
@@ -125,10 +136,11 @@ def validate_s3_uri(cls, v: str) -> str:
         parsed = urlparse(v)
         if not parsed.netloc:
             raise ValueError("s3_uri must include a bucket name after 's3://'")
-        if not parsed.path or parsed.path == "/":
-            raise ValueError(
-                "s3_uri must include a valid key path after the bucket"
-            )
+        # TODO: remove the commented lines below. This used to be a good test, but now dataset could be a parquet hive partitioned at the root of the bucket.
+        # if not parsed.path or parsed.path == "/":
+        #     raise ValueError(
+        #         "s3_uri must include a valid key path after the bucket"
+        #     )
         try:
             PurePosixPath(parsed.path.lstrip("/"))
         except Exception as e:
@@ -151,6 +163,55 @@ def validate_regex(cls, v):
                 raise ValueError(f"Invalid regex: {pattern} ({e})")
         return v
 
+    @model_validator(mode="after")
+    def validate_cross_fields(cls, values):
+        dataset_type = values.type or "files"
+        if values.type is None:
+            warnings.warn(
+                "No 'type' specified in PathConfig. Assuming 'files' as default.",
+                UserWarning,
+                stacklevel=2,
+            )
+            values.type = "files"
+            if (
+                any(".parquet" in f for f in values.filter)
+                or ".parquet" in values.s3_uri
+            ):
+                raise ValueError(
+                    "type must be defined as 'parquet' in run_settings.paths config if ingesting a parquet dataset."
+                )
+            elif any(".zarr" in f for f in values.filter) or ".zarr" in values.s3_uri:
+                raise ValueError(
+                    "type must be defined as 'zarr' in run_settings.paths config if ingesting a zarr dataset."
+                )
+
+        if dataset_type == "parquet":
+            if values.filter:
+                raise ValueError("filter must not be defined when type='parquet'")
+            if values.year_range:
+                raise ValueError("year_range must not be defined when type='parquet'")
+            if values.partitioning not in (None, "hive"):
+                raise ValueError(
+                    f"Invalid partitioning='{values.partitioning}' for parquet dataset. Only 'hive' is supported."
+                )
+
+        elif dataset_type == "zarr":
+            if values.filter:
+                raise ValueError("filter must not be defined when type='zarr'")
+            if values.year_range:
+                raise ValueError("year_range must not be defined when type='zarr'")
+            if values.partitioning:
+                raise ValueError("partitioning is not applicable when type='zarr'")
+
+        elif dataset_type == "files":
+            if values.partitioning:
+                raise ValueError("partitioning is not applicable when type='files'")
+
+        else:
+            raise ValueError(f"Unsupported dataset type: {dataset_type}")
+
+        return values
+
 
 class WorkerOptions(BaseModel):
     """Worker configuration for Coiled clusters.
@@ -1138,67 +1199,99 @@ def load_config_and_validate(config_filename: str) -> DatasetConfig:
     return DatasetConfig.model_validate(dataset_config)
 
 
-def json_update(base: dict, updates: dict) -> dict:
-    """Recursively update nested dictionaries."""
-    for k, v in updates.items():
-        if isinstance(v, dict) and isinstance(base.get(k), dict):
-            base[k] = json_update(base[k], v)
-        else:
-            base[k] = v
-    return base
-
-
 def collect_files(
     path_cfg: PathConfig,
-    suffix: str,
+    suffix: Optional[str],
     exclude: Optional[str],
     bucket_raw: Optional[str],
     s3_client_opts: Optional[dict] = None,
 ) -> List[str]:
-    """Collect files from an S3 bucket using suffix and optional regex filtering.
+    """Collect dataset paths from S3 based on dataset type.
+
+    Supports:
+    - 'files': lists and filters regular files (e.g., NetCDF, CSV)
+    - 'parquet': handles both single Parquet files and Hive-partitioned datasets
+    - 'zarr': returns the Zarr store path directly
 
     Args:
-        path_cfg: Configuration object including the S3 URI and optional regex filters.
+        path_cfg: Configuration object including type, S3 URI, and optional regex filters.
        suffix: File suffix to filter by, e.g., '.nc'. Set to None to disable suffix filtering.
         exclude: Optional regex string to exclude files.
         bucket_raw: Required if `path_cfg.s3_uri` is not a full S3 URI.
+        s3_client_opts: Optional dict with boto3 S3 client options.
 
     Returns:
-        List of matching file keys (paths) as strings.
+        List of dataset paths (files or root URIs) as strings.
     """
-    s3_uri = path_cfg.s3_uri
+    dataset_type = getattr(path_cfg, "type", "files")  # default value
+    s3_uri = path_cfg.s3_uri.rstrip("/")
+
+    # ---------------------------------------------------------------------
+    # Handle 'file' collection (NetCDF, CSV)
+    # ---------------------------------------------------------------------
+    if dataset_type == "files":
+        if s3_uri.startswith("s3://"):
+            parsed = urlparse(s3_uri)
+            bucket = parsed.netloc
+            prefix = parsed.path.lstrip("/")
+        else:
+            if not bucket_raw:
+                raise ValueError(
+                    "bucket_raw must be provided when s3_uri is not a full S3 URI."
+                )
+            bucket = bucket_raw
+            prefix = s3_uri
 
-    if s3_uri.startswith("s3://"):
-        parsed = urlparse(s3_uri)
-        bucket = parsed.netloc
-        prefix = parsed.path.lstrip("/")
-    else:
-        if not bucket_raw:
-            raise ValueError(
-                "bucket_raw must be provided when s3_uri is not a full S3 URI."
-            )
-        bucket = bucket_raw
-        prefix = s3_uri
+        prefix = str(PurePosixPath(prefix))  # normalise path
 
-    prefix = str(PurePosixPath(prefix))  # normalise path
+        matching_files = s3_ls(
+            bucket,
+            prefix,
+            suffix=suffix,
+            exclude=exclude,
+            s3_client_opts=s3_client_opts,
+        )
 
-    # matching_files = s3_ls(bucket, prefix, suffix=suffix, exclude=exclude)
-    matching_files = s3_ls(
-        bucket, prefix, suffix=None, exclude=exclude, s3_client_opts=s3_client_opts
-    )
+        for pattern in path_cfg.filter or []:
+            logger.info(f"Filtering files with regex pattern: {pattern}")
+            regex = re.compile(pattern)
+            matching_files = [f for f in matching_files if regex.search(f)]
+            if not matching_files:
+                raise ValueError(
+                    f"No files matching {pattern} under {s3_uri}. Modify regexp filter or path in configuration file. Abort"
+                )
 
-    for pattern in path_cfg.filter or []:
-        logger.info(f"Filtering files with regex pattern: {pattern}")
-        regex = re.compile(pattern)
-        matching_files = [f for f in matching_files if regex.search(f)]
-        if matching_files == []:
-            raise ValueError(
-                f"No files matching {pattern} under {s3_uri}. Modify regexp filter or path in configuration file. Abort"
-            )
+        logger.info(f"Matched {len(matching_files)} files")
+
+        return matching_files
+
+    # ---------------------------------------------------------------------
+    # Handle 'parquet' (single Parquet file or Hive-partitioned dataset)
+    # ---------------------------------------------------------------------
+    elif dataset_type == "parquet":
+        # No filters
+        return [s3_uri]
 
-    logger.info(f"Matched {len(matching_files)} files")
+    # ---------------------------------------------------------------------
+    # Handle 'zarr' (Zarr store)
+    # ---------------------------------------------------------------------
+    elif dataset_type == "zarr":
+        raise ValueError("zarr store as an input dataset is not yet implemented")
+        # return [s3_uri]
 
-    return matching_files
+    # Unsupported type
+    else:
+        raise ValueError(f"Unsupported dataset type: {dataset_type}")
+
+
+def json_update(base: dict, updates: dict) -> dict:
+    """Recursively update nested dictionaries."""
+    for k, v in updates.items():
+        if isinstance(v, dict) and isinstance(base.get(k), dict):
+            base[k] = json_update(base[k], v)
+        else:
+            base[k] = v
+    return base
 
 
 def join_s3_uri(base_uri: str, *parts: str) -> str:
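
With the rework above, collect_files short-circuits for parquet and zarr instead of listing objects: a parquet path is returned as a single dataset root, and zarr currently raises until a reader is implemented. A usage sketch (import path and URI are assumptions):

from aodn_cloud_optimised.bin.generic_cloud_optimised_creation import (
    PathConfig,
    collect_files,
)

cfg = PathConfig(
    type="parquet",
    partitioning="hive",
    s3_uri="s3://example-bucket/products/example_dataset.parquet/",  # hypothetical URI
)

# No suffix or regex filtering is applied for parquet; the trailing slash is stripped.
paths = collect_files(cfg, suffix=None, exclude=None, bucket_raw=None)
print(paths)  # ['s3://example-bucket/products/example_dataset.parquet']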

aodn_cloud_optimised/config/dataset/diver_photoquadrat_score_qc.json

Lines changed: 2 additions & 5 deletions
@@ -5,11 +5,8 @@
   "run_settings": {
     "paths": [
       {
-        "s3_uri": "s3://data-uplift-public/products/reef_life_survey",
-        "filter": [
-          "public_reef_life_survey_2025-11-04T03:14:37\\.parquet$"
-        ],
-        "year_range": []
+        "type": "parquet",
+        "s3_uri": "s3://data-uplift-public/products/reef_life_survey/public_reef_life_survey_2025-11-04T03:14:37.parquet"
       }
     ],
     "cluster": {
