Skip to content

Commit 3843191

Browse files
committed
Manage compression for many indexes at a time
1 parent 8178952 commit 3843191

File tree

3 files changed

+161
-33
lines changed

3 files changed

+161
-33
lines changed

bioindex/lib/aws.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,21 @@
2222

2323

2424
def get_bgzip_job_status(job_id: str):
    """
    Look up the current status of an AWS Batch job.

    Returns the status string (one of SUBMITTED, PENDING, RUNNABLE, STARTING,
    RUNNING, SUCCEEDED, FAILED) or None when the job cannot be found or the
    Batch API call fails.
    """
    try:
        response = boto3.client('batch').describe_jobs(jobs=[job_id])
        jobs = response['jobs']
        if jobs:
            # describe_jobs was called with a single id, so the first (and
            # only) entry is the job we asked about.
            return jobs[0]['status']
        print(f"Warning: Job {job_id} not found in AWS Batch")
        return None
    except Exception as e:
        # Best-effort status probe: report the failure and let the caller
        # treat None as "status unavailable".
        print(f"Error getting status for job {job_id}: {str(e)}")
        return None
3040

3141

3242
def start_batch_job(s3_bucket: str, index_name: str, s3_path: str, job_definition: str, additional_parameters: dict = None):

bioindex/lib/index.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,6 @@ def batch_run_function(self, config, obj):
268268

269269
return start_and_wait_for_indexer_job(obj['Key'], self.name, self.schema.arity, config.s3_bucket, config.rds_secret,
270270
config.bio_schema, obj['Size'])
271-
"""
272-
Index the objects using a lambda function.
273-
"""
274271

275272
def lambda_run_function(self, config, obj):
276273
logging.info(f'Processing {relative_key(obj["Key"], self.s3_prefix)}...')

bioindex/main.py

Lines changed: 146 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,26 @@ def validate_job_type(ctx, param, value):
182182

183183

184184
def convert_cli_arg_to_list(value):
    """
    Normalize a CLI option value into a list of non-empty, stripped strings.

    Accepts either a single string (possibly comma-separated, e.g.
    ``"name1,name2"``) or a tuple of such strings (Click's ``multiple=True``
    form). Returns None when the input is empty or contains no usable items,
    so callers can uniformly test ``result is None``.

    Fixes over the previous version:
    - a tuple of blank strings now yields None instead of an empty list,
      keeping the "nothing selected" sentinel consistent across input shapes;
    - empty fragments from stray commas (``"a,,b"``) are dropped.
    """
    if not value:
        return None

    # Treat a lone string the same as a one-element tuple so both input
    # shapes flow through a single splitting path.
    values = value if isinstance(value, tuple) else (value,)
    result = [
        item.strip()
        for v in values if v
        for item in str(v).split(',')
        if item.strip()
    ]
    # Empty after filtering means "no selection" -> None, not [].
    return result or None
188206

189207

@@ -194,25 +212,75 @@ def convert_cli_arg_to_list(value):
194212
help="List of index names to exclude from processing. E.g. --exclude=name1,name2")
195213
@click.option('--job-type', type=str, required=True, callback=validate_job_type,
196214
help="Type of job to perform. Must be one of COMPRESS, DECOMPRESS, DELETE_JSON")
215+
@click.option('--wait-for-completion/--no-wait', default=True,
              help="Wait for all AWS batch jobs to complete before exiting")
@click.pass_obj
def cli_bulk_compression_management(cfg, include, exclude, job_type, wait_for_completion):
    """
    Launch compression-management batch jobs (COMPRESS / DECOMPRESS /
    DELETE_JSON) for every index matching the include/exclude filters, then
    optionally block until all launched jobs reach a terminal state.

    Changes from the previous version: leftover "Raw include/exclude" debug
    prints removed; the redundant is_flag=True dropped (a '/'-style option
    name already makes the option a boolean flag in Click); the polling loop
    extracted into _wait_for_batch_jobs.
    """
    engine = migrate.migrate(cfg)
    indexes = index.Index.list_indexes(engine, False)

    inclusion_list = convert_cli_arg_to_list(include)
    exclusion_list = convert_cli_arg_to_list(exclude)
    console.print(f"Processed inclusion list: {inclusion_list}")
    console.print(f"Processed exclusion list: {exclusion_list}")
    console.print(f"Wait for completion: {wait_for_completion}")

    job_ids = []
    console.print(f"Starting {job_type.value} jobs for selected indexes...")

    # Launch jobs concurrently; each worker validates the index and submits
    # one AWS Batch job, returning the job id (or None on failure).
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            executor.submit(check_index_and_launch_job, cfg, i.name, i.s3_prefix, job_type,
                            return_job_id=True)
            for i in indexes
            if (inclusion_list is None or i.name in inclusion_list)
            and (exclusion_list is None or i.name not in exclusion_list)
        ]
        for future in concurrent.futures.as_completed(futures):
            try:
                job_id = future.result()
            except Exception as e:
                console.print(f"Error launching job: {str(e)}")
                continue
            if job_id:
                job_ids.append(job_id)
                console.print(f"Successfully queued job {job_id}")

    if job_ids and wait_for_completion:
        _wait_for_batch_jobs(job_ids, job_type)


