 import importlib.util
 import json
 import os
+import pathlib
 import uuid
 from collections import OrderedDict
 from importlib.resources import files
 
 import nbformat
 import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
 import s3fs
 from s3path import PureS3Path
 from termcolor import colored
@@ -734,21 +737,19 @@ def main():
 
     # Handle S3 path
     if args.file.startswith("s3://"):
-        nc_file = args.file
-        p = PureS3Path.from_uri(nc_file)
-        bucket = p.bucket
-        obj_key = str(p.key)
+        fp = args.file
+        s3_path = PureS3Path.from_uri(fp)
+        bucket = s3_path.bucket
+        obj_key = str(s3_path.key)
     else:
         obj_key = args.file
         bucket = args.bucket
-        nc_file = (
-            PureS3Path.from_uri(f"s3://{args.bucket}").joinpath(args.file).as_uri()
-        )
+        fp = PureS3Path.from_uri(f"s3://{args.bucket}").joinpath(args.file).as_uri()
 
     # Create an empty NetCDF with NaN variables alongside the JSON files. Acts as the source of truth for restoring missing dimensions.
     # only useful for Zarr to concatenate NetCDF together with missing var/dim in some NetCDF files
     if args.cloud_format == "zarr":
-        nc_nullify_path = nullify_netcdf_variables(nc_file, args.dataset_name)
+        nc_nullify_path = nullify_netcdf_variables(fp, args.dataset_name)
 
     # optional s3fs options
     if args.s3fs_opts:
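For reference, a minimal sketch (not part of the commit) of how s3path splits an S3 URI into the bucket and object key that the refactored branch above relies on; the bucket and key names here are made up:

from s3path import PureS3Path

fp = "s3://example-bucket/imos/data/file_001.nc"  # hypothetical URI
s3_path = PureS3Path.from_uri(fp)
s3_path.bucket            # "example-bucket"
str(s3_path.key)          # "imos/data/file_001.nc"
s3_path.parent.as_uri()   # "s3://example-bucket/imos/data"

# Rebuilding the URI from a bucket and a relative key, as the else branch does:
PureS3Path.from_uri("s3://example-bucket").joinpath("imos/data/file_001.nc").as_uri()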
@@ -758,43 +759,61 @@ def main():
         anon=False,
     )
 
-    # Generate schema based on input type (NetCDF or CSV)
-    if obj_key.lower().endswith(".csv"):
-        csv_file = nc_file  # TODO: rename
-
-        csv_opts = json.loads(args.csv_opts) if args.csv_opts else {}
-        with fs.open(csv_file, "rb") as f:
-            df = pd.read_csv(f, **csv_opts)
-
-        dataset_config_schema = {"type": "object", "properties": {}}
-        for col, dtype in df.dtypes.items():
-            if pd.api.types.is_integer_dtype(dtype):
-                js_type = "integer"
-            elif pd.api.types.is_float_dtype(dtype):
-                js_type = "number"
-            elif pd.api.types.is_bool_dtype(dtype):
-                js_type = "boolean"
-            elif pd.api.types.is_object_dtype(dtype) | pd.api.types.is_string_dtype(
-                dtype
-            ):
-                js_type = "string"
-            else:
-                raise NotImplementedError(
-                    f"found dtype that did not fit into configured categories: `{dtype}`"
-                )
+    # Route by file type
+    obj_key_suffix = pathlib.Path(obj_key.lower()).suffix
+    match obj_key_suffix:
 
-            dataset_config_schema["properties"][col] = {"type": js_type}
+        case ".nc":
 
-    elif obj_key.lower().endswith(".nc"):
-        # Generate JSON schema from the NetCDF file
-        temp_file_path = generate_json_schema_from_s3_netcdf(
-            nc_file, cloud_format=args.cloud_format, s3_fs=fs
-        )
-        with open(temp_file_path, "r") as file:
-            dataset_config_schema = json.load(file)
-        os.remove(temp_file_path)
-    else:
-        raise NotImplementedError(f"input file type `{obj_key}` not implemented")
+            # Generate JSON schema from the NetCDF file
+            temp_file_path = generate_json_schema_from_s3_netcdf(
+                fp, cloud_format=args.cloud_format, s3_fs=fs
+            )
+            with open(temp_file_path, "r") as file:
+                dataset_config_schema = json.load(file)
+            os.remove(temp_file_path)
+
+        case ".csv":
+
+            # Load the CSV using options
+            csv_opts = json.loads(args.csv_opts) if args.csv_opts else {}
+            with fs.open(fp, "rb") as f:
+                df = pd.read_csv(f, **csv_opts)
+
+            # Update column types for the dataset config
+            dataset_config_schema = dict()
+            for col, dtype in df.dtypes.items():
+                if pd.api.types.is_integer_dtype(dtype):
+                    js_type = "integer"
+                elif pd.api.types.is_float_dtype(dtype):
+                    js_type = "number"
+                elif pd.api.types.is_bool_dtype(dtype):
+                    js_type = "boolean"
+                elif pd.api.types.is_object_dtype(dtype) | pd.api.types.is_string_dtype(
+                    dtype
+                ):
+                    js_type = "string"
+                else:
+                    raise NotImplementedError(
+                        f"found dtype that did not fit into configured categories: `{dtype}`"
+                    )
+
+                dataset_config_schema[col] = {"type": js_type}
+
+        case ".parquet":
+
+            with fs.open(fp, "rb") as f:
+                schema = pq.read_schema(f)
+            dataset_config_schema = dict()
+
+            for field in schema:
+                dataset_config_schema[field.name] = {"type": str(field.type)}
+
+        # Default: raise NotImplementedError
+        case _:
+            raise NotImplementedError(
+                f"input file type `{obj_key_suffix}` not implemented"
+            )
 
     dataset_config = {"schema": dataset_config_schema}
     # Define the path to the validation schema file
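As an illustration of the new ".parquet" case, here is a minimal, self-contained sketch of the pyarrow schema-to-config mapping, using an in-memory buffer in place of the S3 object; the table and column names are made up:

import io

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical two-column table standing in for a real parquet file on S3.
table = pa.table({"TEMP": [24.1, 24.3], "SITE": ["site-1", "site-2"]})
buf = io.BytesIO()
pq.write_table(table, buf)
buf.seek(0)

schema = pq.read_schema(buf)
dataset_config_schema = {field.name: {"type": str(field.type)} for field in schema}
# -> {"TEMP": {"type": "double"}, "SITE": {"type": "string"}}

Note that, unlike the CSV case, which maps pandas dtypes to JSON-schema types ("integer", "number", "boolean", "string"), the parquet case records pyarrow type names such as "double" directly.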
@@ -835,7 +854,7 @@ def main():
         "mode": f"{TO_REPLACE_PLACEHOLDER}",
         "restart_every_path": False,
     }
-    parent_s3_path = PureS3Path.from_uri(nc_file).parent.as_uri()
+    parent_s3_path = PureS3Path.from_uri(fp).parent.as_uri()
     dataset_config["run_settings"]["paths"] = [
         {"s3_uri": parent_s3_path, "filter": [".*\\.nc"], "year_range": []}
     ]
@@ -941,9 +960,7 @@ def main():
     with open(f"{module_path}/config/dataset/{args.dataset_name}.json", "w") as f:
         json.dump(dataset_config, f, indent=2)
 
-    create_dataset_script(
-        args.dataset_name, f"{args.dataset_name}.json", nc_file, bucket
-    )
+    create_dataset_script(args.dataset_name, f"{args.dataset_name}.json", fp, bucket)
     update_pyproject_toml(args.dataset_name)
 
     # fill up aws registry with GN3 uuid