Conversation
If the commander crashes, is shut down, or is killed while processing retrieved jobs, those jobs are currently left unprocessed. This change adjusts the commander to load any unfinished jobs still in the local success and failed directories before it enters its main operation loop.
Pull Request Overview
This pull request adds functionality to process leftover local jobs from a previous commander session at startup, before entering the main work loop. This ensures that jobs retrieved from remote hosts but not yet processed (e.g., due to a crash or shutdown) are not lost.
Key changes:
- Loads existing job directories from SUCCESS_DIR and FAILED_DIR into processing queues at startup
- Blocks startup until all leftover jobs are processed by calling `join()` on both queues
- Uses the existing lock mechanism to allow queue monitoring during leftover job processing
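The startup-blocking behavior described above can be sketched with Python's `queue.Queue`: `join()` blocks until every enqueued item has been matched by a `task_done()` call. The names and job identifiers below are illustrative, not the actual commander code:

```python
import queue
import threading

success_queue = queue.Queue()

def worker(q):
    """Consume job paths until the queue is drained."""
    while True:
        try:
            job_path = q.get(timeout=0.1)
        except queue.Empty:
            return
        # ... process the job directory here ...
        q.task_done()  # signals join() that this item is finished

# Load leftover job paths retrieved by a previous session.
for job in ["job-001", "job-002", "job-003"]:
    success_queue.put(job)

threading.Thread(target=worker, args=(success_queue,), daemon=True).start()

# Startup blocks here until every leftover job has been processed.
success_queue.join()
```

Once `join()` returns, the commander can safely proceed into its main loop knowing no retrieved-but-unprocessed work remains queued.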
```python
for job in os.listdir(target_dir):
    target_queue.put(os.path.join(target_dir, job))
```
Missing error handling for os.listdir() and file system operations. If there are permission issues or if the directory is temporarily unavailable, this code will crash during startup. Consider wrapping this section in a try-except block to log errors gracefully and continue startup even if leftover jobs cannot be processed.
Additionally, os.listdir() returns all entries in the directory including hidden files (e.g., .DS_Store, temporary files) and potentially non-directory entries. Consider filtering the results to only include valid job directories using os.path.isdir() to avoid processing invalid entries that could cause errors downstream.
Suggested change:

```python
try:
    for job in os.listdir(target_dir):
        job_path = os.path.join(target_dir, job)
        if os.path.isdir(job_path):
            target_queue.put(job_path)
except Exception as e:
    self.__logger.error(
        "Error processing leftover jobs in directory '%s': %s", target_dir, e
    )
    self.__logger.error(traceback.format_exc())
```
Adding error handling is a good idea, even if only so we can output a more specific logging message before re-throwing the exception, but I'll leave it up to you to decide if it's worth it here. If not, please add an issue so we don't forget to revisit this.
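The log-then-re-raise pattern suggested here can be sketched as follows; the helper name `load_leftover_jobs` and the logger setup are hypothetical, chosen only for illustration:

```python
import logging
import os

logger = logging.getLogger("commander")

def load_leftover_jobs(target_dir, target_queue):
    """Queue leftover job paths, adding context to any filesystem error."""
    try:
        for job in os.listdir(target_dir):
            target_queue.put(os.path.join(target_dir, job))
    except OSError:
        # Emit a more specific message, then re-raise so startup still
        # fails loudly instead of silently skipping leftover work.
        logger.error("Unable to load leftover jobs from '%s'", target_dir)
        raise
```

Catching `OSError` (rather than bare `Exception`) scopes the handler to the filesystem failures the reviewer mentioned, while `raise` preserves the original traceback.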
Aside from the error handling, what are your thoughts on processing only directories, or some other kind of filtering for entries in the target directories?
If we don't filter directory names when processing them normally, I don't think we need to do it here. If we do, then we should filter similarly here. Not a bad idea to add a TODO issue to filter in both places though.
We do not filter normally, but we also do not iterate over directory contents normally. The normal operation would simply ignore any directories that the commander itself did not put into the two directories while it is running.
Then it's probably safest to filter so that we only process directories that match our expected pattern.
```python
)

# clear out retrieved but unprocessed work
self.__logger.debug("Process any leftover local jobs")
```
I prefer this verbiage to "leftover", but feel free to ignore:
Suggested change:

```python
self.__logger.debug("Process any previously-unprocessed local jobs")
```
```python
self.__successful_job_queue.join()
self.__failed_job_queue.join()
self.__queue_monitor_output_lock.acquire()
self.__logger.debug("Finished processing leftover jobs")
```
To match verbiage in my other suggestion:
Suggested change:

```python
self.__logger.debug("Finished processing previously-unprocessed jobs")
```
🗣 Description
This pull request adds logic to process any local jobs left over from a previously running commander before moving into the main commander loop. This is done by loading them into the appropriate queue for the job processing threads to consume, just as they would freshly retrieved jobs.
💭 Motivation and context
If the commander crashes, is shut down, or is killed while processing retrieved jobs, those jobs are currently left unprocessed. This results in retrieved work that is never fully processed; aside from slowly building up on disk, it leads to missed results.
🧪 Testing
I tested this in @dav3r's test environment and saw it process leftover work at startup.
✅ Pre-approval checklist

- Run the `bump_version` script if this repository is versioned and the changes in this PR warrant a version bump.

✅ Post-merge checklist