diff --git a/cyhy_commander/commander.py b/cyhy_commander/commander.py
index 4bc40b3..0afd297 100755
--- a/cyhy_commander/commander.py
+++ b/cyhy_commander/commander.py
@@ -463,15 +463,18 @@ def process_job_from_queue(target_job_queue, job_processing_function):
                 job_processing_function (callable): The function used to process a job.
 
             Returns:
-                The job path that was processed or None if the queue was empty.
+                A tuple whose first item is the job path that was processed or None if
+                the queue was empty, and whose second item is the duration it took to
+                process the job or determine the queue was empty.
             """
 
             job_path = None
+            job_processing_start = time.time()
             # check the successful jobs queue
             try:
                 job_path = target_job_queue.get(timeout=1)
             except Queue.Empty:
-                return job_path
+                return (job_path, time.time() - job_processing_start)
 
             try:
                 job_processing_function(job_path)
@@ -482,8 +485,8 @@ def process_job_from_queue(target_job_queue, job_processing_function):
             # report task completion no matter what so the queue can be joined
             target_job_queue.task_done()
 
-            # return path of the job that was processed
-            return job_path
+            # return path of the job that was processed and how long it took to process
+            return (job_path, time.time() - job_processing_start)
 
         # run as long as the commander is processing jobs
         while self.__is_processing_jobs:
@@ -493,13 +496,19 @@ def process_job_from_queue(target_job_queue, job_processing_function):
             job_processing_results = process_job_from_queue(
                 self.__successful_job_queue, self.__process_successful_job
             )
             # process failed job if a successful job was not processed
-            if job_processing_results is None:
+            if job_processing_results[0] is None:
                 job_processing_results = process_job_from_queue(
                     self.__failed_job_queue, self.__process_failed_job
                 )
-            # sleep if both queues are empty
-            if job_processing_results is None:
+            if job_processing_results[0] is not None:
+                # output how long it took to process the job
+                self.__logger.debug(
+                    "Processed %s in %1.1f seconds"
+                    % (job_processing_results[0], job_processing_results[1])
+                )
+            else:
+                # sleep if both queues are empty
                 time.sleep(self.__job_processing_sleep_duration)
 
     def __process_successful_job(self, job_path):
diff --git a/cyhy_commander/nmap/nmap_importer.py b/cyhy_commander/nmap/nmap_importer.py
index e8f8b7f..b61e6ca 100755
--- a/cyhy_commander/nmap/nmap_importer.py
+++ b/cyhy_commander/nmap/nmap_importer.py
@@ -79,11 +79,21 @@ def __init__(self, db, stage=STAGE.PORTSCAN):
 
     def process(self, nmap_filename, target_filename):
         """Imports nmap files created from netscans and portscans"""
+        # Get the name of the current thread
+        thread_name = threading.current_thread().name
+
+        self.__logger.debug(
+            "[%s] Starting processing of %s" % (thread_name, nmap_filename)
+        )
         # import target ips
         ips = netaddr.IPSet()
         with open(target_filename) as f:
             for ip_line in f:
                 self.__ticket_manager.ips.add(ip_line)
+        self.__logger.debug(
+            "[%s] Found %d targets in target file %s"
+            % (thread_name, len(self.__ticket_manager.ips), target_filename)
+        )
         # parse nmap data
         f = open(nmap_filename, "rb")
         # sometimes the first line of the nmap output is not xml
@@ -107,6 +117,11 @@ def __store_port_details(self, parsed_host):
                 "[%s] No HostDoc found for IP %s" % (thread_name, str(ip))
             )
             ip_owner = UNKNOWN_OWNER
+
+        self.__logger.debug(
+            "[%s] Processing %d port results for IP %s"
+            % (thread_name, len(parsed_host["ports"]), str(ip))
+        )
         for (port, details) in parsed_host["ports"].items():
             if details["state"] != "open":  # only storing open ports
                 continue
@@ -174,6 +189,9 @@ def __store_port_details(self, parsed_host):
         return has_at_least_one_open_port
 
     def __store_os_details(self, parsed_host):
+        # Get the name of the current thread
+        thread_name = threading.current_thread().name
+
         details = dict()
         if parsed_host.has_key("os"):
             util.copy_attrs(parsed_host["os"], details)
@@ -187,8 +205,16 @@ def __store_os_details(self, parsed_host):
         details["latest"] = True
 
         ip = parsed_host["addr"]
+        self.__logger.debug(
+            "[%s] Storing OS details for IP %s" % (thread_name, str(ip))
+        )
+
         host_doc = self.__db.HostDoc.get_by_ip(ip)
         if host_doc and host_doc.get("hostnames"):
+            self.__logger.debug(
+                "[%s] HostDoc for IP %s has %d hostnames"
+                % (thread_name, str(ip), len(host_doc["hostnames"]))
+            )
             # Create a HostScanDoc for each hostname/owner combination
             for h in host_doc["hostnames"]:
                 host = self.__db.HostScanDoc()
@@ -239,10 +265,23 @@ def __portscan_host_callback(self, parsed_host):
         self.__ch_db.transition_host(ip, has_open_ports=has_at_least_one_open_port)
 
     def __end_callback(self):
+        # Get the name of the current thread
+        thread_name = threading.current_thread().name
+
         # clear the latest flags compiled from __netscan_host_callback down
+        self.__logger.debug(
+            "[%s] Resetting latest flag for %d IPs"
+            % (thread_name, len(self.__ips_to_reset_latest))
+        )
         self.__db.HostScanDoc.reset_latest_flag_by_ip(self.__ips_to_reset_latest)
         self.__db.PortScanDoc.reset_latest_flag_by_ip(self.__ips_to_reset_latest)
         self.__db.VulnScanDoc.reset_latest_flag_by_ip(self.__ips_to_reset_latest)
         # tell the ticket manager to close what needs to be closed
+        self.__logger.debug("[%s] Closing tickets" % thread_name)
         self.__ticket_manager.close_tickets()
+        self.__logger.debug(
+            "[%s] Clear the latest flag for applicable VulnScanDocs" % thread_name
+        )
         self.__ticket_manager.clear_vuln_latest_flags()
+
+        self.__logger.debug("[%s] Reached end of Nmap import" % thread_name)