diff --git a/nimp/base_commands/check.py b/nimp/base_commands/check.py index 6ecdd02c..6abde1f0 100644 --- a/nimp/base_commands/check.py +++ b/nimp/base_commands/check.py @@ -33,9 +33,7 @@ import re import shutil import time -from typing import Sequence -import psutil import nimp.command import nimp.sys.platform @@ -47,7 +45,13 @@ class Check(nimp.command.CommandGroup): '''Check related commands''' def __init__(self): - super(Check, self).__init__([_Status(), _Processes(), _Disks()]) + super(Check, self).__init__( + [ + _Status(), + _Processes(), + _Disks(), + ] + ) def is_available(self, env): return True, '' @@ -135,11 +139,6 @@ def _show_nimp_environment(env): class _Processes(CheckCommand): - PROCESS_IGNORE_PATTERNS: Sequence[re.Pattern] = ( - # re.compile(r'^CrashReportClient\.exe$', re.IGNORECASE), - re.compile(r'^dotnet\.exe$', re.IGNORECASE), - ) - def configure_arguments(self, env, parser): parser.add_argument('-k', '--kill', help='Kill processes that can prevent builds', action='store_true') parser.add_argument( @@ -149,11 +148,6 @@ def configure_arguments(self, env, parser): help='fnmatch filters, defaults to workspace', default=[os.path.normpath(f'{os.path.abspath(env.root_dir)}/*')], ) - parser.add_argument( - '--all-users', - help='By default, only check processes owned by the current user. Use this to check all running processes.', - action='store_true', - ) return True def _run_check(self, env: NimpEnvironment): @@ -165,192 +159,62 @@ def _run_check(self, env: NimpEnvironment): logging.warning("Command only available on Windows platform") return True - current_user: str | None = None - if not env.all_users: - current_user = psutil.Process().username() - logging.debug("Only act on processes owned by %s", current_user) - - # Find all running processes running a program that any filter match either: - # - the program executable - # - an open file handle + # Find all running binaries launched from the project directory # and optionally kill them, unless they’re in the exception list. # We get to try 5 times just in case for _ in range(5): - checked_processes_count = 0 - problematic_processes: list[psutil.Process] = [] - - # psutil.process_iter caches processes - # we want a fresh list since we might have killed/ - # process completed since last iteration - logging.debug("Clear psutil process cache") - psutil.process_iter.cache_clear() - logging.debug("Get current process") - current_process = psutil.Process() - logging.debug("Current process is %d", current_process.pid) - ignore_process_ids = set( - ( - current_process.pid, - *(p.pid for p in current_process.parents()), - *(p.pid for p in current_process.children(recursive=True)), - ) - ) - if logging.getLogger().isEnabledFor(logging.DEBUG): - logging.debug("Ignore processes:") - for pid in ignore_process_ids: - ignored_process = psutil.Process(pid) - logging.debug("\t%s (%s) %s", ignored_process.exe(), ignored_process.pid, ignored_process.cmdline()) - - for process in psutil.process_iter(): - if not process.is_running(): - continue + found_problem = False + processes = _Processes._list_windows_processes() - logging.debug("Checking process %d", process.pid) - if process.pid in ignore_process_ids: - logging.debug("[Process(%d)] ignore process (self, parent or child)", process.pid) + for pid, info in processes.items(): + if not any(fnmatch.fnmatch(info[0], filter) for filter in env.filters): continue - - if current_user is not None: - if not _Processes._process_owned_by_user(process, current_user): - continue - - checked_processes_count += 1 - if not _Processes._process_matches_filters(process, env.filters): - continue - - try: - process_exe = process.exe() - except psutil.Error as exc: - logging.debug('[Process(%d)] failed to retrieve process executable', process.pid, exc_info=exc) - process_exe = "[UNKOWN]" - - if _Processes._should_ignore_process(process): - logging.info('[Process(%d)] process (%s) will be kept alive', process.pid, process_exe) + process_basename = os.path.basename(info[0]) + processes_ignore_patterns = _Processes.get_processes_ignore_patterns() + if any([re.match(p, process_basename, re.IGNORECASE) for p in processes_ignore_patterns]): + logging.info(f'process {pid} {info[0]} will be kept alive') continue - - problematic_processes.append(process) - logging.warning('[Process(%d)] Found problematic process (%s)', process.pid, process_exe) - try: - if (parent_process := process.parent()) is not None: - logging.warning('\tParent is %s (%s)', parent_process.pid, parent_process.exe()) - except psutil.Error as exc: - logging.debug( - '[Process(%d)] failed to get parent process information for process "%s"', - process.pid, - process_exe, - exc_info=exc, - ) - - logging.info('%d processes checked.', checked_processes_count) - if not problematic_processes: - # no problematic processes running, nothing to do. - return True - - sleep_time = 5.0 + logging.warning('Found problematic process %s (%s)', pid, info[0]) + found_problem = True + if info[1] in processes: + logging.warning('Parent is %s (%s)', info[1], processes[info[1]][0]) + if env.kill: + logging.info('Killing process…') + nimp.sys.process.call(['wmic', 'process', 'where', 'processid=' + pid, 'delete']) + logging.info('%s processes checked.', len(processes)) if not env.kill: - # Wait a bit, give a chance to problematic processes to end, - # even if not killed - - logging.debug("Wait %.2fs. Giving a chance to processes for a natural exit", sleep_time) - time.sleep(sleep_time) - else: - for p in problematic_processes: - if p.is_running(): - logging.info('Requesting process %s termination', p.pid) - p.terminate() - _, alive = psutil.wait_procs(problematic_processes, timeout=sleep_time) - for p in alive: - if p.is_running(): - logging.info('Process %s not terminated. Send kill.', p.pid) - p.kill() + return not found_problem + if not found_problem: + return True + time.sleep(5) return False @staticmethod - def _process_owned_by_user(process: psutil.Process, username: str): - try: - process_user = process.username() - except psutil.Error as exception: - logging.debug( - "[Process(%d)] Failed to retrieve process user", - process.pid, - exc_info=exception, - ) - return False - - is_same_user = username == process_user - - logging.debug( - "[Process(%d)] ignore process from other user (self: %s, process user: %s)", - process.pid, - username, - process_user, - ) - - return is_same_user - - @staticmethod - def _process_matches_filters(process: psutil.Process, filters: list[str]) -> bool: - """Returns True if the process should be filtered out""" - if not process.is_running(): - return False - - try: - process_exe = process.exe() - except Exception as exc: - logging.debug("[Process(%d)] Failed to determine process exe!", process.pid, exc_info=exc) - # failed to access a property of the process, - # assume it does not match to be safe - return False - - logging.debug("[Process(%d)] Check process against filters", process.pid) - for pattern in filters: - if fnmatch.fnmatch(process_exe, pattern): - logging.debug( - "process %s (%s), match filter '%s' with exe '%s'", - process.pid, - process_exe, - pattern, - process_exe, - ) - return True - - if not process.is_running(): - return False - - try: - open_files = process.open_files() - except Exception as exc: - logging.debug("[Process(%d)] Failed to query process open_files", process.pid, exc_info=exc) - # failed to access a property of the process, - # assume it does not match to be safe - return False - - for pattern in filters: - for popen_file in open_files: - if fnmatch.fnmatch(popen_file.path, pattern): - logging.debug( - "process %s (%s), match filter '%s' with popen file '%s'", - process.pid, - process_exe, - pattern, - popen_file.path, - ) - return True - - return False + def get_processes_ignore_patterns(): + return [ + # r'^CrashReportClient\.exe$', + r'^dotnet\.exe$', + ] @staticmethod - def _should_ignore_process(process: psutil.Process) -> bool: - try: - process_executable_path = process.exe() - process_basename = os.path.basename(process_executable_path) - except psutil.Error: - logging.debug( - "[Process(%d)] failed to retrieve process exe/basename", - process.pid, - ) - return True - return any(p.match(process_basename) for p in _Processes.PROCESS_IGNORE_PATTERNS) + def _list_windows_processes(): + processes = {} + # List all processes + cmd = ['wmic', 'process', 'get', 'executablepath,parentprocessid,processid', '/value'] + result, output, _ = nimp.sys.process.call(cmd, capture_output=True) + if result == 0: + # Build a dictionary of all processes + path, pid, ppid = '', 0, 0 + for line in [line.strip() for line in output.splitlines()]: + if line.lower().startswith('executablepath='): + path = re.sub('[^=]*=', '', line) + if line.lower().startswith('parentprocessid='): + ppid = re.sub('[^=]*=', '', line) + if line.lower().startswith('processid='): + pid = re.sub('[^=]*=', '', line) + processes[pid] = (path, ppid) + return processes class _Disks(CheckCommand):