def _wait_for_batch_jobs(job_ids, job_type, initial_wait=90, check_interval=30):
    """Poll AWS Batch until every job in job_ids reaches SUCCEEDED or FAILED."""
    console.print(f"Launched {len(job_ids)} jobs. Waiting for completion...")
    console.print(f"Job IDs to monitor: {job_ids}")

    console.print(f"Waiting initial {initial_wait} seconds before checking status...")
    time.sleep(initial_wait)

    pending = list(job_ids)
    iteration = 1
    while pending:
        console.print(f"Status check iteration {iteration}, {len(pending)} jobs remaining")
        # Iterate over a copy so finished jobs can be removed mid-loop.
        for job_id in list(pending):
            try:
                status = aws.get_bgzip_job_status(job_id)
                console.print(f'{job_type} job, id: {job_id} status: {status}')
                if status in ['SUCCEEDED', 'FAILED']:
                    console.print(f'{job_type} job {job_id} finished with status: {status}')
                    pending.remove(job_id)
            except Exception as e:
                # A failed status probe is not fatal; the job stays pending
                # and is retried on the next iteration.
                console.print(f"Error checking status for job {job_id}: {str(e)}")

        if pending:  # Only sleep if there are still jobs running
            console.print(f"Waiting {check_interval} seconds before next status check...")
            time.sleep(check_interval)

        iteration += 1

    console.print("All jobs completed.")
216284

217285

218286
@click.command(name='decompress')
@@ -239,29 +307,82 @@ def is_index_prefix_valid(cfg, idx: str, prefix: str):
239307
return len(selected_index) == 1
240308

241309

242-
def check_index_and_launch_job(cfg, index_name, prefix, job_type, additional_parameters=None):
243-
if is_index_prefix_valid(cfg, index_name, prefix):
244-
start_and_monitor_aws_batch_job(cfg.s3_bucket, job_type, index_name, prefix, additional_parameters=additional_parameters)
245-
else:
246-
console.print(f'Could not find unique index with name {index_name} and prefix {prefix}, quitting')
310+
def check_index_and_launch_job(cfg, index_name, prefix, job_type, additional_parameters=None, return_job_id=False):
311+
try:
312+
if is_index_prefix_valid(cfg, index_name, prefix):
313+
try:
314+
job_id = aws.start_batch_job(cfg.s3_bucket, index_name, prefix, job_type.value, additional_parameters=additional_parameters)
315+
console.print(f'{job_type} started with id {job_id}')
316+
317+
if return_job_id:
318+
return job_id
319+
else:
320+
# For backward compatibility with existing command usage
321+
start_and_monitor_aws_batch_job(cfg.s3_bucket, job_type, index_name, prefix,
322+
additional_parameters=additional_parameters, job_id=job_id)
323+
return None
324+
except Exception as e:
325+
console.print(f'Error starting job for index {index_name} with prefix {prefix}: {str(e)}')
326+
return None
327+
else:
328+
console.print(f'Could not find unique index with name {index_name} and prefix {prefix}, skipping')
329+
return None
330+
except Exception as e:
331+
console.print(f'Unexpected error for index {index_name}: {str(e)}')
332+
return None
247333

248334

249335
def start_and_monitor_aws_batch_job(s3_bucket: str, job_type: BgzipJobType, index_name: str, s3_path: str,
                                    initial_wait: int = 90, check_interval: int = 30,
                                    additional_parameters: dict = None, job_id: str = None):
    """
    Start an AWS Batch job (unless an existing job_id is supplied) and poll
    its status until it reaches SUCCEEDED or FAILED, giving up after five
    consecutive failed status checks.

    initial_wait seconds pass before the first check; check_interval seconds
    between subsequent checks. All progress is reported via console.print;
    nothing is returned.
    """
    max_retries = 5
    try:
        if job_id is None:
            # No pre-started job was handed in, so submit one now.
            job_id = aws.start_batch_job(s3_bucket, index_name, s3_path, job_type.value,
                                         additional_parameters=additional_parameters)
            console.print(f'{job_type} started with id {job_id}')
        console.print(f'Waiting {initial_wait} seconds before checking status...')
        time.sleep(initial_wait)

        consecutive_failures = 0
        while True:
            try:
                status = aws.get_bgzip_job_status(job_id)
                if status is None:
                    # Status probe came back empty -> count it as a failure
                    # and retry until the cap is hit.
                    consecutive_failures += 1
                    if consecutive_failures >= max_retries:
                        console.print(f'Error: Could not get status for {job_type} job {job_id} after {max_retries} attempts, giving up')
                        return
                    console.print(f'Warning: Could not get status for {job_type} job {job_id}, retrying ({consecutive_failures}/{max_retries})...')
                else:
                    consecutive_failures = 0  # Reset retry count on successful status check
                    console.print(f'{job_type} job, id: {job_id} status: {status}')
                    if status == 'SUCCEEDED':
                        console.print(f'{job_type} job {job_id} succeeded')
                        return
                    if status == 'FAILED':
                        console.print(f'{job_type} job {job_id} failed')
                        return
                    console.print(f'Waiting {check_interval} seconds before next status check...')
                time.sleep(check_interval)
            except Exception as e:
                console.print(f'Error checking job status: {str(e)}')
                consecutive_failures += 1
                if consecutive_failures >= max_retries:
                    console.print(f'Error: Failed to check job status after {max_retries} attempts, giving up')
                    return
                time.sleep(check_interval)
    except Exception as e:
        # Catch-all so a monitoring failure never crashes the caller.
        console.print(f'Error in start_and_monitor_aws_batch_job: {str(e)}')
265386

266387

267388
@click.command(name='update-compressed-status')

0 commit comments

Comments
 (0)