Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4f6885c
Add queues to manage job processing
mcdonnnj Oct 14, 2025
f78429c
Process completed jobs using a thread pool
mcdonnnj Oct 14, 2025
384d8ea
Output thread name in logging
mcdonnnj Oct 14, 2025
9db2194
Do not use blocking `Queue.get()` calls
mcdonnnj Oct 14, 2025
7d747cb
Adjust job processing thread `while` loop
mcdonnnj Oct 14, 2025
3c98a8b
Add failure information when threads fail to start
mcdonnnj Oct 15, 2025
62e8286
Log the number of job processing threads
mcdonnnj Oct 15, 2025
18992ee
Add exception handling to the queue job processing logic
mcdonnnj Oct 23, 2025
d657515
Include thread name in Nessus importer logging output
mcdonnnj Oct 23, 2025
d44171b
Bail out of commander startup if thread creation fails
mcdonnnj Oct 29, 2025
b1345b9
Format project with `black`
mcdonnnj Oct 29, 2025
6cc63dd
Reduce sleep duration if both job queues are empty
mcdonnnj Oct 29, 2025
ee7ebca
Add a new class variable to control job processing threads
mcdonnnj Oct 30, 2025
7e22318
Add a logging message before waiting on job queues
mcdonnnj Oct 30, 2025
bd65ee5
Add a thread to output job queue load periodically
mcdonnnj Oct 30, 2025
e53ca94
Only perform thread starting in the `try` block
mcdonnnj Oct 30, 2025
a80dd94
Fix incorrect usage of `sleep()`
mcdonnnj Oct 30, 2025
21dac7c
Use consistent logging messages
mcdonnnj Oct 30, 2025
d86bbe7
Reference thread type in thread start failure log messages
mcdonnnj Oct 30, 2025
cf524e7
Format project with `black`
mcdonnnj Oct 30, 2025
c687f61
Provide names for threads created for job processing
mcdonnnj Oct 30, 2025
e911fc2
Fix sorting of class variable definitions
mcdonnnj Oct 30, 2025
139d10a
Move sleep duration static values into class variables
mcdonnnj Oct 30, 2025
d2cd01d
Ensure threads report task completion
mcdonnnj Oct 31, 2025
41426fb
Change the value used by the queue monitor
mcdonnnj Oct 31, 2025
49122e8
Add a lock for the queue monitor's output
mcdonnnj Nov 3, 2025
542c804
Minimize `while` loop condition for job processing threads
mcdonnnj Nov 3, 2025
7fb3c4d
Update job processing to use a helper function
mcdonnnj Nov 3, 2025
766a1e8
Update the comment for the `__monitor_job_queues()` method
mcdonnnj Nov 3, 2025
09bf219
Apply Copilot suggestions
mcdonnnj Nov 3, 2025
5b4a6d3
Ensure nmap_importer log outputs references thread name
mcdonnnj Nov 4, 2025
19d9012
Bump version from 1.0.2 to 1.1.0
mcdonnnj Nov 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 164 additions & 12 deletions cyhy_commander/commander.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
from collections import defaultdict
import logging
import os
import Queue
import random
import shutil
import signal
import sys
import threading
import time
import traceback
from ConfigParser import SafeConfigParser
Expand Down Expand Up @@ -89,6 +91,7 @@
DEFAULT = "DEFAULT"
DEFAULT_SCHEDULER = "default-scheduler"
DEFAULT_SECTION = "default-section"
JOB_PROCESSING_THREADS = "job-processing-threads"
JOBS_PER_NESSUS_HOST = "jobs-per-nessus-host"
JOBS_PER_NMAP_HOST = "jobs-per-nmap-host"
KEEP_FAILURES = "keep-failures"
Expand Down Expand Up @@ -138,18 +141,24 @@ def __init__(self, config_section=None, debug_logging=False, console_logging=Fal
self.__all_hosts_idle = False
self.__config_section = config_section
self.__db = None
self.__failed_job_queue = None
self.__failure_sinks = []
self.__host_exceptions = defaultdict(lambda: 0)
self.__hosts_on_cooldown = []
self.__is_processing_jobs = True
self.__is_running = True
self.__job_processing_sleep_duration = 1
self.__keep_failures = False
self.__keep_successes = False
self.__log_output_sleep_duration = 10
self.__nessus_sources = []
self.__next_scan_limit = 2000
self.__nmap_sources = []
self.__queue_monitor_output_lock = threading.Lock()
self.__setup_directories()
self.__shutdown_when_idle = False
self.__success_sinks = []
self.__successful_job_queue = None
self.__test_mode = False

def __setup_logging(self, debug_logging, console_logging):
Expand Down Expand Up @@ -305,9 +314,9 @@ def __done_jobs(self):
local_job_path = os.path.join(destDir, job)

if destDir == SUCCESS_DIR:
self.__process_successful_job(local_job_path)
self.__successful_job_queue.put(local_job_path)
else:
self.__process_failed_job(local_job_path)
self.__failed_job_queue.put(local_job_path)

else:
self.__logger.warning(
Expand Down Expand Up @@ -428,29 +437,108 @@ def __fill_hosts(self, counts, sources, workgroup_name, jobs_per_host):
execute(self.__push_job, self, job_path, hosts=[lowest_host])
counts[lowest_host] += 1

def __monitor_job_queues(self):
    """Periodically log the load of both job queues.

    Runs in a dedicated monitor thread.  While job processing is
    active, log the number of unfinished tasks in the successful and
    failed job queues every ``self.__log_output_sleep_duration``
    seconds.  Output is guarded by ``self.__queue_monitor_output_lock``
    so the main loop can suppress monitor output while it is not
    waiting on the queues.
    """
    while self.__is_processing_jobs:
        with self.__queue_monitor_output_lock:
            # Lazy %-style arguments defer formatting to the logging
            # framework (skipped entirely when DEBUG is disabled).
            self.__logger.debug(
                "%d unfinished jobs in the successful job queue",
                self.__successful_job_queue.unfinished_tasks,
            )
            self.__logger.debug(
                "%d unfinished jobs in the failed job queue",
                self.__failed_job_queue.unfinished_tasks,
            )
        # Sleep outside the lock so the main loop is never blocked on
        # the monitor's idle period.  NOTE(review): the scraped diff
        # loses indentation — confirm the original slept outside the
        # `with` block as well.
        time.sleep(self.__log_output_sleep_duration)

def __process_queued_jobs(self):
    """Worker loop executed by each job processing thread.

    Repeatedly drain the successful and failed job queues — giving the
    successful queue priority — until ``self.__is_processing_jobs`` is
    cleared.  Sleeps briefly when both queues are empty to avoid
    busy-waiting.
    """

    # define an inner function to process jobs
    def process_job_from_queue(target_job_queue, job_processing_function):
        """Helper function to process a single job from a queue.

        Args:
            target_job_queue (Queue.Queue): The queue to get a job to process.
            job_processing_function (callable): The function used to process a job.

        Returns:
            The job path that was processed or None if the queue was empty.
        """
        job_path = None

        # Use a short timeout instead of a blocking get() so the thread
        # can re-check the shutdown flag promptly.
        try:
            job_path = target_job_queue.get(timeout=1)
        except Queue.Empty:
            return job_path

        # `except Exception as e` (not `except Exception, e`) for
        # consistency with the rest of this file; log and continue so
        # one bad job cannot kill the worker thread.
        try:
            job_processing_function(job_path)
        except Exception as e:
            self.__logger.critical(e)
            self.__logger.critical(traceback.format_exc())

        # report task completion no matter what so the queue can be joined
        target_job_queue.task_done()

        # return path of the job that was processed
        return job_path

    # run as long as the commander is processing jobs
    while self.__is_processing_jobs:
        # process successful job
        job_processing_results = process_job_from_queue(
            self.__successful_job_queue, self.__process_successful_job
        )

        # process failed job if a successful job was not processed
        if job_processing_results is None:
            job_processing_results = process_job_from_queue(
                self.__failed_job_queue, self.__process_failed_job
            )

        # sleep if both queues are empty
        if job_processing_results is None:
            time.sleep(self.__job_processing_sleep_duration)

def __process_successful_job(self, job_path):
    """Run a completed job's results through the configured success sinks.

    Args:
        job_path (str): Local filesystem path of the downloaded job.

    The first sink that reports it can handle the job processes it;
    the job directory is then deleted unless test mode or
    keep-successes is enabled.  A warning is logged if no sink can
    handle the job.
    """
    # Include the worker thread's name in every message so concurrent
    # job processing output can be attributed.
    thread_name = threading.current_thread().name

    for sink in self.__success_sinks:
        if sink.can_handle(job_path):
            # Lazy %-args defer string formatting to the logging framework.
            self.__logger.info(
                "[%s] Processing %s with %s", thread_name, job_path, sink
            )
            sink.handle(job_path)
            self.__logger.info("[%s] Processing completed", thread_name)
            if not self.__test_mode and not self.__keep_successes:
                shutil.rmtree(job_path)
                self.__logger.info("[%s] %s deleted", thread_name, job_path)
            return
    self.__logger.warning(
        "[%s] No handler was able to process %s", thread_name, job_path
    )

def __process_failed_job(self, job_path):
    """Run a failed job's results through the configured failure sinks.

    Args:
        job_path (str): Local filesystem path of the downloaded job.

    Mirrors __process_successful_job() but uses the failure sinks and
    the keep-failures setting; the initial "Processing" message is
    logged at WARNING level because the job failed.
    """
    # Include the worker thread's name in every message so concurrent
    # job processing output can be attributed.
    thread_name = threading.current_thread().name

    for sink in self.__failure_sinks:
        if sink.can_handle(job_path):
            # Lazy %-args defer string formatting to the logging framework.
            self.__logger.warning(
                "[%s] Processing %s with %s", thread_name, job_path, sink
            )
            sink.handle(job_path)
            self.__logger.info("[%s] Processing completed", thread_name)
            if not self.__test_mode and not self.__keep_failures:
                shutil.rmtree(job_path)
                self.__logger.info("[%s] %s deleted", thread_name, job_path)
            return
    self.__logger.warning(
        "[%s] No handler was able to process %s", thread_name, job_path
    )

def handle_term(self, signal, frame):
self.__logger.warning(
Expand All @@ -470,14 +558,15 @@ def __check_stop_file(self):
def __check_database_pause(self):
    """Block while the database has requested that the commander pause.

    Logs a reminder and sleeps each cycle; once the pause is lifted
    (or the commander is no longer running), re-checks the stop file.
    """
    while True:
        # Guard clause: leave the pause loop as soon as either the
        # database no longer requests a pause or we are shutting down.
        if not self.__ch_db.should_commander_pause() or not self.__is_running:
            break
        self.__logger.info("Commander is paused due to database request.")
        time.sleep(self.__log_output_sleep_duration)
    self.__check_stop_file()

def __write_config(self):
config = SafeConfigParser()
config.set(None, DATABASE_URI, "mongodb://localhost:27017/")
config.set(None, JOBS_PER_NMAP_HOST, "8")
config.set(None, JOBS_PER_NESSUS_HOST, "8")
config.set(None, JOB_PROCESSING_THREADS, "4")
config.set(None, POLL_INTERVAL, "30")
config.set(None, NEXT_SCAN_LIMIT, "2000")
config.set(None, DEFAULT_SECTION, TESTING_SECTION)
Expand Down Expand Up @@ -603,6 +692,12 @@ def do_work(self):
self.__test_mode = config.getboolean(config_section, TEST_MODE)
self.__logger.info("Test mode: %s", self.__test_mode)
self.__keep_failures = config.getboolean(config_section, KEEP_FAILURES)
job_processing_thread_count = config.getint(
config_section, JOB_PROCESSING_THREADS
)
self.__logger.info(
"Number of job processing threads: %d", job_processing_thread_count
)
self.__logger.info("Keep failed jobs: %s", self.__keep_failures)
self.__keep_successes = config.getboolean(config_section, KEEP_SUCCESSES)
self.__logger.info("Keep successful jobs: %s", self.__keep_successes)
Expand All @@ -617,6 +712,43 @@ def do_work(self):
self.__setup_sources()
self.__setup_sinks()

self.__successful_job_queue = Queue.Queue()
self.__failed_job_queue = Queue.Queue()

# spin up the thread pool to process retrieved work
job_processing_threads = []
for t in range(job_processing_thread_count):
job_processing_thread = threading.Thread(
name="JobProcessor-%d" % t, target=self.__process_queued_jobs
)
job_processing_threads.append(job_processing_thread)
try:
job_processing_thread.start()
except Exception as e:
self.__logger.error("Unable to start job processing thread #%s", t)
self.__logger.error(e)
# bail out
self.__logger.critical(
"Shutting down due to inability to start job processing threads."
)
self.__is_running = False

# spin up a thread to output queue load information
self.__queue_monitor_output_lock.acquire()
job_queue_monitor_thread = threading.Thread(
name="QueueMonitor", target=self.__monitor_job_queues
)
try:
job_queue_monitor_thread.start()
except Exception as e:
self.__logger.error("Unable to start job queue monitoring thread")
self.__logger.error(e)
# bail out
self.__logger.critical(
"Shutting down due to inability to start queue monitoring thread."
)
self.__is_running = False

# pairs of hosts and job sources
work_groups = (
(NMAP_WORKGROUP, nmap_hosts, self.__nmap_sources, jobs_per_nmap_host),
Expand Down Expand Up @@ -706,11 +838,18 @@ def do_work(self):
self.__logger.debug(
"Checking remotes for completed jobs to download and process"
)
self.__queue_monitor_output_lock.release()
for (workgroup_name, hosts, sources, jobs_per_host) in work_groups:
if hosts == None:
continue
execute(self.__done_jobs, self, hosts=hosts)

# wait for work to process
self.__logger.debug("Waiting for completed jobs to be processed.")
self.__successful_job_queue.join()
self.__failed_job_queue.join()
self.__queue_monitor_output_lock.acquire()

# check for scheduled hosts
self.__logger.debug(
"Checking for scheduled DONE hosts to mark WAITING."
Expand Down Expand Up @@ -752,12 +891,25 @@ def do_work(self):
except Exception, e:
self.__logger.critical(e)
self.__logger.critical(traceback.format_exc())

# signal job processing threads to exit once they have finished all
# queued work
self.__is_processing_jobs = False
self.__queue_monitor_output_lock.release()

# wait for the job processing threads to exit
for job_processing_thread in job_processing_threads:
job_processing_thread.join()

# wait for the job queue monitoring thread to exit
job_queue_monitor_thread.join()

self.__logger.info("Shutting down.")
disconnect_all()


def main():
args = docopt(__doc__, version="v1.0.2")
args = docopt(__doc__, version="v1.1.0")
workingDir = os.path.join(os.getcwd(), args["<working-dir>"])
if not os.path.exists(workingDir):
print >>sys.stderr, 'Working directory "%s" does not exist. Attempting to create...' % workingDir
Expand Down
Loading