Process retrieved jobs using a thread pool#15
Conversation
Pull Request Overview
This PR refactors the commander to process completed jobs (both successful and failed) using a thread pool and queues, replacing the previous single-threaded approach. This change aims to reduce commander cycle duration by parallelizing job processing.
Key Changes:
- Introduced thread pool with configurable number of workers to process completed jobs
- Replaced direct job processing calls with queue-based job distribution
- Added synchronization points to ensure all queued jobs complete before cycle transitions
@mcdonnnj This all looks good enough to me to begin testing in a dev environment - great work!
jsf9k
left a comment
This looks ready for testing. I'd like to see the other two Copilot findings addressed eventually.
Strong work!
Pull Request Overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
jsf9k
left a comment
Approved, modulo the issues that Copilot raised.
The `job-processing-threads` configuration option is being added in cisagov/cyhy-commander#15, and we need to support it here so that deployments can set an appropriate value.
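For reference, a hypothetical configuration fragment showing the new key. The key name comes from this PR; the comment wording and the example value are illustrative, assuming the commander's existing INI-style configuration file:

```ini
; Number of worker threads used to process completed jobs retrieved
; from scanner instances. Required as of cisagov/cyhy-commander#15.
job-processing-threads = 8
```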
dav3r
left a comment
Strong work! 🚀
Can't wait to see how this works in Production!
Since there should be no issue creating a thread object or adding a thread object to a list, we should be fine moving these operations outside of the `try` block.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Now that we have two different types of threads, it makes sense to specify which type failed to start. Co-authored-by: dav3r <david.redmin@gwe.cisa.dhs.gov>
The job processing threads will use the same name with the thread number appended. The queue monitor thread will simply have a static name. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
When the queue variables were added, they were not put in alphabetical order.
This makes them less "magical" values and will allow them to be more readily configured if we so desire. Co-authored-by: Shane Frasier <jeremy.frasier@gwe.cisa.dhs.gov>
We ran into a bug where a job failed to process, and as a result the thread processing it never called `Queue.task_done()`. The thread was able to continue processing work, but without the call to `Queue.task_done()`, the main commander loop hung on its `Queue.join()` call. Thus we move this call out of the thread's `try`/`except` block to ensure that `Queue.task_done()` is called no matter what. This also follows the best practice of minimizing the amount of code housed in a `try`/`except` block.
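A minimal sketch of the pattern described above, with hypothetical function and variable names (not the commander's actual code): `task_done()` sits in a `finally` clause so it runs even when job processing raises, which keeps `Queue.join()` from hanging.

```python
import queue
import threading

def job_worker(job_queue, process, stop):
    """Pull jobs from job_queue and process them until stop is set."""
    while not stop.is_set():
        try:
            job = job_queue.get(timeout=1)
        except queue.Empty:
            continue
        try:
            process(job)
        except Exception:
            pass  # A real implementation would log the failure here.
        finally:
            # Called no matter what, so the main loop's Queue.join()
            # cannot hang on a job that failed to process.
            job_queue.task_done()

# Hypothetical usage: one job deliberately fails, yet join() still returns.
results = []
q = queue.Queue()
stop = threading.Event()

def handle(job):
    if job == "bad":
        raise RuntimeError("simulated processing failure")
    results.append(job)

threading.Thread(target=job_worker, args=(q, handle, stop), daemon=True).start()
for job in ("a", "bad", "b"):
    q.put(job)
q.join()  # returns even though "bad" raised, because task_done() ran in finally
stop.set()
```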
The `Queue.qsize()` method returns the "approximate size of the queue". Under full load in testing this is usually 0 because there are no jobs unclaimed by a job processor thread. We instead output the value of `Queue.unfinished_tasks` to hopefully better indicate how many jobs are still being processed.
This lock will prevent the queue monitor thread from outputting job processing queue information except when there are actually jobs being placed on the job processing queues.
Reduce the `while` loop to only check the value of self.__is_processing_jobs. The previous logic was needed because the value of self.__is_running could be changed outside of the `do_work()` method. The new control variable is only modified within the `do_work()` method and is only set to `False` after the `do_work()` method's main `while` loop has exited. As a result, there should be no work on the job processing queues when this control variable is set to `False`. Thus, it is safe to rely solely on this control variable for the job processing threads' work loop.
Add an inner function that pulls from a specified queue and processes a job using a specified function. This will DRY out the job processing logic that is currently duplicated for each queue.
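A sketch of that DRY refactor under assumed names (the queue and handler names here are illustrative, not the commander's actual identifiers): a single inner function drains any given queue with its matching processing function, instead of duplicating the loop per queue.

```python
import queue

def process_completed_jobs(done_queue, failed_queue, process_success, process_failure):
    """Process both completed-job queues with one shared inner helper."""

    def drain(job_queue, process):
        """Pull every available job from job_queue and process it."""
        while True:
            try:
                job = job_queue.get_nowait()
            except queue.Empty:
                return
            try:
                process(job)
            finally:
                # Always mark the task done so Queue.join() cannot hang.
                job_queue.task_done()

    # The same logic now serves both queues instead of being duplicated.
    drain(done_queue, process_success)
    drain(failed_queue, process_failure)

# Hypothetical usage with trivial handlers.
done, failed = queue.Queue(), queue.Queue()
done.put(1)
done.put(2)
failed.put(3)
succeeded, errored = [], []
process_completed_jobs(done, failed, succeeded.append, errored.append)
```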
We need to update the comment now that we use a variable's value for the sleep duration and use a lock to control when the method outputs queue information. Co-authored-by: dav3r <david.redmin@gwe.cisa.dhs.gov>
Pull Request Overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
Fix the variable referenced to mark a queue task as done. Adjust a comment to correctly reference the returned value. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
The logging statement was missed when this was added to other logging calls in code run by the job processing threads.
@mcdonnnj Is there anything holding us up from bumping the version and merging this PR?
If you're satisfied with the negative tally mitigations done in other PRs, there is not. I was waiting for us to feel comfortable with what we were seeing under production load before bumping and merging.
Yes, I am satisfied with the changes you have made to prevent negative tallies. I think we are good to go.
@cisagov/vm-dev I am planning to bypass the branch protection rule for conversation resolution. It lists a conversation as unresolved, but I cannot find it despite manually checking the conversations tab and querying the GitHub GraphQL API (I double-checked the GraphQL query against #23 and confirmed it found the unresolved conversation in that PR).
🗣 Description
This pull request updates the commander to process completed jobs that are retrieved from scanner instances using a thread pool and queues.
Note
This PR adds a new `job-processing-threads` required key to the configuration file. I plan on updating our deployment configuration to include this key once this PR is ready for review.
💭 Motivation and context
Processing completed work in a single-threaded manner has been showing a significant performance hit for commander cycles. This will hopefully allow us to parallelize processing completed jobs that are retrieved to reduce the duration of commander cycles.
🧪 Testing
The `do_work` loop will wait for the queues to empty before continuing.
✅ Pre-approval checklist
Run the `bump_version` script if this repository is versioned and the changes in this PR warrant a version bump.
✅ Post-merge checklist