 import importlib.util
 import json
 import os
-import pathlib
 import uuid
 from collections import OrderedDict
 from importlib.resources import files

 import nbformat
 import pandas as pd
-import pyarrow as pa
-import pyarrow.parquet as pq
 import s3fs
 from s3path import PureS3Path
 from termcolor import colored
@@ -737,19 +734,21 @@ def main():

     # Handle S3 path
     if args.file.startswith("s3://"):
-        fp = args.file
-        s3_path = PureS3Path.from_uri(fp)
-        bucket = s3_path.bucket
-        obj_key = str(s3_path.key)
+        nc_file = args.file
+        p = PureS3Path.from_uri(nc_file)
+        bucket = p.bucket
+        obj_key = str(p.key)
     else:
         obj_key = args.file
         bucket = args.bucket
-        fp = PureS3Path.from_uri(f"s3://{args.bucket}").joinpath(args.file).as_uri()
+        nc_file = (
+            PureS3Path.from_uri(f"s3://{args.bucket}").joinpath(args.file).as_uri()
+        )

     # Create an empty NetCDF with NaN variables alongside the JSON files. Acts as the source of truth for restoring missing dimensions.
     # only useful for Zarr to concatenate NetCDF together with missing var/dim in some NetCDF files
     if args.cloud_format == "zarr":
-        nc_nullify_path = nullify_netcdf_variables(fp, args.dataset_name)
+        nc_nullify_path = nullify_netcdf_variables(nc_file, args.dataset_name)

     # optionals s3fs options
     if args.s3fs_opts:
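
For reference (not part of the diff above): a minimal sketch of how s3path's PureS3Path is used by the renamed nc_file handling, splitting an s3:// URI into bucket and key, and rebuilding a URI from a bucket plus a relative key. The bucket and key below are made-up examples.

from s3path import PureS3Path

p = PureS3Path.from_uri("s3://example-bucket/path/to/file.nc")  # hypothetical URI
bucket = p.bucket       # "example-bucket"
obj_key = str(p.key)    # "path/to/file.nc"

# Rebuild a full URI from a bucket name and a relative key, as in the else branch
uri = PureS3Path.from_uri("s3://example-bucket").joinpath("path/to/file.nc").as_uri()
# -> "s3://example-bucket/path/to/file.nc"
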
@@ -759,61 +758,43 @@ def main():
         anon=False,
     )

-    # Route by file type
-    obj_key_suffix = pathlib.Path(obj_key.lower()).suffix
-    match obj_key_suffix:
+    # Generate schema based on input type (NetCDF or CSV)
+    if obj_key.lower().endswith(".csv"):
+        csv_file = nc_file  # TODO: rename
+
+        csv_opts = json.loads(args.csv_opts) if args.csv_opts else {}
+        with fs.open(csv_file, "rb") as f:
+            df = pd.read_csv(f, **csv_opts)
+
+        dataset_config_schema = {"type": "object", "properties": {}}
+        for col, dtype in df.dtypes.items():
+            if pd.api.types.is_integer_dtype(dtype):
+                js_type = "integer"
+            elif pd.api.types.is_float_dtype(dtype):
+                js_type = "number"
+            elif pd.api.types.is_bool_dtype(dtype):
+                js_type = "boolean"
+            elif pd.api.types.is_object_dtype(dtype) | pd.api.types.is_string_dtype(
+                dtype
+            ):
+                js_type = "string"
+            else:
+                raise NotImplementedError(
+                    f"found dtype that did not fit into configured categories: `{dtype}`"
+                )

-        case ".nc":
+            dataset_config_schema["properties"][col] = {"type": js_type}

-            # Generate JSON schema from the NetCDF file
-            temp_file_path = generate_json_schema_from_s3_netcdf(
-                fp, cloud_format=args.cloud_format, s3_fs=fs
-            )
-            with open(temp_file_path, "r") as file:
-                dataset_config_schema = json.load(file)
-            os.remove(temp_file_path)
-
-        case ".csv":
-
-            # Load the csv using options
-            csv_opts = json.loads(args.csv_opts) if args.csv_opts else {}
-            with fs.open(fp, "rb") as f:
-                df = pd.read_csv(f, **csv_opts)
-
-            # Update column types for the dataset config
-            dataset_config_schema = dict()
-            for col, dtype in df.dtypes.items():
-                if pd.api.types.is_integer_dtype(dtype):
-                    js_type = "integer"
-                elif pd.api.types.is_float_dtype(dtype):
-                    js_type = "number"
-                elif pd.api.types.is_bool_dtype(dtype):
-                    js_type = "boolean"
-                elif pd.api.types.is_object_dtype(dtype) | pd.api.types.is_string_dtype(
-                    dtype
-                ):
-                    js_type = "string"
-                else:
-                    raise NotImplementedError(
-                        f"found dtype that did not fit into configured categories: `{dtype}`"
-                    )
-
-                dataset_config_schema[col] = {"type": js_type}
-
-        case ".parquet":
-
-            with fs.open(fp, "rb") as f:
-                schema = pq.read_schema(f)
-            dataset_config_schema = dict()
-
-            for field in schema:
-                dataset_config_schema[field.name] = {"type": str(field.type)}
-
-        # Default: Raise NotImplemented
-        case _:
-            raise NotImplementedError(
-                f"input file type `{obj_key_suffix}` not implemented"
-            )
+    elif obj_key.lower().endswith(".nc"):
+        # Generate JSON schema from the NetCDF file
+        temp_file_path = generate_json_schema_from_s3_netcdf(
+            nc_file, cloud_format=args.cloud_format, s3_fs=fs
+        )
+        with open(temp_file_path, "r") as file:
+            dataset_config_schema = json.load(file)
+        os.remove(temp_file_path)
+    else:
+        raise NotImplementedError(f"input file type `{obj_key}` not implemented")

     dataset_config = {"schema": dataset_config_schema}
     # Define the path to the validation schema file
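
For context (not part of the diff above): a condensed sketch of what the new CSV branch produces, i.e. a JSON-schema-style mapping of column names to JSON types derived from pandas dtypes. The DataFrame columns and values are invented for illustration.

import pandas as pd

df = pd.DataFrame(
    {"site": ["A", "B"], "depth": [10.5, 20.0], "count": [3, 7], "flag": [True, False]}
)

schema = {"type": "object", "properties": {}}
for col, dtype in df.dtypes.items():
    if pd.api.types.is_integer_dtype(dtype):
        js_type = "integer"
    elif pd.api.types.is_float_dtype(dtype):
        js_type = "number"
    elif pd.api.types.is_bool_dtype(dtype):
        js_type = "boolean"
    elif pd.api.types.is_object_dtype(dtype) or pd.api.types.is_string_dtype(dtype):
        js_type = "string"
    else:
        raise NotImplementedError(f"unhandled dtype: {dtype}")
    schema["properties"][col] = {"type": js_type}

# schema ->
# {"type": "object",
#  "properties": {"site": {"type": "string"}, "depth": {"type": "number"},
#                 "count": {"type": "integer"}, "flag": {"type": "boolean"}}}
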
@@ -854,7 +835,7 @@ def main():
         "mode": f"{TO_REPLACE_PLACEHOLDER}",
         "restart_every_path": False,
     }
-    parent_s3_path = PureS3Path.from_uri(fp).parent.as_uri()
+    parent_s3_path = PureS3Path.from_uri(nc_file).parent.as_uri()
     dataset_config["run_settings"]["paths"] = [
         {"s3_uri": parent_s3_path, "filter": [".*\\.nc"], "year_range": []}
     ]
@@ -960,7 +941,9 @@ def main():
     with open(f"{module_path}/config/dataset/{args.dataset_name}.json", "w") as f:
         json.dump(dataset_config, f, indent=2)

-    create_dataset_script(args.dataset_name, f"{args.dataset_name}.json", fp, bucket)
+    create_dataset_script(
+        args.dataset_name, f"{args.dataset_name}.json", nc_file, bucket
+    )
     update_pyproject_toml(args.dataset_name)

     # fill up aws registry with GN3 uuid