diff --git a/documentation/DCP-documentation/step_1_configuration.md b/documentation/DCP-documentation/step_1_configuration.md index 3dc89a2..c5dddf3 100644 --- a/documentation/DCP-documentation/step_1_configuration.md +++ b/documentation/DCP-documentation/step_1_configuration.md @@ -28,8 +28,10 @@ For more information and examples, see [External Buckets](external_buckets.md). This is generally the bucket in the account in which you are running compute. * **SOURCE_BUCKET:** The bucket where the image files you will be reading are. Often, this is the same as AWS_BUCKET. -* **WORKSPACE:** The bucket where non-image files you will be reading are (e.g. pipeline, load_data.csv, etc.). +These files can be downloaded or read directly off the bucket (see `DOWNLOAD_FILES` below for more). +* **WORKSPACE_BUCKET:** The bucket where non-image files you will be reading are (e.g. pipeline, load_data.csv, etc.). Often, this is the same as AWS_BUCKET. +Workspace files will always be automatically downloaded to your EC2 instance (as of v2.2.1). * **DESTINATION_BUCKET:** The bucket where you want to write your output files. Often, this is the same as AWS_BUCKET. * **UPLOAD_FLAGS:** If you need to add flags to an AWS CLI command to upload flags to your DESTINATION_BUCKET, this is where you enter them. @@ -57,6 +59,7 @@ If you have multiple Dockers running per machine, each Docker will have access t This typically requires a larger EBS volume (depending on the size of your image sets, and how many sets are processed per group), but avoids occasional issues with S3FS that can crop up on longer runs. By default, DCP uses S3FS to mount the S3 `SOURCE_BUCKET` as a pseudo-file system on each EC2 instance in your spot fleet to avoid file download. If you are unable to mount the `SOURCE_BUCKET` (perhaps because of a permissions issue) you should proceed with `DOWNLOAD_FILES = 'True'`. +Note that as of v2.2.1, all non-image files (e.g. load_data.csv's and pipelines) are downloaded regardless of this setting and regardless of whether `SOURCE_BUCKET` and `WORKSPACE_BUCKET` are the same. * **ASSIGN_IP:** Whether or not to assign an a public IPv4 address to each instance in the spot fleet. If set to 'False' will overwrite whatever is in the Fleet file. If set to 'True' will respect whatever is in the Fleet file. diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 4fad002..e54b6b8 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -109,6 +109,8 @@ def printandlog(text,logger): ################################# def runCellProfiler(message): + s3client=boto3.client('s3') + #List the directories in the bucket- this prevents a strange s3fs error rootlist=os.listdir(DATA_ROOT) for eachSubDir in rootlist: @@ -167,7 +169,6 @@ def runCellProfiler(message): # See if this is a message you've already handled, if you've so chosen if CHECK_IF_DONE_BOOL.upper() == 'TRUE': try: - s3client=boto3.client('s3') bucketlist=s3client.list_objects(Bucket=DESTINATION_BUCKET,Prefix=f'{remoteOut}/') objectsizelist=[k['Size'] for k in bucketlist['Contents']] objectsizelist = [i for i in objectsizelist if i >= MIN_FILE_SIZE_BYTES] @@ -179,26 +180,37 @@ def runCellProfiler(message): return 'SUCCESS' except KeyError: #Returned if that folder does not exist pass + + # Download load data file + data_file_path = os.path.join(localIn,message['data_file']) + printandlog(f"Downloading {message['data_file']} from {WORKSPACE_BUCKET}", logger) + csv_insubfolders = message['data_file'].split('/') + subfolders = '/'.join((csv_insubfolders)[:-1]) + if not os.path.exists(os.path.join(localIn,subfolders)): + os.makedirs(os.path.join(localIn,subfolders), exist_ok=True) + try: + s3client.download_file(WORKSPACE_BUCKET, message['data_file'], data_file_path) + except botocore.exceptions.ClientError: + printandlog(f"Can't find load data file in S3. Looking for {message['data_file']} in {WORKSPACE_BUCKET}",logger) + printandlog("Aborting. Can't run without load data.",logger) + logger.removeHandler(watchtowerlogger) + return 'DOWNLOAD_PROBLEM' + + # Download pipeline and update pipeline path in message + printandlog(f"Downloading {message['pipeline']} from {WORKSPACE_BUCKET}", logger) + pipepath = os.path.join(localIn, message['pipeline'].split('/')[-1]) + try: + s3client.download_file(WORKSPACE_BUCKET, message['pipeline'], pipepath) + except botocore.exceptions.ClientError: + printandlog(f"Can't find pipeline in S3. Looking for {message['pipeline']} in {WORKSPACE_BUCKET}",logger) + printandlog("Aborting. Can't run without pipeline.",logger) + logger.removeHandler(watchtowerlogger) + return 'DOWNLOAD_PROBLEM' downloaded_files = [] - # Optional - download all files, bypass S3 mounting + # Optional - download image files, bypass S3 mounting if DOWNLOAD_FILES.lower() == 'true': - # Download load data file and image files - data_file_path = os.path.join(localIn,message['data_file']) - printandlog(f"Downloading {message['data_file']} from {WORKSPACE_BUCKET}", logger) - csv_insubfolders = message['data_file'].split('/') - subfolders = '/'.join((csv_insubfolders)[:-1]) - if not os.path.exists(os.path.join(localIn,subfolders)): - os.makedirs(os.path.join(localIn,subfolders), exist_ok=True) - s3client=boto3.client('s3') - try: - s3client.download_file(WORKSPACE_BUCKET, message['data_file'], data_file_path) - except botocore.exceptions.ClientError: - printandlog(f"Can't find load data file in S3. Looking for {message['data_file']} in {WORKSPACE_BUCKET}",logger) - printandlog("Aborting. Can't run without load data.",logger) - logger.removeHandler(watchtowerlogger) - return 'DOWNLOAD_PROBLEM' if message['data_file'][-4:]=='.csv': printandlog('Figuring which files to download', logger) import pandas @@ -273,20 +285,8 @@ def runCellProfiler(message): printandlog(f'Downloaded {str(len(downloaded_files))} files',logger) else: printandlog("Couldn't parse data file for file download. Not supported input of .csv or .txt",logger) - # Download pipeline and update pipeline path in message - printandlog(f"Downloading {message['pipeline']} from {WORKSPACE_BUCKET}", logger) - pipepath = os.path.join(localIn, message['pipeline'].split('/')[-1]) - try: - s3client.download_file(WORKSPACE_BUCKET, message['pipeline'], pipepath) - except botocore.exceptions.ClientError: - printandlog(f"Can't find pipeline in S3. Looking for {message['pipeline']} in {WORKSPACE_BUCKET}",logger) - printandlog("Aborting. Can't run without pipeline.",logger) - logger.removeHandler(watchtowerlogger) - return 'DOWNLOAD_PROBLEM' - else: - data_file_path = os.path.join(DATA_ROOT,message['data_file']) - pipepath = os.path.join(DATA_ROOT,message["pipeline"]) + printandlog('Using bucket mount for image files', logger) # Build and run CellProfiler command cpDone = f'{localOut}/cp.is.done'