109 changes: 88 additions & 21 deletions utils/build_datasets.py
@@ -4,12 +4,14 @@
information provided in a Data Sharing Plan CSV or a Data Sharing Plan table Synapse Id.

Usage:
python build_datasets.py -d [DataDSP filepath] -n [Name for DSP CSV output]
python build_datasets.py -d [DataDSP filepath] -n [Name for DSP CSV output] -f -a -c [Default version number]
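Example invocation (illustrative filepath; the flags shown filter by planned upload date,
keep files created after it, and use each file's current version):
python build_datasets.py -d dsp_plan.csv -n updated_dsp -f -a -c 0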

author: orion.banks
"""

import argparse
import datetime
from dateutil.parser import isoparse
import os
import pandas as pd
import random
@@ -36,6 +38,27 @@ def get_args():
required=False,
default="updated_dsp",
)
parser.add_argument(
"-f",
action="store_true",
help="Boolean. If provided, files will be filtered based on the dataset planned upload date.",
required=False,
default=None
)
parser.add_argument(
"-a",
action="store_true",
help="Boolean. If provided, files will be added if created after the dataset planned upload date.",
required=False,
default=False
)
parser.add_argument(
"-c",
type=int,
help="An integer representing the file version number to select. If mixed versions are expected, a value of 0 should be provided. Default: 1",
required=False,
default=1
)
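# Flag interaction summary (descriptive note, not new behavior): -f enables filtering on the
# DSP planned upload date, -a flips the comparison to "created after" the cutoff, and
# -c 0 keeps each file's current version rather than assigning a fixed version number.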
return parser.parse_args()


@@ -48,7 +71,7 @@ def get_table(syn, source_id: str) -> pd.DataFrame:
return table


def filter_files_in_folder(syn, scope: str, formats: list[str], folder_or_files: str) -> list:
def filter_files_in_folder(syn, scope: str, formats: list[str], folder_or_files: str, cutoff_date: str, after_date: bool, check_version: bool, default_version: int) -> list:
"""Capture all files in provided scope and select files that match a list of formats,
return list of dataset items"""

@@ -60,16 +83,51 @@ def filter_files_in_folder(syn, scope: str, formats: list[str], folder_or_files:
elif folder_or_files == "folder":
file_to_add = [entity_id for f, entity_id in filename] # select all files in folder
for entity_id in file_to_add:
if check_version or cutoff_date is not None:
file_info = syn.get(entity_id, downloadFile=False)
current_version, created_on_date = file_info.versionLabel, file_info.createdOn
if cutoff_date is not None:
add_file = filter_files_by_date(created_on_date, cutoff_date, after_date)
if add_file is False:
continue
dataset_items.append({
"entityId": entity_id,
"versionNumber": syn.get(entity_id, downloadFile=False).versionLabel
"versionNumber": current_version if check_version else default_version
})
dataset_len = len(dataset_items)
print(f"--> {dataset_len} files found...")
return dataset_items

def filter_files_by_date(created_on_iso: str, set_date: str, after_date: bool) -> bool:
"""Check if file should be included based createdOn date, using a provided cutoff date.
Cutoff date is expected in YYYY-MM-DD format and will automatically be
set to 00:00:00 for HH:MM:SS"""
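# Worked example (illustrative values): a cutoff of "2024-06-01" becomes 2024-06-01 00:00:00 UTC,
# so a file with createdOn "2024-06-15T12:30:00.000Z" is included only when after_date is True.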

set_date = set_date + "-0-0-0"  # append zeroed time fields so the cutoff parses to midnight

parsed_date = [int(t) for t in set_date.split("-")]  # [year, month, day, hour, minute, second]

converted_datetime = datetime.datetime(
parsed_date[0],
parsed_date[1],
parsed_date[2],
parsed_date[3],
parsed_date[4],
parsed_date[5],
tzinfo=datetime.timezone.utc
)

timestamp = converted_datetime.timestamp()

created_on_timestamp = isoparse(created_on_iso).timestamp()
if after_date:
include_in_list = created_on_timestamp > timestamp # include file if it was created after the cutoff date
else:
include_in_list = created_on_timestamp < timestamp # include file if it was created before the cutoff date

return include_in_list

def create_dataset_entity(syn, name: str, grant: str, multi_dataset: bool, scope: list) -> tuple[Dataset, str]:
def create_dataset_entity(syn, name: str, grant: str, multi_dataset: bool, scope: list) -> Dataset:
"""Create an empty Synapse Dataset using the
Project associated with the applicable grant number as parent.
Return the Dataset object."""
@@ -103,11 +161,12 @@ def main():

args = get_args()

dsp, new_name = args.d, args.n
dsp, new_name, filter_by_date, after_date, default_version = args.d, args.n, args.f, args.a, args.c

update_dsp_sheet = None
create_dataset = False
multi_dataset = False
check_version = default_version == 0  # -c 0 means use each file's current version
file_max = 5000 # maximum number of files per Dataset; set to 5000 to avoid web page latency issues

if os.path.exists(dsp):
@@ -132,6 +191,7 @@
dataset_name = row["DSP Dataset Name"]
formats = re.split(", |,", row["DSP Dataset File Formats"])
level = row["DSP Dataset Level"]
cutoff_date = row["DSP Planned Upload Date"] if filter_by_date is not None else None
if level in ["Metadata", "Auxiliary", "Not Applicable"]:
print(f"Skipping Dataset {dataset_name} of type {level}")
continue # move to next table entry if not data files
@@ -140,21 +200,25 @@
dataset_id_list = []
file_scope_list = []
dataset_name_list = []

if dataset_id: # check if a Dataset entity was previously recorded
print(f"--> Files will be added to Dataset {dataset_id}")
else:
create_dataset = True
update_dsp_sheet = True

if formats: # only filter files if formats were specified
print(f"--> Filtering files from {scope_id}")
print(f"--> Only files of format {formats} will be included")
folder_or_files = "files" # filter files by extension/format
else:
folder_or_files = "folder" # whole folder should be added, don't filter files

scope_files = filter_files_in_folder(syn, scope_id, formats, folder_or_files)
print(f"--> {scope_id} files acquired!\n {len(scope_files)} files will be added to the Dataset.")
if cutoff_date:
print(f"--> Filtering files based on cutoff date: {cutoff_date}")
print(" ".join(["--> Files created", "after" if after_date is True else "before", "cutoff date will be added to dataset"]))

if check_version:
print("--> Current versions of files will be added to dataset")
else:
print(f"--> Only version {default_version} files will be added to dataset.\n--> If mixed file versions are expected, pass '-c 0' at runtime.")

scope_files = filter_files_in_folder(syn, scope_id, formats, folder_or_files, cutoff_date, after_date, check_version, default_version)
print(f"--> Scope processing complete!")

if len(scope_files) > file_max:
new_dataset_count = (len(scope_files) // file_max)
@@ -163,12 +227,11 @@
update_dsp_sheet = True
create_dataset = True
print(
f"--> File count exceeds file max.\n--> Creating {dataset_total} groups for files from {scope_id}"
f"--> File count exceeds file max (n={file_max}). {new_dataset_count} Datasets will be created.\n--> {len(scope_files)} files from {scope_id} will be added to a total of {dataset_total} Datasets."
)
file_scope_list = chunk_files_for_dataset(scope_files, file_max, new_dataset_count)

else:
multi_dataset = False
file_scope_list = [scope_files] # single dataset, no chunking needed

if dataset_id:
@@ -177,27 +240,31 @@
dataset_name_list.append(dataset.name)
dataset.add_items(dataset_items=file_scope_list[0], force=True)
syn.store(dataset)
print(f"--> Files added to existing Dataset {dataset.id}")
print(f"--> {len(file_scope_list[0])} files added to existing Dataset {dataset.id}")
file_scope_list = file_scope_list[1:] # remove first item, already added
else:
create_dataset = True
update_dsp_sheet = True

if create_dataset:
for scope in file_scope_list:
dataset = create_dataset_entity(syn, dataset_name, grant_id, multi_dataset, scope)
print(f"--> New Dataset created and populated with files!")
print(f"--> {len(scope)} files added to new Dataset {dataset.id}")
dataset_id_list.append(dataset.id)
dataset_name_list.append(dataset.name)

count += 1

dataset_tuples = zip(dataset_id_list, dataset_name_list)

for dataset_id, name in dataset_tuples:
if update_dsp_sheet is not None:
if update_dsp_sheet is not None:
print(f"Adding information for {new_dataset_count} Datasets ")
for populated_dataset_id, name in dataset_tuples:
temp_df = dsp_df.copy()
temp_df.iloc[[_]] = row
temp_df.at[_, "DatasetView Key"] = dataset_id
temp_df.at[_, "DatasetView Key"] = populated_dataset_id
temp_df.at[_, "DSP Dataset Name"] = name
updated_df = pd.concat([updated_df, temp_df], ignore_index=True)
updated_df = pd.concat([updated_df, temp_df], ignore_index=True)
updated_df.drop_duplicates(subset=["DatasetView Key"], keep="last", inplace=True)

else:
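A minimal sketch (not part of this diff) of how the new date-cutoff helper behaves, assuming
build_datasets.py can be imported without side effects and using placeholder dates rather than
values from the change:

from build_datasets import filter_files_by_date

created_on = "2024-07-10T08:15:00.000Z"  # placeholder createdOn value from Synapse
cutoff = "2024-06-01"                    # placeholder DSP Planned Upload Date

filter_files_by_date(created_on, cutoff, after_date=True)   # True: file was created after the cutoff
filter_files_by_date(created_on, cutoff, after_date=False)  # False: file was not created before the cutoff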
83 changes: 83 additions & 0 deletions utils/trim_datasets.py
@@ -0,0 +1,83 @@
"""trim_datasets.py

This script removes files from Synapse Datasets based on Synapse Ids.
It takes a CSV with two columns as input: "datasets" and "files".
For each dataset listed, any file Synapse Id from the "files" column that is present in that dataset is removed.
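
Example config (illustrative Synapse Ids):
datasets,files
syn11111111,syn22222222
syn33333333,syn44444444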

Usage:
python trim_datasets.py -d [trim config filepath]

author: orion.banks
"""

import argparse
import os
import pandas as pd
import synapseclient
from synapseclient import Dataset


def get_args():
"""Set up command-line interface and get arguments."""
parser = argparse.ArgumentParser()
parser.add_argument(
"-d",
type=str,
help="Path or Table Synapse Id associated with a dataset trim config.",
required=True,
default=None,
)
return parser.parse_args()


def get_table(syn, source_id: str) -> pd.DataFrame:
"""Collect a Synapse table entity and return as a Dataframe."""

query = f"SELECT * FROM {source_id}"
table = syn.tableQuery(query).asDataFrame().fillna("")

return table


def remove_files_from_dataset(syn, dataset: str, files: list[str]) -> tuple[str, list]:
"""Get files in dataset and remove if in input list of file Synapse IDs"""

dataset_entity = syn.get(dataset, downloadFile=False)
dataset_files = pd.DataFrame(dataset_entity.properties.datasetItems)
dataset_file_ids = dataset_files["entityId"].to_list() if "entityId" in dataset_files.columns else []
files_to_remove = [file for file in files if file in dataset_file_ids]
for file in files_to_remove:
dataset_entity.remove_item(file)
syn.store(dataset_entity)

return dataset_entity.id, files_to_remove

def main():

syn = synapseclient.login()

args = get_args()

config = args.d

if os.path.exists(config):
config_df = pd.read_csv(config, keep_default_na=False, header=0)
print("\nDataset trimming config read successfully!")
elif "syn" in config:
config_df = get_table(syn, config)
print(f"Data trimming config acquired from Synapse table {config}!")
else:
print(
f"❗❗❗ {config} is not a valid trimming config identifier. Please check your inputs and try again."
)
exit()

if "datasets" and "files" in config_df.columns:
dataset_list = [dataset for dataset in config_df["datasets"].to_list() if dataset]
file_list = config_df["files"].to_list()
for dataset in dataset_list:
dataset_id, removed_files = remove_files_from_dataset(syn, dataset, file_list)
print(f"{len(removed_files)} files removed from Dataset {dataset_id}")

if __name__ == "__main__":
main()
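
A minimal end-to-end sketch (illustrative only; assumes a valid Synapse login and placeholder
Synapse Ids, not values from this change):

import synapseclient
from trim_datasets import remove_files_from_dataset

syn = synapseclient.login()
dataset_id, removed = remove_files_from_dataset(syn, "syn11111111", ["syn22222222", "syn33333333"])
print(f"{len(removed)} files removed from Dataset {dataset_id}")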