Current Parquet processing
The first parquet dataset is generated directly from the input NetCDF files.
For example, an input NetCDF might be:
ct2-9916-04_prof.nc
This file is then split (“chunked”) into many parquet files, each parquet filename embedding the original NetCDF filename.
Example parquet outputs (paths with hive partitioning):
aodn-cloud-optimised/.../time_extent=.../polygon=polygonA_WKB/ct2-9916-04_prof.nc-0.parquet
aodn-cloud-optimised/.../time_extent=.../polygon=polygonB_WKB/ct2-9916-04_prof.nc-0.parquet
aodn-cloud-optimised/.../time_extent=.../polygon=polygonC_WKB/ct2-9916-04_prof.nc-0.parquet
Diagram of the parquet hive partition structure
```mermaid
flowchart TD
    %% Main Parquet Hive Layout
    subgraph MainParquet["🗄️ Main Parquet Dataset (Hive Partitioned)"]
        direction TB
        A["ct2-9916-**04**_prof.nc"] --> polygon1["polygon=010300000001000000050000000000000000206240"]
        A --> polygon2["polygon=010300000001000000050000000000000000207240"]
        B["ct2-9916-**05**_prof.nc"] --> polygon1
        B --> polygon2
        polygon1 --> parquetFiles1["ct2-9916-**04**_prof.nc-0.parquet"]
        polygon1 --> parquetFiles2["ct2-9916-**05**_prof.nc-0.parquet"]
        polygon2 --> parquetFiles3["ct2-9916-**04**_prof.nc-0.parquet"]
        polygon2 --> parquetFiles4["ct2-9916-**05**_prof.nc-0.parquet"]
    end
    style MainParquet fill:#f3f3f3,stroke:#333,stroke-width:2px
    style polygon1 fill:#ffe4b5,stroke:#333,stroke-width:1px
    style polygon2 fill:#ffe4b5,stroke:#333,stroke-width:1px
    style parquetFiles1 fill:#fff,stroke:#333,stroke-width:1px
    style parquetFiles2 fill:#fff,stroke:#333,stroke-width:1px
    style parquetFiles3 fill:#fff,stroke:#333,stroke-width:1px
    style parquetFiles4 fill:#fff,stroke:#333,stroke-width:1px
```
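For context, a hive-partitioned layout like the one above can be read back with partition pruning. The sketch below is illustrative only; the prefix and the filter value are placeholders, not real paths or partition values:

```python
# Illustrative sketch only: reading the main hive-partitioned parquet dataset
# with PyArrow. The bucket/prefix below is a placeholder, not a real path.
import pyarrow.dataset as ds

dataset = ds.dataset(
    "s3://aodn-cloud-optimised/<collection>.parquet/",  # placeholder prefix
    format="parquet",
    partitioning="hive",  # exposes time_extent and polygon as columns
)

# Filters on partition columns prune whole directories, so only the matching
# hive partitions (and the parquet files inside them) are read.
table = dataset.to_table(
    filter=ds.field("time_extent") == "2023"  # hypothetical partition value
)
```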
aodn_cloud_optimised/aodn_cloud_optimised/lib/GenericParquetHandler.py
Lines 1201 to 1212 in 45a9132

```python
pq.write_to_dataset(
    pdf,
    root_path=self.cloud_optimised_output_path,
    filesystem=self.s3_fs_output,
    existing_data_behavior="overwrite_or_ignore",
    row_group_size=20000,
    partition_cols=partition_keys,
    use_threads=True,
    metadata_collector=metadata_collector,
    basename_template=filename
    + "-{i}.parquet",  # this is essential for the overwriting part
)
```
`{i}` is a placeholder that PyArrow automatically replaces with an incrementing number for each file created when the dataset is split into multiple Parquet files.
This design allows deterministic, targeted updates of the parquet dataset, at the cost of listing objects on S3: if a NetCDF file changes, we simply locate the parquet files containing ct2-9916-04_prof.nc in their filename and overwrite only those.
This is achieved in this code:
aodn_cloud_optimised/aodn_cloud_optimised/lib/GenericParquetHandler.py
Lines 1336 to 1352 in 45a9132
```python
def delete_existing_matching_parquet(self, filename) -> None:
    """
    Delete unmatched Parquet files.

    In scenarios where we reprocess files with similar filenames but potentially different content,
    which affects partition values, we may encounter a situation where the old Parquet files are
    not overwritten because they don't match the new ones. Although this scenario is unlikely, it
    is not impossible.

    The challenge arises when we need to list all existing Parquet objects on S3, which could
    take a significant amount of time (e.g., 15s+) and could become problematic if there are
    already a large number of objects (e.g., 50,000+). In such cases, caution should be exercised,
    and batch processing strategies may need to be implemented.

    Returns:
        None
    """
```
This is clean and easy to maintain, but the more objects we have, the longer it takes to find the objects to overwrite. This behaviour is optional per dataset and is configured, for example, here:
Line 552 in 45a9132

```
"force_previous_parquet_deletion": true
```
However, there is a problem: when the input NetCDF files are small (~KB), this creates many small parquet files, which slows down data queries.
Why we don't mix multiple NetCDFs into a single parquet file (see the sketch below):
- If we mix them, updating becomes extremely hard.
- We would need to match rows based on a new `file_id` variable derived from the filename.
- That is slow unless `file_id` becomes a partition key.
- But if we partition by `file_id`, we again end up with tiny files → slow web responses anyway.
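To make the trade-off concrete, here is an illustrative PyArrow sketch; the `file_id` column and the dataset path are hypothetical, not part of the current schema:

```python
# Illustration of the trade-off above; file_id and the dataset path are
# hypothetical, not part of the current schema.
import pyarrow.dataset as ds

dataset = ds.dataset(
    "s3://<bucket>/<collection>.parquet/",  # placeholder
    format="parquet",
    partitioning="hive",
)

# If file_id is an ordinary column, finding the rows for one NetCDF means
# inspecting every parquet file (footers/statistics, then matching row groups).
rows = dataset.to_table(filter=ds.field("file_id") == "ct2-9916-04_prof.nc")

# If file_id were instead a hive partition key, this filter would prune whole
# directories cheaply, but each NetCDF would land in its own tiny partition,
# bringing back the many-small-files problem.
```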
Goal: improve web response times for data queries
Create a second parquet dataset, derived from the first one, suitable for webtime responses.
- The input is no longer the NetCDF files.
- The input is the up-to-date main parquet dataset, processed by the CO library.
- The output parquet files are much larger, each containing data from many original NetCDFs at once.
This gives us:
- **Main parquet**: potentially small files, easy to update, clean lineage per NetCDF
- **Webtime parquet**: large files, fast to query, but not updatable (a rough build sketch follows)
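As a rough illustration only (this is not the CO library code; bucket names, the coarser partitioning, and the row/file limits are assumptions), deriving the webtime dataset could amount to rewriting the main dataset into far fewer, larger files:

```python
# Rough sketch, not the CO library implementation: rewrite the main parquet
# dataset into a second dataset with few, large files. Bucket names, the
# coarser partitioning and the row/file limits are assumptions.
import pyarrow.dataset as ds

main = ds.dataset(
    "s3://aodn-cloud-optimised/<collection>.parquet/",  # placeholder
    format="parquet",
    partitioning="hive",
)

ds.write_dataset(
    main,
    base_dir="s3://<webresponse-bucket>/<collection>.parquet/",  # placeholder
    format="parquet",
    partitioning=["time_extent"],        # coarser than the main dataset
    partitioning_flavor="hive",
    min_rows_per_group=500_000,          # pack many source NetCDFs per row group
    max_rows_per_file=5_000_000,         # a few large files instead of thousands of tiny ones
    existing_data_behavior="overwrite_or_ignore",
)
```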
Limitations / operational issues
- The webtime parquet dataset cannot be generated while the main parquet is being updated/modified. If the main parquet dataset is updated every hour, this could be a challenge.
- We cannot store the webtime dataset in the aodn-cloud-optimised OpenData bucket: the AWS OpenData policy allows only one version per dataset.
- The webtime dataset must be fully recreated multiple times a day. Because individual webtime parquet files contain many NetCDF sources, we lose the ability to update subsets. For example, the current rebuild of the full wave_buoy_nrt collection via Coiled takes ~6 minutes and costs $0.04 (3 min infra deployment, 3 min processing).
- Between rebuilds, the previous webtime dataset is deleted and unavailable. So we need a mechanism to (one possible approach is sketched below):
  - host the old webtime dataset temporarily
  - rebuild the new one in parallel
  - atomically switch to the new dataset when ready
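One possible switch-over mechanism, offered here only as an assumption rather than an existing AODN design, is to version the webtime dataset by prefix and flip a small pointer object last; a single S3 PUT of the pointer is atomic, so readers never see a half-built dataset:

```python
# Hedged sketch of a possible switch-over mechanism, not an existing AODN
# design. Bucket, prefix and pointer-object names are placeholders.
import datetime
import s3fs

fs = s3fs.S3FileSystem()
root = "<webresponse-bucket>/<collection>.parquet"  # placeholder
version = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
new_prefix = f"{root}/v={version}/"

# 1. Rebuild the new webtime dataset under new_prefix while the previous
#    version stays online and readable.
# 2. Only once the rebuild has succeeded, repoint readers by overwriting a
#    tiny pointer object; readers resolve this pointer first, so they always
#    see either the old or the new prefix, never a partial dataset.
with fs.open(f"{root}/_LATEST", "w") as f:
    f.write(new_prefix)

# 3. Older v=... prefixes can be garbage-collected once no readers use them.
```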
Workflow Diagram
```mermaid
flowchart TD
    %% NetCDF -> Authoritative Parquet
    A@{ shape: procs, label: "📂 Input NetCDF file:\n**ct2-9916-04_prof.nc**\n**ct2-9916-06_prof.nc**\n**...**"}
    -- **Flow 1**: ⚙️ Cloud Optimised Library --> B@{ shape: cyl, label: "🗄️ **Main** Parquet dataset"}
    -.-> BB@{ shape: cloud, label: "☁️ Located on OpenData bucket"}
    B --> CC@{ shape: procs, label: "📄 Many parquet files containing input filename:\n**aodn-cloud-optimised/.../ct2-9916-04_prof.nc-0.parquet**"}
    CC --> DD[✅ Easy to update]
    CC --> EE@{label: "⚠️ Can be slow to query if input NetCDF are very small", shape: trap-b}

    %% Build Webtime Parquet
    B == **Flow 2**: ⚙️ Cloud Optimised Library ==> E@{ shape: cyl, label: "🗄️ new **webtime** Parquet dataset derived from main parquet"}
    E -.-> EEE@{shape: cloud, label: "☁️ Located on new webresponse bucket"}
    E --> F@{shape: rectangle, label: "📊 Webtime parquet dataset:\nlarge files, fast queries, not updatable"}

    %% Constraints subgraph
    subgraph Constraints["⚠️ **Constraints**"]
        direction TB
        G@{ label: "⏱️ Requires full rebuild for every data update", shape: trap-b }
        G --> GG[❓ How to trigger update? cron? event trigger?]
        H@{ label: "📜 AWS OpenData Policy:\nCannot store multiple versions of a dataset", shape: trap-b }
        I@{ label: "🔄 Needs switch-over mechanism during rebuild", shape: trap-b }
    end
    E -.-> Constraints

    %% Styles
    style DD fill:green, stroke:#333,stroke-width:1px
    style EE fill:orange, stroke:#333,stroke-width:1px
    style F fill:green, stroke:#333,stroke-width:1px
    style G fill:orange, stroke:#333,stroke-width:1px
    style H fill:orange, stroke:#333,stroke-width:1px
    style I fill:orange, stroke:#333,stroke-width:1px
```