diff --git a/utils/build_datasets.py b/utils/build_datasets.py
index 0fb6052c..a7d36700 100644
--- a/utils/build_datasets.py
+++ b/utils/build_datasets.py
@@ -4,12 +4,14 @@ information provided in a Data Sharing Plan CSV or a Data Sharing Plan table
 Synapse Id.
 
 Usage:
-python build_datasets.py -d [DataDSP filepath] -n [Name for DSP CSV output]
+python build_datasets.py -d [DataDSP filepath] -n [Name for DSP CSV output] -f -a -c [Default version number]
 
 author: orion.banks
 """
 
 import argparse
+import datetime
+from dateutil.parser import isoparse
 import os
 import pandas as pd
 import random
@@ -36,6 +38,27 @@ def get_args():
         required=False,
         default="updated_dsp",
     )
+    parser.add_argument(
+        "-f",
+        action="store_true",
+        help="Boolean. If provided, files will be filtered based on the dataset planned upload date.",
+        required=False,
+        default=None
+    )
+    parser.add_argument(
+        "-a",
+        action="store_true",
+        help="Boolean. If provided, files will be added if created after the dataset planned upload date.",
+        required=False,
+        default=False
+    )
+    parser.add_argument(
+        "-c",
+        type=int,
+        help="An integer representing the file version number to select. If mixed versions are expected, a value of 0 should be provided. Default: 1",
+        required=False,
+        default=1
+    )
 
     return parser.parse_args()
 
@@ -48,7 +71,7 @@ def get_table(syn, source_id: str) -> pd.DataFrame:
 
     return table
 
-def filter_files_in_folder(syn, scope: str, formats: list[str], folder_or_files: str) -> list:
+def filter_files_in_folder(syn, scope: str, formats: list[str], folder_or_files: str, cutoff_date: str, after_date: bool, check_version: bool, default_version: int) -> list:
     """Capture all files in provided scope and select files that match a list of formats,
     return list of dataset items"""
 
@@ -60,16 +83,51 @@ def filter_files_in_folder(syn, scope: str, formats: list[str], folder_or_files:
     elif folder_or_files == "folder":
         file_to_add = [entity_id for f, entity_id in filename]  # select all files in folder
     for entity_id in file_to_add:
+        if check_version or cutoff_date is not None:
+            file_info = syn.get(entity_id, downloadFile=False)  # entity metadata only, no file download
+            current_version, created_on_date = file_info.versionLabel, file_info.createdOn
+        if cutoff_date is not None:
+            add_file = filter_files_by_date(created_on_date, cutoff_date, after_date)
+            if add_file is False:
+                continue
         dataset_items.append({
             "entityId": entity_id,
-            "versionNumber": syn.get(entity_id, downloadFile=False).versionLabel
+            "versionNumber": current_version if check_version else default_version
         })
     dataset_len = len(dataset_items)
     print(f"--> {dataset_len} files found...")
 
     return dataset_items
 
+def filter_files_by_date(created_on_iso: str, set_date: str, after_date: bool) -> bool:
+    """Check if a file should be included based on its createdOn date, using a provided cutoff date.
+    The cutoff date is expected in YYYY-MM-DD format; the time component is
+    automatically set to 00:00:00 UTC."""
+
+    set_date = set_date + "-0-0-0"  # append zero hour, minute and second values for parsing below
+
+    parsed_date = [int(t) for t in set_date.split("-")]
+
+    converted_datetime = datetime.datetime(
+        parsed_date[0],
+        parsed_date[1],
+        parsed_date[2],
+        parsed_date[3],
+        parsed_date[4],
+        parsed_date[5],
+        tzinfo=datetime.timezone.utc
+    )
+
+    timestamp = converted_datetime.timestamp()
-def create_dataset_entity(syn, name: str, grant: str, multi_dataset: bool, scope: list) -> tuple[Dataset, str]:
+    created_on_timestamp = isoparse(created_on_iso).timestamp()
+    if after_date:
+        include_in_list = created_on_timestamp > timestamp  # True if file was created after the cutoff date
+    else:
+        include_in_list = created_on_timestamp < timestamp  # True if file was created before the cutoff date
+
+    return include_in_list
+
+def create_dataset_entity(syn, name: str, grant: str, multi_dataset: bool, scope: list) -> Dataset:
     """Create an empty Synapse Dataset using the Project associated with the
     applicable grant number as parent. Return the Dataset object."""
 
@@ -103,11 +161,12 @@ def main():
 
     args = get_args()
 
-    dsp, new_name = args.d, args.n
+    dsp, new_name, filter_by_date, after_date, default_version = args.d, args.n, args.f, args.a, args.c
 
     update_dsp_sheet = None
     create_dataset = False
     multi_dataset = False
+    check_version = default_version == 0  # a value of 0 selects each file's current version
     file_max = 5000  # maximum number of files per Dataset; set to 5000 to avoid web page latency issues
 
     if os.path.exists(dsp):
@@ -132,6 +191,7 @@
         dataset_name = row["DSP Dataset Name"]
         formats = re.split(", |,", row["DSP Dataset File Formats"])
         level = row["DSP Dataset Level"]
+        cutoff_date = row["DSP Planned Upload Date"] if filter_by_date is not None else None
         if level in ["Metadata", "Auxiliary", "Not Applicable"]:
             print(f"Skipping Dataset {dataset_name} of type {level}")
             continue  # move to next table entry if not data files
@@ -140,21 +200,25 @@
         dataset_id_list = []
         file_scope_list = []
         dataset_name_list = []
-
-        if dataset_id:  # check if a Dataset entity was previously recorded
-            print(f"--> Files will be added to Dataset {dataset_id}")
-        else:
-            create_dataset = True
-            update_dsp_sheet = True
 
         if formats:  # only filter files if formats were specified
             print(f"--> Filtering files from {scope_id}")
+            print(f"--> Only files of format {formats} will be included")
             folder_or_files = "files"  # filter files by extension/format
         else:
             folder_or_files = "folder"  # whole folder should be added, don't filter files
 
-        scope_files = filter_files_in_folder(syn, scope_id, formats, folder_or_files)
-        print(f"--> {scope_id} files acquired!\n {len(scope_files)} files will be added to the Dataset.")
+        if cutoff_date:
+            print(f"--> Filtering files based on cutoff date: {cutoff_date}")
+            print(f"--> Files created {'after' if after_date else 'before'} the cutoff date will be added to dataset")
+
+        if check_version:
+            print("--> Current versions of files will be added to dataset")
+        else:
+            print(f"--> Only version {default_version} files will be added to dataset.\n--> If mixed file versions are expected, pass '-c 0' at runtime.")
+
+        scope_files = filter_files_in_folder(syn, scope_id, formats, folder_or_files, cutoff_date, after_date, check_version, default_version)
+        print("--> Scope processing complete!")
 
         if len(scope_files) > file_max:
             new_dataset_count = (len(scope_files) // file_max)
@@ -163,12 +227,11 @@
             update_dsp_sheet = True
             create_dataset = True
             print(
-                f"--> File count exceeds file max.\n--> Creating {dataset_total} groups for files from {scope_id}"
+                f"--> File count exceeds file max (n={file_max}). {new_dataset_count} Datasets will be created.\n--> {len(scope_files)} files from {scope_id} will be added to a total of {dataset_total} Datasets."
             )
             file_scope_list = chunk_files_for_dataset(scope_files, file_max, new_dataset_count)
 
         else:
-            multi_dataset = False
             file_scope_list = [scope_files]  # single dataset, no chunking needed
 
         if dataset_id:
@@ -177,13 +240,16 @@
             dataset_name_list.append(dataset.name)
             dataset.add_items(dataset_items=file_scope_list[0], force=True)
             syn.store(dataset)
-            print(f"--> Files added to existing Dataset {dataset.id}")
+            print(f"--> {len(file_scope_list[0])} files added to existing Dataset {dataset.id}")
             file_scope_list = file_scope_list[1:]  # remove first item, already added
+        else:
+            create_dataset = True
+            update_dsp_sheet = True
 
         if create_dataset:
             for scope in file_scope_list:
                 dataset = create_dataset_entity(syn, dataset_name, grant_id, multi_dataset, scope)
-                print(f"--> New Dataset created and populated with files!")
+                print(f"--> {len(scope)} files added to new Dataset {dataset.id}")
                 dataset_id_list.append(dataset.id)
                 dataset_name_list.append(dataset.name)
 
@@ -191,13 +257,14 @@
 
         dataset_tuples = zip(dataset_id_list, dataset_name_list)
 
-        for dataset_id, name in dataset_tuples:
-            if update_dsp_sheet is not None:
+        if update_dsp_sheet is not None:
+            print(f"Adding information for {len(dataset_id_list)} Datasets")
+            for populated_dataset_id, name in dataset_tuples:
                 temp_df = dsp_df.copy()
                 temp_df.iloc[[_]] = row
-                temp_df.at[_, "DatasetView Key"] = dataset_id
+                temp_df.at[_, "DatasetView Key"] = populated_dataset_id
                 temp_df.at[_, "DSP Dataset Name"] = name
-            updated_df = pd.concat([updated_df, temp_df], ignore_index=True)
+                updated_df = pd.concat([updated_df, temp_df], ignore_index=True)
             updated_df.drop_duplicates(subset=["DatasetView Key"], keep="last", inplace=True)
 
         else:
diff --git a/utils/trim_datasets.py b/utils/trim_datasets.py
new file mode 100644
index 00000000..6dd591ce
--- /dev/null
+++ b/utils/trim_datasets.py
@@ -0,0 +1,83 @@
+"""trim_datasets.py
+
+This script will remove files from datasets based on Synapse Ids.
+Takes a CSV with two columns as input: "datasets", "files"
+Iterates through the datasets, checks each one for the Synapse Ids listed in "files", and removes any that are present
+
+Usage:
+python trim_datasets.py -d [trim config filepath]
+
+author: orion.banks
+"""
+
+import argparse
+import os
+import pandas as pd
+import synapseclient
+from synapseclient import Dataset
+
+
+def get_args():
+    """Set up command-line interface and get arguments."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "-d",
+        type=str,
+        help="Path to a trim config CSV or the Synapse Id of a trim config table.",
+        required=True,
+        default=None,
+    )
+    return parser.parse_args()
+
+
+def get_table(syn, source_id: str) -> pd.DataFrame:
+    """Collect a Synapse table entity and return as a DataFrame."""
+
+    query = f"SELECT * FROM {source_id}"
+    table = syn.tableQuery(query).asDataFrame().fillna("")
+
+    return table
+
+
+def remove_files_from_dataset(syn, dataset: str, files: list[str]) -> tuple[str, list]:
+    """Get files in dataset and remove if in input list of file Synapse IDs"""
+
+    dataset_entity = syn.get(dataset, downloadFile=False)
+    dataset_files = pd.DataFrame(dataset_entity.properties.datasetItems)
+    files_to_remove = [file for file in files if file in dataset_files["entityId"].to_list()]  # only Ids currently in the Dataset
+    for file in files_to_remove:
+        dataset_entity.remove_item(file)
+    syn.store(dataset_entity)
+
+    return dataset_entity.id, files_to_remove
+
+def main():
+
+    syn = synapseclient.login()
+
+    args = get_args()
+
+    config = args.d
+
+    if os.path.exists(config):
+        config_df = pd.read_csv(config, keep_default_na=False, header=0)
+        print("\nDataset trimming config read successfully!")
+    elif "syn" in config:
+        config_df = get_table(syn, config)
+        print(f"Data trimming config acquired from Synapse table {config}!")
+    else:
+        print(
+            f"❗❗❗ {config} is not a valid trimming config identifier. Please check your inputs and try again."
+        )
+        exit()
+
+    if "datasets" in config_df.columns and "files" in config_df.columns:
+        dataset_list = [dataset for dataset in config_df["datasets"].to_list() if dataset]
+        file_list = config_df["files"].to_list()
+        for dataset in dataset_list:
+            if dataset:
+                dataset_id, removed_files = remove_files_from_dataset(syn, dataset, file_list)
+                print(f"{len(removed_files)} files removed from Dataset {dataset_id}")
+
+if __name__ == "__main__":
+    main()
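
For reference, a minimal sketch of the inputs these scripts expect; the file names and Synapse Ids below are placeholders, not values taken from this change.

build_datasets.py, filtering on the planned upload date, keeping files created after that date, and allowing mixed file versions:

    python build_datasets.py -d dsp.csv -n updated_dsp -f -a -c 0

trim_datasets.py, assuming the two-column CSV layout described in its docstring. Every Synapse Id listed in "files" is checked against each Dataset listed in "datasets" and removed if present:

    datasets,files
    syn00000001,syn00000101
    syn00000002,syn00000102

    python trim_datasets.py -d trim_config.csv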