force workspace download #196

Open · wants to merge 3 commits into master

5 changes: 4 additions & 1 deletion documentation/DCP-documentation/step_1_configuration.md
@@ -28,8 +28,10 @@ For more information and examples, see [External Buckets](external_buckets.md).
This is generally the bucket in the account in which you are running compute.
* **SOURCE_BUCKET:** The bucket where the image files you will be reading are.
Often, this is the same as AWS_BUCKET.
* **WORKSPACE:** The bucket where non-image files you will be reading are (e.g. pipeline, load_data.csv, etc.).
These files can be downloaded or read directly off the bucket (see `DOWNLOAD_FILES` below for more).
* **WORKSPACE_BUCKET:** The bucket where non-image files you will be reading are (e.g. pipeline, load_data.csv, etc.).
Often, this is the same as AWS_BUCKET.
Workspace files will always be automatically downloaded to your EC2 instance (as of v2.2.1).
* **DESTINATION_BUCKET:** The bucket where you want to write your output files.
Often, this is the same as AWS_BUCKET.
* **UPLOAD_FLAGS:** If you need to add flags to an AWS CLI command to upload files to your DESTINATION_BUCKET, this is where you enter them.
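
For orientation, the bucket keys above might be set along these lines in a `config.py`-style settings file. This is a minimal sketch: the bucket names are placeholders and the values are not defaults shipped with DCP.

```python
# Minimal sketch of the bucket-related settings described above.
# Bucket names are placeholders; point them at your own buckets.
AWS_BUCKET = "my-dcp-bucket"            # bucket in the account running compute
SOURCE_BUCKET = "my-dcp-bucket"         # where the image files live
WORKSPACE_BUCKET = "my-dcp-bucket"      # pipelines, load_data.csv, and other non-image files
DESTINATION_BUCKET = "my-dcp-bucket"    # where output files are written
UPLOAD_FLAGS = ""                       # extra AWS CLI flags for uploads, e.g. "--acl bucket-owner-full-control"
```
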
@@ -57,6 +59,7 @@ If you have multiple Dockers running per machine, each Docker will have access t
This typically requires a larger EBS volume (depending on the size of your image sets, and how many sets are processed per group), but avoids occasional issues with S3FS that can crop up on longer runs.
By default, DCP uses S3FS to mount the S3 `SOURCE_BUCKET` as a pseudo-file system on each EC2 instance in your spot fleet to avoid file download.
If you are unable to mount the `SOURCE_BUCKET` (perhaps because of a permissions issue) you should proceed with `DOWNLOAD_FILES = 'True'`.
Note that as of v2.2.1, all non-image files (e.g. load_data.csv files and pipelines) are downloaded regardless of this setting and regardless of whether `SOURCE_BUCKET` and `WORKSPACE_BUCKET` are the same.
* **ASSIGN_IP:** Whether or not to assign a public IPv4 address to each instance in the spot fleet.
If set to 'False', it will overwrite whatever is in the Fleet file.
If set to 'True', it will respect whatever is in the Fleet file.
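
To make the `DOWNLOAD_FILES` behavior above concrete: as of v2.2.1 the workspace files are always fetched, while image files are only downloaded when the flag is set. The sketch below is illustrative; the function name and arguments are not part of DCP, and the real logic lives in `runCellProfiler` in `worker/cp-worker.py` (diffed below).

```python
import os
import boto3

def fetch_workspace_files(message, workspace_bucket, local_in):
    """Sketch of the v2.2.1 behavior: the load data file and pipeline are
    always downloaded from WORKSPACE_BUCKET, whatever DOWNLOAD_FILES says."""
    s3 = boto3.client("s3")

    data_file_path = os.path.join(local_in, message["data_file"])
    os.makedirs(os.path.dirname(data_file_path), exist_ok=True)
    s3.download_file(workspace_bucket, message["data_file"], data_file_path)

    pipeline_path = os.path.join(local_in, message["pipeline"].split("/")[-1])
    s3.download_file(workspace_bucket, message["pipeline"], pipeline_path)

    # Image files are only downloaded when DOWNLOAD_FILES == 'True';
    # otherwise they are read through the S3FS mount of SOURCE_BUCKET.
    return data_file_path, pipeline_path
```
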
60 changes: 30 additions & 30 deletions worker/cp-worker.py
@@ -109,6 +109,8 @@ def printandlog(text,logger):
#################################

def runCellProfiler(message):
    s3client=boto3.client('s3')

    #List the directories in the bucket- this prevents a strange s3fs error
    rootlist=os.listdir(DATA_ROOT)
    for eachSubDir in rootlist:
@@ -167,7 +169,6 @@ def runCellProfiler(message):
    # See if this is a message you've already handled, if you've so chosen
    if CHECK_IF_DONE_BOOL.upper() == 'TRUE':
        try:
            s3client=boto3.client('s3')
            bucketlist=s3client.list_objects(Bucket=DESTINATION_BUCKET,Prefix=f'{remoteOut}/')
            objectsizelist=[k['Size'] for k in bucketlist['Contents']]
            objectsizelist = [i for i in objectsizelist if i >= MIN_FILE_SIZE_BYTES]
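
For context, the "already done?" check above (partly collapsed in this diff) amounts to counting sufficiently large output objects under the job's destination prefix and skipping the job if enough exist. A self-contained sketch with illustrative argument names; the worker also applies further filtering not shown here:

```python
import boto3

def job_already_done(bucket, prefix, min_file_size_bytes, expected_number_files):
    """Return True if enough large-enough outputs already exist for this job."""
    s3 = boto3.client("s3")
    listing = s3.list_objects(Bucket=bucket, Prefix=prefix)
    if "Contents" not in listing:
        return False  # nothing has been written under this prefix yet
    sizes = [obj["Size"] for obj in listing["Contents"]
             if obj["Size"] >= min_file_size_bytes]
    return len(sizes) >= int(expected_number_files)
```
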
@@ -179,26 +180,37 @@
                return 'SUCCESS'
        except KeyError: #Returned if that folder does not exist
            pass

    # Download load data file
    data_file_path = os.path.join(localIn,message['data_file'])
    printandlog(f"Downloading {message['data_file']} from {WORKSPACE_BUCKET}", logger)
    csv_insubfolders = message['data_file'].split('/')
    subfolders = '/'.join((csv_insubfolders)[:-1])
    if not os.path.exists(os.path.join(localIn,subfolders)):
        os.makedirs(os.path.join(localIn,subfolders), exist_ok=True)
    try:
        s3client.download_file(WORKSPACE_BUCKET, message['data_file'], data_file_path)
    except botocore.exceptions.ClientError:
        printandlog(f"Can't find load data file in S3. Looking for {message['data_file']} in {WORKSPACE_BUCKET}",logger)
        printandlog("Aborting. Can't run without load data.",logger)
        logger.removeHandler(watchtowerlogger)
        return 'DOWNLOAD_PROBLEM'

    # Download pipeline and update pipeline path in message
    printandlog(f"Downloading {message['pipeline']} from {WORKSPACE_BUCKET}", logger)
    pipepath = os.path.join(localIn, message['pipeline'].split('/')[-1])
    try:
        s3client.download_file(WORKSPACE_BUCKET, message['pipeline'], pipepath)
    except botocore.exceptions.ClientError:
        printandlog(f"Can't find pipeline in S3. Looking for {message['pipeline']} in {WORKSPACE_BUCKET}",logger)
        printandlog("Aborting. Can't run without pipeline.",logger)
        logger.removeHandler(watchtowerlogger)
        return 'DOWNLOAD_PROBLEM'

    downloaded_files = []

    # Optional - download all files, bypass S3 mounting
    # Optional - download image files, bypass S3 mounting
    if DOWNLOAD_FILES.lower() == 'true':
        # Download load data file and image files
        data_file_path = os.path.join(localIn,message['data_file'])
        printandlog(f"Downloading {message['data_file']} from {WORKSPACE_BUCKET}", logger)
        csv_insubfolders = message['data_file'].split('/')
        subfolders = '/'.join((csv_insubfolders)[:-1])
        if not os.path.exists(os.path.join(localIn,subfolders)):
            os.makedirs(os.path.join(localIn,subfolders), exist_ok=True)
        s3client=boto3.client('s3')
        try:
            s3client.download_file(WORKSPACE_BUCKET, message['data_file'], data_file_path)
        except botocore.exceptions.ClientError:
            printandlog(f"Can't find load data file in S3. Looking for {message['data_file']} in {WORKSPACE_BUCKET}",logger)
            printandlog("Aborting. Can't run without load data.",logger)
            logger.removeHandler(watchtowerlogger)
            return 'DOWNLOAD_PROBLEM'
        if message['data_file'][-4:]=='.csv':
            printandlog('Figuring which files to download', logger)
            import pandas
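
The `.csv` branch that starts here (largely hidden between this hunk and the next) uses pandas to work out which image files to fetch. A hedged sketch of that idea, assuming the usual CellProfiler LoadData convention of paired `FileName_*`/`PathName_*` columns; the worker's actual parsing may differ.

```python
import os
import pandas

def list_images_to_download(data_file_path):
    """Collect bucket keys for every image referenced by a load_data.csv."""
    csv = pandas.read_csv(data_file_path)
    keys = set()
    for col in csv.columns:
        if col.startswith("FileName_"):
            path_col = "PathName_" + col[len("FileName_"):]
            if path_col in csv.columns:
                for fname, pname in zip(csv[col], csv[path_col]):
                    keys.add(os.path.join(str(pname), str(fname)))
    return sorted(keys)
```
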
@@ -273,20 +285,8 @@ def runCellProfiler(message):
            printandlog(f'Downloaded {str(len(downloaded_files))} files',logger)
        else:
            printandlog("Couldn't parse data file for file download. Not supported input of .csv or .txt",logger)
        # Download pipeline and update pipeline path in message
        printandlog(f"Downloading {message['pipeline']} from {WORKSPACE_BUCKET}", logger)
        pipepath = os.path.join(localIn, message['pipeline'].split('/')[-1])
        try:
            s3client.download_file(WORKSPACE_BUCKET, message['pipeline'], pipepath)
        except botocore.exceptions.ClientError:
            printandlog(f"Can't find pipeline in S3. Looking for {message['pipeline']} in {WORKSPACE_BUCKET}",logger)
            printandlog("Aborting. Can't run without pipeline.",logger)
            logger.removeHandler(watchtowerlogger)
            return 'DOWNLOAD_PROBLEM'

    else:
        data_file_path = os.path.join(DATA_ROOT,message['data_file'])
        pipepath = os.path.join(DATA_ROOT,message["pipeline"])
        printandlog('Using bucket mount for image files', logger)

    # Build and run CellProfiler command
    cpDone = f'{localOut}/cp.is.done'
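
After the paths are resolved, the worker builds and runs a headless CellProfiler command (outside this diff). Roughly, such an invocation looks like the sketch below; the flags shown are the standard CellProfiler CLI, and the done-flag handling is illustrative rather than the worker's exact mechanism.

```python
import subprocess

def run_pipeline(pipepath, data_file_path, local_out, cp_done):
    """Run CellProfiler headlessly and drop a done-flag file on success."""
    cmd = [
        "cellprofiler", "-c", "-r",      # headless, run the pipeline
        "-p", pipepath,                  # pipeline downloaded from WORKSPACE_BUCKET
        "--data-file", data_file_path,   # load_data.csv downloaded from WORKSPACE_BUCKET
        "-o", local_out,                 # local output folder, uploaded afterwards
    ]
    result = subprocess.run(cmd)
    if result.returncode == 0:
        open(cp_done, "w").close()       # e.g. the cp.is.done marker referenced above
    return result.returncode
```
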