
Commit 015b2c7

Merge pull request #225 from aodn/FeatNrmn
FeatNrmn
2 parents 7197c33 + 747af7d commit 015b2c7

22 files changed: +36021 −70 lines

aodn_cloud_optimised/bin/create_dataset_config.py

Lines changed: 7 additions & 1 deletion
@@ -772,6 +772,8 @@ def main():
             dataset_config_schema = json.load(file)
             os.remove(temp_file_path)

+            regex_filter = [".*\\.nc$"]
+
         case ".csv":
             csv_opts = json.loads(args.csv_opts) if args.csv_opts else {}

@@ -808,6 +810,8 @@ def main():
                 # dataset_config_schema["properties"][col] = {"type": js_type}
                 dataset_config_schema[col] = {"type": js_type}

+            regex_filter = [".*\\.csv$"]
+
         case ".parquet":
             with fs.open(fp, "rb") as f:
                 schema = pq.read_schema(f)
@@ -816,6 +820,8 @@ def main():
             for field in schema:
                 dataset_config_schema[field.name] = {"type": str(field.type)}

+            regex_filter = [".*\\.parquet$"]
+
         # Default: Raise NotImplemented
         case _:
             raise NotImplementedError(
@@ -863,7 +869,7 @@ def main():
     }
     parent_s3_path = PureS3Path.from_uri(fp).parent.as_uri()
     dataset_config["run_settings"]["paths"] = [
-        {"s3_uri": parent_s3_path, "filter": [".*\\.nc"], "year_range": []}
+        {"s3_uri": parent_s3_path, "filter": regex_filter, "year_range": []}
     ]

     if args.s3fs_opts:
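
The net effect of this change is that the generated run_settings filter now matches the detected input type instead of the previously hard-coded ".*\\.nc" pattern. A minimal sketch of the paths entry produced for a CSV input; the S3 prefix below is a made-up example, only the structure mirrors the diff above.

# Sketch of the generated run_settings paths entry for a CSV input.
# "s3://example-bucket/some/prefix" is hypothetical.
regex_filter = [".*\\.csv$"]                       # set in the ".csv" case branch
parent_s3_path = "s3://example-bucket/some/prefix"
dataset_config = {"run_settings": {}}
dataset_config["run_settings"]["paths"] = [
    {"s3_uri": parent_s3_path, "filter": regex_filter, "year_range": []}
]
# -> [{'s3_uri': 's3://example-bucket/some/prefix', 'filter': ['.*\\.csv$'], 'year_range': []}]
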
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+generic_launcher.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+generic_launcher.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+generic_launcher.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+generic_launcher.py

aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py

Lines changed: 24 additions & 2 deletions
@@ -643,6 +643,9 @@ class ParquetSchemaTransformation(BaseModel):
         default=None,
         description="Custom functions used to extract metadata from object keys and turn into variables, required if @function: is used in add_variables.",
     )
+    skip_partitioning_validation: bool = Field(
+        False, description="Set to true to skip required partitioning validation."
+    )

     @field_validator("add_variables")
     @classmethod
@@ -719,6 +722,9 @@ def validate_add_variables(cls, value):

     @model_validator(mode="after")
     def validate_required_patitions(self):
+        if self.skip_partitioning_validation:
+            return self
+
         if not self.partitioning:
             raise ValueError("'partitioning' key missing")

@@ -727,7 +733,7 @@ def validate_required_patitions(self):
         required_partitioning_keys = ["polygon", "timestamp"]
         if not all(key in partition_keys for key in required_partitioning_keys):
             raise ValueError(
-                f"Required variables {required_partitioning_keys} must be present in the 'partitioning' key. Only {partition_keys} available"
+                f"Required variables {required_partitioning_keys} must be present in the 'partitioning' key. Only {partition_keys} available.\n If you think those partitions shouldn't exist, set '\"skip_partitioning_validation\" : true' in the schema_transformation configuration"
             )

         return self
@@ -1241,14 +1247,21 @@ def main():
         description="Run cloud-optimised creation using config."
     )
     parser.add_argument(
-        "--config", required=False, help="JSON filename in config/dataset/"
+        "-c", "--config", required=False, help="JSON filename in config/dataset/"
     )
     parser.add_argument(
+        "-o",
         "--json-overwrite",
         type=str,
         help='JSON string to override config fields. Example: \'{"run_settings": {"cluster": {"mode": null}, "raise_error": true}}\' ',
     )

+    parser.add_argument(
+        "-t",
+        "--test",
+        action="store_true",
+        help="Use integration testing bucket instead of the default optimised bucket.",
+    )
     args = parser.parse_args()

     try:
@@ -1289,6 +1302,15 @@ def main():
         or load_variable_from_config("ROOT_PREFIX_CLOUD_OPTIMISED_PATH")
     )

+    # Override for test mode
+    if args.test:
+        bucket_optimised = load_variable_from_config(
+            "BUCKET_INTEGRATION_TESTING_OPTIMISED_DEFAULT"
+        )
+        root_prefix = load_variable_from_config(
+            "ROOT_PREFIX_CLOUD_OPTIMISED_INTEGRATION_TESTING_PATH"
+        )
+
     s3_fs_common_opts = config.run_settings.s3_fs_common_opts
     s3_client_opts = boto3_from_opts_dict(s3_fs_common_opts)
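
The new -c, -o and -t short flags and the skip_partitioning_validation field can be exercised together from the command line. A hedged sketch of building a --json-overwrite string follows; the config filename is hypothetical, and whether --json-overwrite reaches nested schema_transformation fields is an assumption not confirmed by this diff.

# Sketch: building a JSON string for -o/--json-overwrite and running against the
# integration-testing bucket with -t. The schema_transformation override is an
# assumption; the run_settings part mirrors the help text in the diff above.
import json

override = {
    "run_settings": {"cluster": {"mode": None}, "raise_error": True},
    "schema_transformation": {"skip_partitioning_validation": True},
}
print(json.dumps(override))

# Example invocation (config filename is hypothetical):
#   python aodn_cloud_optimised/bin/generic_cloud_optimised_creation.py \
#       -c my_dataset.json -o '<printed JSON>' -t
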

aodn_cloud_optimised/config/common.json

Lines changed: 1 addition & 1 deletion
@@ -4,5 +4,5 @@
   "ROOT_PREFIX_CLOUD_OPTIMISED_PATH": "",
   "BUCKET_INTEGRATION_TESTING_RAW_DEFAULT": "imos-data",
   "BUCKET_INTEGRATION_TESTING_OPTIMISED_DEFAULT": "imos-data-lab-optimised",
-  "ROOT_PREFIX_CLOUD_OPTIMISED_INTEGRATION_TESTING_PATH": "cloud_optimised/integration_testing"
+  "ROOT_PREFIX_CLOUD_OPTIMISED_INTEGRATION_TESTING_PATH": ""
 }
Lines changed: 257 additions & 0 deletions
@@ -0,0 +1,257 @@
{
  "dataset_name": "diver_cryptobenthic_fish_abundance_qc",
  "logger_name": "diver_cryptobenthic_fish_abundance_qc",
  "cloud_optimised_format": "parquet",
  "run_settings": {
    "paths": [
      {
        "s3_uri": "s3://imos-data/IMOS/NRMN",
        "filter": [
          "ep_m2_cryptic_fish_public_data\\.csv"
        ],
        "year_range": []
      }
    ],
    "cluster": {
      "mode": "local",
      "restart_every_path": false
    },
    "clear_existing_data": true,
    "raise_error": false,
    "coiled_cluster_options": {
      "n_workers": [
        1,
        20
      ],
      "scheduler_vm_types": "m7i-flex.large",
      "worker_vm_types": "m7i-flex.large",
      "allow_ingress_from": "me",
      "compute_purchase_option": "spot_with_fallback",
      "worker_options": {
        "nthreads": 4,
        "memory_limit": "8GB"
      }
    },
    "batch_size": 1,
    "force_previous_parquet_deletion": false
  },
  "metadata_uuid": "9efa25cd-4da4-47b5-9385-45e3cbd11705",
  "schema": {
    "survey_id": {
      "type": "int32"
    },
    "country": {
      "type": "string"
    },
    "area": {
      "type": "string"
    },
    "ecoregion": {
      "type": "string"
    },
    "realm": {
      "type": "string"
    },
    "location": {
      "type": "string"
    },
    "site_code": {
      "type": "string"
    },
    "site_name": {
      "type": "string"
    },
    "latitude": {
      "type": "float32"
    },
    "longitude": {
      "type": "float32"
    },
    "survey_date": {
      "type": "string"
    },
    "depth": {
      "type": "float32"
    },
    "program": {
      "type": "string"
    },
    "visibility": {
      "type": "float32"
    },
    "hour": {
      "type": "string"
    },
    "survey_latitude": {
      "type": "float32"
    },
    "survey_longitude": {
      "type": "float32"
    },
    "method": {
      "type": "int32"
    },
    "block": {
      "type": "int32"
    },
    "phylum": {
      "type": "string"
    },
    "class_": {
      "type": "string"
    },
    "order": {
      "type": "string"
    },
    "family": {
      "type": "string"
    },
    "species_name": {
      "type": "string"
    },
    "reporting_name": {
      "type": "string"
    },
    "size_class": {
      "type": "float32"
    },
    "total": {
      "type": "int32"
    },
    "biomass": {
      "type": "float32"
    },
    "geom": {
      "type": "string"
    }
  },
  "aws_opendata_registry": {
    "Name": "IMOS - National Reef Monitoring Network Sub-Facility - Global cryptobenthic fish abundance",
    "Description": "The National Reef Monitoring Network brings together shallow reef surveys conducted around Australia into a centralised database. The IMOS National Reef Monitoring Network sub-Facility collates, cleans, stores and makes this data rapidly available from contributors including: Reef Life Survey, Parks Australia, Department of Biodiversity, Conservation and Attractions (Western Australia), Department of Environment, Water and Natural Resources (South Australia), Department of Primary Industries (New South Wales), Tasmanian Parks and Wildlife Service and Parks Victoria.\n\nThe data provided by the National Reef Monitoring Network contributes to establishing and supporting national marine baselines, and assisting with the management of Commonwealth and State marine reserves. \n\nReef Life Survey (RLS) and the Australian Temperate Reef Network (ATRC) aims to improve biodiversity conservation and the sustainable management of marine resources by coordinating surveys of rocky and coral reefs using scientific methods, with the ultimate goal to improve coastal stewardship. Our activities depend on the skills of marine scientists, experienced and motivated recreational SCUBA divers, partnerships with management agencies and university researchers, and active input from the ATRC partners and RLS Advisory Committee.\n\nRLS and ATRC data are freely available to the public for non-profit purposes, so not only managers, but also groups such as local dive clubs or schools may use these data to look at changes over time in their own local reefs. By making data freely available and through public outputs, RLS and ATRC aims to raise broader community awareness of the status of Australia's marine biodiversity and associated conservation issues.\n\nThis dataset contains records of cryptobenthic fishes collected by RLS and ATRC divers and partners along 50m transects on shallow rocky and coral reefs using standard methods. Abundance information is available for all species recorded within quantitative survey limits (50 x 1 m swathes either side of the transect line, each distinguished as a 'Block'), with divers searching the reef surface (including cracks) carefully for hidden fishes. These observations are recorded concurrently with the macroinvertebrate observations and together make up the 'Method 2' component of the surveys. For this method, typically one 'Block' is completed per 50 m transect for the program ATRC and 2 blocks are completed for RLS – although exceptions to this rule exist.\n\nThis dataset supersedes the RLS specific \"Reef Life Survey (RLS): Cryptic Fish\" collection that was available at https://catalogue-rls.imas.utas.edu.au/geonetwork/srv/en/metadata.show?uuid=6a56db3f-d1b2-438d-98c6-bd7dd540a4d5 (provision of data was stopped in June 2021).",
    "Documentation": "https://catalogue-imos.aodn.org.au/geonetwork/srv/eng/catalog.search#/metadata/9efa25cd-4da4-47b5-9385-45e3cbd11705",
    "Contact": "info@aodn.org.au",
    "ManagedBy": "AODN",
    "UpdateFrequency": "As Needed",
    "Tags": [
      "FILL UP MANUALLY - CHECK DOCUMENTATION"
    ],
    "License": "http://creativecommons.org/licenses/by/4.0/",
    "Resources": [
      {
        "Description": "Cloud Optimised AODN dataset of IMOS - National Reef Monitoring Network Sub-Facility - Global cryptobenthic fish abundance",
        "ARN": "arn:aws:s3:::aodn-cloud-optimised/diver_cryptobenthic_fish_abundance_qc.parquet",
        "Region": "ap-southeast-2",
        "Type": "S3 Bucket"
      }
    ],
    "DataAtWork": {
      "Tutorials": [
        {
          "Title": "Accessing IMOS - National Reef Monitoring Network Sub-Facility - Global cryptobenthic fish abundance",
          "URL": "https://github.com/aodn/aodn_cloud_optimised/blob/main/notebooks/diver_cryptobenthic_fish_abundance_qc.ipynb",
          "NotebookURL": "https://githubtocolab.com/aodn/aodn_cloud_optimised/blob/main/notebooks/diver_cryptobenthic_fish_abundance_qc.ipynb",
          "AuthorName": "Laurent Besnard",
          "AuthorURL": "https://github.com/aodn/aodn_cloud_optimised"
        },
        {
          "Title": "Accessing and search for any AODN dataset",
          "URL": "https://github.com/aodn/aodn_cloud_optimised/blob/main/notebooks/GetAodnData.ipynb",
          "NotebookURL": "https://githubtocolab.com/aodn/aodn_cloud_optimised/blob/main/notebooks/GetAodnData.ipynb",
          "AuthorName": "Laurent Besnard",
          "AuthorURL": "https://github.com/aodn/aodn_cloud_optimised"
        }
      ]
    },
    "Citation": "IMOS [year-of-data-download], [Title], [data-access-URL], accessed [date-of-access]"
  },
  "csv_config": {
    "polars_read_csv_config": {
      "separator": ",",
      "has_header": true,
      "null_values": [
        "N/A",
        "NaN"
      ],
      "try_parse_dates": false,
      "infer_schema_length": 1000,
      "encoding": "utf-8"
    }
  },
  "schema_transformation": {
    "drop_variables": [],
    "add_variables": {
      "filename": {
        "source": "@filename",
        "schema": {
          "type": "string",
          "units": "1",
          "long_name": "Filename of the source file"
        }
      },
      "timestamp": {
        "source": "@partitioning:time_extent",
        "schema": {
          "type": "int64",
          "units": "1",
          "long_name": "Partition timestamp"
        }
      },
      "polygon": {
        "source": "@partitioning:spatial_extent",
        "schema": {
          "type": "string",
          "units": "1",
          "long_name": "Spatial partition polygon"
        }
      },
      "TIME": {
        "source": "@function:time_creation",
        "schema": {
          "type": "timestamp[ns]",
          "units": "days since 1970-01-01T00:00:00Z",
          "_FillValue": "",
          "long_name": "Derived timestamp"
        }
      }
    },
    "functions": {
      "time_creation": {
        "extract_method": "from_variables",
        "method": {
          "creation_code": "def time_creation_from_variables(df):\n    import pandas as pd\n    date_col = df.get('survey_date')\n    hour_col = df.get('hour')\n\n    # Fill missing hour with 00:00:00\n    if hour_col is None:\n        hour_col = pd.Series(['00:00:00']*len(df), index=df.index)\n    else:\n        hour_col = hour_col.fillna('00:00:00')\n\n    # Combine date and hour strings\n    dt_str = date_col.astype(str) + ' ' + hour_col.astype(str)\n    result = pd.to_datetime(dt_str, errors='coerce', format='%Y-%m-%d %H:%M:%S')\n    # fallback to just date if conversion failed\n    mask = result.isna()\n    if mask.any():\n        result.loc[mask] = pd.to_datetime(date_col[mask], errors='coerce')\n    return result"
        }
      }
    },
    "partitioning": [
      {
        "source_variable": "timestamp",
        "type": "time_extent",
        "time_extent": {
          "time_varname": "TIME",
          "partition_period": "Y"
        }
      },
      {
        "source_variable": "polygon",
        "type": "spatial_extent",
        "spatial_extent": {
          "lat_varname": "latitude",
          "lon_varname": "longitude",
          "spatial_resolution": 10
        }
      }
    ],
    "global_attributes": {
      "delete": [
        "geospatial_lat_max",
        "geospatial_lat_min",
        "geospatial_lon_max",
        "geospatial_lon_min",
        "date_created"
      ],
      "set": {
        "title": "IMOS - National Reef Monitoring Network Sub-Facility - Global off-transect species observations"
      }
    }
  }
}
