diff --git a/src/deep_neurographs/utils/util.py b/src/deep_neurographs/utils/util.py index 2cf3ccf9..91ee7512 100644 --- a/src/deep_neurographs/utils/util.py +++ b/src/deep_neurographs/utils/util.py @@ -22,8 +22,6 @@ import ast import boto3 import json -import math -import networkx as nx import numpy as np import os import pandas as pd @@ -31,74 +29,74 @@ import shutil import smartsheet -from deep_neurographs.utils import graph_util as gutil - # --- OS utils --- -def mkdir(path, delete=False): +def listdir(path, extension=None): """ - Creates a directory at "path". + Lists all files in the directory at "path". If an extension is + provided, then only files containing "extension" are returned. Parameters ---------- path : str - Path of directory to be created. - delete : bool, optional - Indication of whether to delete directory at path if it already - exists. The default is False. + Path to directory to be searched. + extension : str, optional + Extension of file type of interest. The default is None. Returns ------- - None + List[str] + Filenames in directory with extension "extension" if provided. + Otherwise, list of all files in directory. """ - if delete: - rmdir(path) - if not os.path.exists(path): - os.mkdir(path) + if extension is None: + return [f for f in os.listdir(path)] + else: + return [f for f in os.listdir(path) if f.endswith(extension)] -def rmdir(path): +def list_files_in_zip(zip_content): """ - Removes directory and all subdirectories at "path". + Lists all files in a zip file stored in a GCS bucket. Parameters ---------- - path : str - Path to directory and subdirectories to be deleted if they exist. + zip_content : str + Content stored in a zip file in the form of a string of bytes. Returns ------- - None + list[str] + List of filenames in a zip file. """ - if os.path.exists(path): - shutil.rmtree(path) + with ZipFile(BytesIO(zip_content), "r") as zip_file: + return zip_file.namelist() -def listdir(path, extension=None): +def list_paths(directory, extension=None): """ - Lists all files in the directory at "path". If an extension is - provided, then only files containing "extension" are returned. + Lists all paths within "directory" that end with "extension" if provided. Parameters ---------- - path : str - Path to directory to be searched. + directory : str + Directory to be searched. extension : str, optional - Extension of file type of interest. The default is None. + If provided, only paths of files with the extension are returned. The + default is None. Returns ------- - List[str] - Filenames in directory with extension "extension" if provided. - Otherwise, list of all files in directory. + list[str] + List of all paths within "directory". """ - if extension is None: - return [f for f in os.listdir(path)] - else: - return [f for f in os.listdir(path) if f.endswith(extension)] + paths = list() + for f in listdir(directory, extension=extension): + paths.append(os.path.join(directory, f)) + return paths def list_subdirs(path, keyword=None, return_paths=False): @@ -134,28 +132,45 @@ def list_subdirs(path, keyword=None, return_paths=False): return sorted(subdirs) -def list_paths(directory, extension=None): +def mkdir(path, delete=False): """ - Lists all paths within "directory" that end with "extension" if provided. + Creates a directory at "path". Parameters ---------- - directory : str - Directory to be searched. - extension : str, optional - If provided, only paths of files with the extension are returned. The - default is None. + path : str + Path of directory to be created. + delete : bool, optional + Indication of whether to delete directory at path if it already + exists. The default is False. Returns ------- - list[str] - List of all paths within "directory". + None """ - paths = list() - for f in listdir(directory, extension=extension): - paths.append(os.path.join(directory, f)) - return paths + if delete: + rmdir(path) + if not os.path.exists(path): + os.mkdir(path) + + +def rmdir(path): + """ + Removes directory and all subdirectories at "path". + + Parameters + ---------- + path : str + Path to directory and subdirectories to be deleted if they exist. + + Returns + ------- + None + + """ + if os.path.exists(path): + shutil.rmtree(path) def set_path(dirname, filename, extension): @@ -221,6 +236,7 @@ def set_zip_path(zip_writer, filename, extension): return f +# --- IO utils --- def combine_zips(zip_paths, output_zip_path): """ Combines a list of ZIP archives into a single ZIP archive. @@ -247,83 +263,6 @@ def combine_zips(zip_paths, output_zip_path): out_zip.writestr(item, zip_in.read(item.filename)) -def list_files_in_zip(zip_content): - """ - Lists all files in a zip file stored in a GCS bucket. - - Parameters - ---------- - zip_content : str - Content stored in a zip file in the form of a string of bytes. - - Returns - ------- - list[str] - List of filenames in a zip file. - - """ - with ZipFile(BytesIO(zip_content), "r") as zip_file: - return zip_file.namelist() - - -def list_gcs_filenames(gcs_dict, extension): - """ - Lists all files in a GCS bucket with the given extension. - - Parameters - ---------- - gcs_dict : dict - ... - extension : str - File extension of filenames to be listed. - - Returns - ------- - list - Filenames stored at "cloud" path with the given extension. - - """ - bucket = storage.Client().bucket(gcs_dict["bucket_name"]) - blobs = bucket.list_blobs(prefix=gcs_dict["path"]) - return [blob.name for blob in blobs if extension in blob.name] - - -def list_gcs_subdirectories(bucket_name, prefix): - """ - Lists all direct subdirectories of a given prefix in a GCS bucket. - - Parameters - ---------- - bucket : str - Name of bucket to be read from. - prefix : str - Path to directory in "bucket". - - Returns - ------- - list[str] - List of direct subdirectories. - - """ - # Load blobs - storage_client = storage.Client() - blobs = storage_client.list_blobs( - bucket_name, prefix=prefix, delimiter="/" - ) - [blob.name for blob in blobs] - - # Parse directory contents - prefix_depth = len(prefix.split("/")) - subdirs = list() - for prefix in blobs.prefixes: - is_dir = prefix.endswith("/") - is_direct_subdir = len(prefix.split("/")) - 1 == prefix_depth - if is_dir and is_direct_subdir: - subdirs.append(prefix) - return subdirs - - -# --- IO utils --- def read_json(path): """ Reads JSON file located at the given path. @@ -385,7 +324,7 @@ def read_zip(zip_file, path): def write_json(path, contents): """ - Writes "contents" to a json file at "path". + Writes "contents" to a JSON file at "path". Parameters ---------- @@ -403,6 +342,27 @@ def write_json(path, contents): json.dump(contents, f) +def write_list(path, my_list): + """ + Writes each item in a list to a text file, with each item on a new line. + + Parameters + ---------- + path : str + Path where text file is to be written. + my_list + The list of items to write to the file. + + Returns + ------- + None + + """ + with open(path, "w") as file: + for item in my_list: + file.write(f"{item}\n") + + def write_txt(path, contents): """ Writes "contents" to a txt file at "path". @@ -424,25 +384,62 @@ def write_txt(path, contents): f.close() -def write_list(path, my_list): +# --- GCS utils --- +def list_gcs_filenames(gcs_dict, extension): """ - Writes each item in a list to a text file, with each item on a new line. + Lists all files in a GCS bucket with the given extension. Parameters ---------- - path : str - Path where text file is to be written. - my_list - The list of items to write to the file. + gcs_dict : dict + ... + extension : str + File extension of filenames to be listed. Returns ------- - None + list + Filenames stored at "cloud" path with the given extension. """ - with open(path, "w") as file: - for item in my_list: - file.write(f"{item}\n") + bucket = storage.Client().bucket(gcs_dict["bucket_name"]) + blobs = bucket.list_blobs(prefix=gcs_dict["path"]) + return [blob.name for blob in blobs if extension in blob.name] + + +def list_gcs_subdirectories(bucket_name, prefix): + """ + Lists all direct subdirectories of a given prefix in a GCS bucket. + + Parameters + ---------- + bucket : str + Name of bucket to be read from. + prefix : str + Path to directory in "bucket". + + Returns + ------- + List[str] + List of direct subdirectories. + + """ + # Load blobs + storage_client = storage.Client() + blobs = storage_client.list_blobs( + bucket_name, prefix=prefix, delimiter="/" + ) + [blob.name for blob in blobs] + + # Parse directory contents + prefix_depth = len(prefix.split("/")) + subdirs = list() + for prefix in blobs.prefixes: + is_dir = prefix.endswith("/") + is_direct_subdir = len(prefix.split("/")) - 1 == prefix_depth + if is_dir and is_direct_subdir: + subdirs.append(prefix) + return subdirs # --- S3 utils --- @@ -550,71 +547,7 @@ def upload_file_to_s3(source_path, bucket_name, destination_path): s3.upload_file(source_path, bucket_name, destination_path) -# --- math utils --- -def get_avg_std(data, weights=None): - """ - Computes the average and standard deviation of "data". If "weights" is - provided, the weighted average and standard deviation are computed. - - Parameters - ---------- - data : numpy.ndarray - Data to be evaluated. - weights : numpy.ndarray, optional - Weights to apply to each point in "data". The default is None. - - Returns - ------- - float, float - Average and standard deviation of "data". - - """ - avg = np.average(data, weights=weights) - var = np.average((data - avg) ** 2, weights=weights) - return avg, math.sqrt(var) - - -def sample_once(my_container): - """ - Samples a single element from "my_container". - - Parameters - ---------- - my_container : container - Container to be sampled from. - - Returns - ------- - sample - - """ - return sample(my_container, 1)[0] - - -# --- dictionary utils --- -def remove_items(my_dict, keys): - """ - Removes dictionary items corresponding to "keys". - - Parameters - ---------- - my_dict : dict - Dictionary to be edited. - keys : list - List of keys to be deleted from "my_dict". - - Returns - ------- - dict - Updated dictionary. - - """ - for key in keys: - if key in my_dict: - del my_dict[key] - return my_dict - - +# --- Dictionary utils --- def find_best(my_dict, maximize=True): """ Given a dictionary where each value is either a list or int (i.e. cnt), @@ -647,6 +580,26 @@ def find_best(my_dict, maximize=True): return best_key +def remove_items(my_dict, keys): + """ + Removes dictionary items corresponding to "keys". + + Parameters + ---------- + my_dict : dict + Dictionary to be edited. + keys : list + List of keys to be deleted from "my_dict". + + Returns + ------- + dict + Updated dictionary. + + """ + return {k: v for k, v in my_dict.items() if k not in keys} + + # --- SmartSheet utils --- def find_row_id(brain_id, sheet): for row in sheet.rows: @@ -690,29 +643,7 @@ def update_smartsheet(access_token, brain_id): smartsheet_client.Sheets.update_rows(sheet_id, [updated_row]) -# --- miscellaneous --- -def count_fragments(fragments_pointer, min_size=0): - """ - Counts the number of fragments in a given predicted segmentation. - - Parameters - ---------- - swc_pointer : dict, list, str - Object that points to swcs to be read, see class documentation for - details. - min_size : float, optional - - """ - # Extract fragments - graph_loader = gutil.GraphLoader(min_size=min_size, progress_bar=True) - graph = graph_loader.run(fragments_pointer) - - # Report results - print("# Connected Components:", nx.number_connected_components(graph)) - print("# Nodes:", graph.number_of_nodes()) - print("# Edges:", graph.number_of_edges()) - - +# --- Miscellaneous --- def get_swc_id(path): """ Gets segment id of the swc file at "path". @@ -768,22 +699,21 @@ def numpy_to_hashable(arr): return [tuple(item) for item in arr.tolist()] -def reformat_number(number): +def sample_once(my_container): """ - Reformats large number to have commas. + Samples a single element from "my_container". Parameters ---------- - number : float - Number to be reformatted. + my_container : container + Container to be sampled from. Returns ------- - str - Reformatted number. + sample """ - return f"{number:,}" + return sample(my_container, 1)[0] def spaced_idxs(arr_length, k): @@ -814,14 +744,14 @@ def spaced_idxs(arr_length, k): def time_writer(t, unit="seconds"): """ - Converts a runtime "t" to a larger unit of time if applicable. + Converts a runtime to a larger unit of time if applicable. Parameters ---------- t : float Runtime. unit : str, optional - Unit of time that "t" is expressed in. + Unit that the given time is expressed in. Returns -------