diff --git a/tuned/plugins/plugin_kthread.py b/tuned/plugins/plugin_kthread.py new file mode 100644 index 00000000..32ce42ab --- /dev/null +++ b/tuned/plugins/plugin_kthread.py @@ -0,0 +1,817 @@ +from . import base +from .decorators import * +import tuned.consts as consts +import tuned.logs + +# The scheduler interface in os was introduced in Python 3.3 +# To also support earlier versions, we import some logic from plugin_scheduler +from .plugin_scheduler import SchedulerUtils, SchedulerUtilsSchedutils + +try: + import perf + have_perf = True +except ModuleNotFoundError: + have_perf = False +import procfs + +import errno +import os +import re +import select +import threading +import time + +log = tuned.logs.get() + +# threads can disappear at any time. in that case, we raise a custom exception +class ThreadNoLongerExists(Exception): + """A thread disappeared while processing it""" + pass + +# another custom exception to signal non-changeability of affinities +class AffinityNotChangeable(Exception): + """Affinity of a thread cannot be changed""" + pass + +class KthreadInfo(object): + """Kernel thread information for internal bookkeeping""" + def __init__(self, proc): + self.pid = proc.pid + self.comm = procfs.process_cmdline(proc) + self.affinity_changeable = None + self.sched_orig = None + self.tuned_affinity = False + self.tuned_sched = False + + def __str__(self): + """String representation of a kthread, for logging purposes""" + return "%d:%s" % (self.pid, self.comm) + +class SchedOpts(object): + """Scheduling options for a kthread, as parsed from the group.* options""" + def __init__(self, policy=None, priority=None, affinity=None): + self.policy = policy + self.priority = priority + self.affinity = affinity + +class GroupCmd(object): + """group.* definition from the instance options""" + def __init__(self, name, prio, sched, regex): + self.name = name + self.prio = prio + self.sched = sched + self.regex = regex + +class KthreadPlugin(base.Plugin): + r""" + `kthread`:: + + Allows tuning of kernel threads by setting their CPU affinities and + scheduling parameters. The plugin re-implements functionality already + present in the `scheduler` plugin. However, this plugin offers more + flexibility, as it allows tuning of individual kernel threads. + Multiple plugin instances can be defined,each addressing different groups + of kernel threads. + When using the `kthread` plugin, make sure to disable processing of kernel + threads in the `scheduler` plugin by setting its option + [option]`kthread_process=false`. + === + Tuning options are controlled by [option]`group` definitions. + + + A group definition has the form + `group. = :::` + + + with four required fields: + + + -- + `rule_prio`:: + priority of the group within this plugin instance (lower number indicates + higher priority) + `schedopts`:: + desired scheduling policy and priority, or either "*" or an empty string + to leave the scheduling options unchanged. + The first character defines the policy + + - f: SCHED_FIFO + - b: SCHED_BATCH + - r: SCHED_RR + - o: SCHED_OTHER + - i: SCHED_IDLE + + The remainder is the desired priority in the range 0..99. + For SCHED_OTHER, only a priority of 0 is allowed. + Examples: `f50` to set SCHED_FIFO with priority 50, `o0` for SCHED_OTHER + `affinity`:: + desired affinity (as cpulist string), or either "*" or an empty string + to leave the affinity unchanged + `regex`:: + regular expression to match kernel threads. Note that the thread name needs + to match the full regex, i.e. matching happens with re.fullmatch(). + -- + The [option]`group` options of the `kthread` plugin differ from those of + the `scheduler` plugin: + + - scheduling policy and priority are combined into one option + - affinities are specified as cpulist strings instead of masks + - regular expressions need to fully match the thread names + - no square brackets are added to the kernel thread names + + Example: + The `scheduler` definition + + group.ksoftirqd=0:f:2:*:^\[ksoftirqd + + is translated to the `kthread` definition + + group.ksoftirqd=0:f2:*:ksoftirqd.* + """ + def __init__(self, monitor_repository, storage_factory, hardware_inventory, device_matcher, device_matcher_udev, plugin_instance_factory, global_cfg, variables): + super(KthreadPlugin, self).__init__(monitor_repository, storage_factory, hardware_inventory, device_matcher, device_matcher_udev, plugin_instance_factory, global_cfg, variables) + self._has_dynamic_options = True + self._kthreads = {} + self._lock = threading.RLock() + self._instance_count = 0 + + try: + self._scheduler_utils = SchedulerUtils() + except AttributeError: + self._scheduler_utils = SchedulerUtilsSchedutils() + + global have_perf + if have_perf: + try: + self._perf_setup() + except Exception as e: + try: + self._perf_shutdown() + except Exception: + pass + have_perf = False + log.warning("perf initialization failed: %s. " \ + "Tuning will be applied to all currently running kthreads, but future kthreads will not be tuned." % e) + else: + log.warning("python-perf unavailable. " \ + "Tuning will be applied to all currently running kthreads, but future kthreads will not be tuned. " \ + "You can try to (re)install the python(3)-perf package.") + + def cleanup(self): + """Cleanup the plugin, unapplying tuning and stopping the perf monitor thread""" + super(KthreadPlugin, self).cleanup() + if have_perf: + self._perf_shutdown() + + # + # plugin-level methods: devices and plugin options + # + def _init_devices(self): + """Initialize devices for the plugin""" + super(KthreadPlugin, self)._init_devices() + self._kthread_pids_unassigned = set() + self._kthread_scan(initial=True) + + @classmethod + def _get_config_options(cls): + """Get plugin options. We have none, all options are instance-level""" + return { + # nothing here, the group.* options are covered by self._has_dynamic_options + } + + def _plugin_add_kthread(self, pid): + """Add kthread to the plugin for tuning (usually by one of the instances)""" + # Prefer active instances so the kthread gets tuned immediately + for instance in self._instances.values(): + if instance._tuning_active and self._get_matching_kthreads(instance, [pid]): + self._instance_add_kthread(instance, pid) + return + for instance in self._instances.values(): + if not instance._tuning_active and self._get_matching_kthreads(instance, [pid]): + self._instance_add_kthread(instance, pid) + return + self._kthread_pids_unassigned.add(pid) + + def _plugin_remove_kthread(self, pid): + """Remove kthread from the plugin (and from the instances)""" + for instance in self._instances.values(): + self._instance_remove_kthread(instance, pid) + + # + # instance-level methods: implement the Instance interface + # + def _instance_init(self, instance): + """Initialize an instance""" + instance._has_static_tuning = True + instance._has_dynamic_tuning = False + # kthreads handled by instance, assigned and processed + instance._kthreads_assigned = set() + instance._kthreads_processed = set() + instance._tuning_active = False + # process group.* options + self._instance_prepare_matching(instance) + # grab initial set of kthreads + self._instance_acquire_kthreads(instance) + + def _instance_cleanup(self, instance): + """Cleanup an instance""" + self._instance_release_kthreads(instance) + + def _instance_prepare_matching(self, instance): + """Process all group.* options and populate instance._groups""" + groups = [] + for k, v in instance.options.items(): + # group definitions have the format: + # group. = ::: + if not k.startswith("group."): + continue + name = k[len("group."):] + opt = self._variables.expand(v).split(":", 3) + if not len(opt) == 4: + log.error("Invalid definition for '%s': need exactly 4 arguments" % k) + continue + opt_rule_prio, opt_schedopts, opt_affinity, opt_regex = opt + # parse rule prio + try: + rule_prio = int(opt_rule_prio) + except ValueError: + log.error("Could not parse rule prio for '%s': '%s' is not a number" % (k, opt_rule_prio)) + continue + # parse scheduling options + policy, priority, affinity = None, None, None + if opt_schedopts in ["", "*"]: + pass + elif len(opt_schedopts) > 1 and opt_schedopts[0] in self._scheduler_utils._dict_schedcfg2num.keys(): + policy = self._scheduler_utils.sched_cfg_to_num(opt_schedopts[0]) + try: + priority = int(opt_schedopts[1:]) + except ValueError: + log.error("Could not parse scheduling priority for '%s': '%s' is not a number" % (k, opt_schedopts[1:])) + continue + prio_min = self._scheduler_utils.get_priority_min(policy) + prio_max = self._scheduler_utils.get_priority_max(policy) + policy_name = self._scheduler_utils.sched_num_to_const(policy) + if priority < prio_min or priority > prio_max: + log.error("Could not parse scheduling priority for '%s': %s requires priority in range %d - %d, got %d" % (k, policy_name, prio_min, prio_max, priority)) + continue + else: + log.error("Could not parse scheduling priority for '%s': '%s' has wrong format" % (k, opt_schedopts)) + continue + if not opt_affinity in ["", "*"]: + affinity = set(self._cmd.cpulist_unpack(opt_affinity)) + if len(affinity) == 0: + log.error("Could not parse affinity for '%s': '%s' has wrong format" % (k, opt_affinity)) + continue + sched = SchedOpts(policy=policy, priority=priority, affinity=affinity) + # parse the regex + try: + regex = re.compile(opt_regex) + except re.error as e: + log.error("Could not compile regex for '%s': '%s'" % (k, e.msg)) + continue + groups.append(GroupCmd(name, rule_prio, sched, regex)) + instance._groups = sorted(groups, key=lambda x: x.prio) + + def _get_instance_sched_options(self, instance, kthread): + """ + determine options an instance would set for a kthread, None if the + instance would not set any (because none of the group.* regexes matches) + """ + for group in instance._groups: + if group.regex.fullmatch(kthread.comm): + return group.sched + return None + + def _get_matching_kthreads(self, instance, pids): + """ + determine which threads fit the given instance + """ + matching_kthreads = set() + for pid in pids: + try: + kthread = self._kthread_get(pid) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + continue + if self._get_instance_sched_options(instance, kthread) is not None: + matching_kthreads.add(pid) + return matching_kthreads + + def _instance_add_kthread(self, instance, pid): + """add a kthread to an instance, and tune it""" + with self._lock: + if instance._tuning_active: + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance, kthread) + self._apply_kthread_tuning(kthread, opts) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + return + except Exception as e: + log.error("Failed to tune kthread %d: %s" % (pid, e)) + self._kthread_pids_unassigned.add(pid) + return + instance._kthreads_processed.add(pid) + else: + instance._kthreads_assigned.add(pid) + + def _instance_remove_kthread(self, instance, pid): + """remove a kthread from an instance, and unapply tuning""" + with self._lock: + if pid in instance._kthreads_assigned: + instance._kthreads_assigned.remove(pid) + elif pid in instance._kthreads_processed: + try: + instance._kthreads_processed.remove(pid) + kthread = self._kthread_get(pid) + self._restore_kthread_tuning(kthread) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + else: + # kthread does not belong to instance. ignore. + pass + + def _instance_transfer_kthread(self, instance_from, instance_to, pid): + """move a kthread from one instance to another, and seamlessly adapt tuning""" + with self._lock: + was_processed = pid in instance_from._kthreads_processed + if was_processed: + instance_from._kthreads_processed.remove(pid) + elif pid in instance_from._kthreads_assigned: + instance_from._kthreads_assigned.remove(pid) + if instance_to._tuning_active: + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance_to, kthread) + self._apply_kthread_tuning(kthread, opts) + instance_to._kthreads_processed.add(pid) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + except Exception as e: + log.error("Failed to tune kthread %d during transfer: %s" % (pid, e)) + # Roll back: return PID to source instance + if was_processed: + instance_from._kthreads_processed.add(pid) + else: + instance_from._kthreads_assigned.add(pid) + else: + instance_to._kthreads_assigned.add(pid) + + def _instance_acquire_kthreads(self, instance): + """assign all matching kthreads to an instance""" + # first the ones that are currently unassigned + with self._lock: + acquire_kthreads = self._get_matching_kthreads(instance, self._kthread_pids_unassigned) + self._kthread_pids_unassigned -= acquire_kthreads + for pid in acquire_kthreads: + self._instance_add_kthread(instance, pid) + # and then the ones from other instances + with self._lock: + for other_instance in self._instances.values(): + if (other_instance == instance or instance.priority > other_instance.priority or not hasattr(other_instance, "_kthreads_assigned")): + continue + # Don't steal from active instances while we are inactive; + # defer until our own activation in _instance_apply_static + if not instance._tuning_active and other_instance._tuning_active: + continue + transfer_kthreads = self._get_matching_kthreads(instance, other_instance._kthreads_assigned | other_instance._kthreads_processed) + for pid in transfer_kthreads: + self._instance_transfer_kthread(other_instance, instance, pid) + + def _instance_release_kthreads(self, instance): + """release all kthreads from an instance""" + with self._lock: + free_kthreads = instance._kthreads_assigned | instance._kthreads_processed + # first the ones now claimed by other instances + for other_instance in self._instances.values(): + if other_instance == instance: + continue + transfer_kthreads = self._get_matching_kthreads(other_instance, free_kthreads) + for pid in list(transfer_kthreads): + self._instance_transfer_kthread(instance, other_instance, pid) + free_kthreads.remove(pid) + # the remaining ones go back to unassigned (only if still alive) + for pid in free_kthreads: + self._instance_remove_kthread(instance, pid) + if pid in self._kthreads: + self._kthread_pids_unassigned.add(pid) + + def _instance_apply_static(self, instance): + """Apply instance static tuning""" + with self._lock: + if self._instance_count == 0: + # scan for kthreads that have appeared since plugin initialization + self._kthread_scan(initial=False) + if have_perf: + self._perf_monitor_start() + self._instance_count += 1 + instance._tuning_active = True + for pid in list(instance._kthreads_assigned): + instance._kthreads_assigned.remove(pid) + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance, kthread) + self._apply_kthread_tuning(kthread, opts) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + continue + except Exception as e: + log.error("Failed to tune kthread %d: %s" % (pid, e)) + self._kthread_pids_unassigned.add(pid) + continue + instance._kthreads_processed.add(pid) + # Now active, acquire kthreads from lower-priority instances + # that were deferred while we were inactive + self._instance_acquire_kthreads(instance) + + def _instance_verify_static(self, instance, ignore_missing, devices): + """Verify instance static tuning""" + result = True + with self._lock: + for pid in list(instance._kthreads_processed): + try: + kthread = self._kthread_get(pid) + opts = self._get_instance_sched_options(instance, kthread) + result &= self._verify_kthread_tuning(kthread, opts) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + return result + + def _instance_unapply_static(self, instance, rollback): + """Unapply instance static tuning, optionally rolling back to original settings""" + with self._lock: + instance._tuning_active = False + for pid in list(instance._kthreads_processed): + try: + kthread = self._kthread_get(pid) + if rollback == consts.ROLLBACK_FULL: + self._restore_kthread_tuning(kthread) + instance._kthreads_assigned.add(pid) + instance._kthreads_processed.remove(pid) + except ThreadNoLongerExists: + self._kthread_internal_remove(pid) + self._instance_count -= 1 + need_shutdown = (self._instance_count == 0) + if need_shutdown and have_perf: + self._perf_monitor_shutdown() + + # + # internal bookkeeping (self._kthreads) + # as these methods are called from the main thred and the perf monitor + # thread, we need to lock all accesses to self._kthreads + # + def _kthread_scan(self, initial=False): + """Scan procfs for kernel threads and add them to our bookkeeping + + Args: + initial (bool): is this the initial scan? passed on to _kthread_add() + """ + ps = procfs.pidstats() + for pid in ps.keys(): + self._kthread_internal_add(pid, initial) + + def _kthread_internal_add(self, pid, initial=False): + """Add kernel thread to internal bookkeeping + + Args: + pid (int): kernel thread pid + initial (bool): is this the initial scan? if yes, then add the new + kthread to _free_devices, else initiate hotplug mechanism via + _add_device() + """ + try: + proc = procfs.process(pid) + if not self._is_kthread(proc): + return + kthread = KthreadInfo(proc) + except (FileNotFoundError, ProcessLookupError): + return + + with self._lock: + if kthread.pid in self._kthreads: + return + self._kthreads[kthread.pid] = kthread + if initial: + self._kthread_pids_unassigned.add(kthread.pid) + else: + self._plugin_add_kthread(kthread.pid) + log.debug("Added kthread %s" % kthread) + + def _kthread_internal_remove(self, pid): + """Remove kernel thread from internal bookkeeping + + Args: + pid (int): kernel thread pid + """ + try: + with self._lock: + del self._kthreads[pid] + self._kthread_pids_unassigned.discard(pid) + self._plugin_remove_kthread(pid) + except KeyError: + return + log.debug("Removed kthread %d" % pid) + + def _kthread_get(self, pid): + """Get KthreadInfo object for a given PID + + Args: + pid (int): kernel thread pid + """ + try: + with self._lock: + return self._kthreads[pid] + except KeyError: + raise ThreadNoLongerExists() + + def _is_kthread(self, proc): + """helper to determine if a procfs process is a kernel thread""" + return proc["stat"]["flags"] & procfs.pidstat.PF_KTHREAD != 0 + + # + # methods to interact with perf + # + def _perf_setup(self): + """Setup perf monitoring""" + self._cpus = perf.cpu_map() + self._threads = perf.thread_map() + self._evlist = perf.evlist(self._cpus, self._threads) + evsel = perf.evsel( + type=perf.TYPE_SOFTWARE, + config=perf.COUNT_SW_DUMMY, + task=1, + comm=1, + mmap=0, + freq=0, + wakeup_events=1, + watermark=1, + sample_type=perf.SAMPLE_TID|perf.SAMPLE_CPU, + ) + evsel.open(cpus=self._cpus, threads=self._threads) + self._evlist.add(evsel) + self._evlist.mmap() + + def _perf_shutdown(self): + """Shutdown perf monitoring""" + if getattr(self, "_evlist", None): + for fd in self._evlist.get_pollfd(): + os.close(fd.name) + + def _perf_monitor_start(self): + """Start perf monitoring thread""" + self._terminate = threading.Event() + self._thread = threading.Thread(target=self._perf_monitor_thread) + self._thread.start() + + def _perf_monitor_shutdown(self): + """Stop perf monitoring thread""" + self._terminate.set() + self._thread.join() + + def _perf_monitor_thread(self): + """ + Thread to handle notifications from perf + + New kthreads sometimes (e.g., `irq/*` threads) are spawned with default + scheduling options and adapt their policy/priority themselves. + This opens a race window, where we possibly change sched options + before the thread itself does, leading to wrong settings, and also + wrong "original" settings in case we roll back the tuning. + So we delay tuning for newly created threads to reduce the chance of races. + """ + class NewThread(object): + """Helper class to keep track of new threads and their delay""" + def __init__(self, ts, tid): + self.ts, self.tid = ts, tid + new_threads = [] + thread_add_delay_s = 1.0 + + log.debug("perf monitor thread starting") + poll = select.poll() + fds = self._evlist.get_pollfd() + for fd in fds: + poll.register(fd) + while not self._terminate.is_set(): + try: + # process new threads when their delay has passed + now = time.time() + while len(new_threads) > 0 and now >= new_threads[0].ts: + self._kthread_internal_add(new_threads.pop(0).tid) + timeout_ms = 1000 + if len(new_threads) > 0: + timeout_ms = min(timeout_ms, int((new_threads[0].ts - now) * 1000)) + if len(poll.poll(timeout_ms)) == 0: + continue + # we have new perf notifications + now = time.time() + have_events = True + while have_events: + have_events = False + for cpu in self._cpus: + event = self._evlist.read_on_cpu(cpu) + if event and hasattr(event, "type"): + have_events = True + if event.type == perf.RECORD_COMM: + new_threads.append(NewThread(now + thread_add_delay_s, event.tid)) + elif event.type == perf.RECORD_EXIT: + self._kthread_internal_remove(event.tid) + except Exception as e: + log.error("perf monitor thread encountered an error: %s" % e) + # back off to avoid a tight error-spin loop on a persistent failure + self._terminate.wait(1.0) + log.debug("perf monitor thread shutting down") + + # + # methods for low-level manipulation of scheduling options + # via SchedulerUtils from .plugin_scheduler + # + def _set_affinity(self, pid, affinity): + """Set CPU affinity of a thread""" + try: + self._scheduler_utils.set_affinity(pid, affinity) + except OSError as e: + if hasattr(e, "errno") and e.errno == errno.ESRCH: + log.debug("Failed to set affinity of PID %d, the task vanished." % pid) + raise ThreadNoLongerExists() + else: + try: + proc = procfs.process(pid) + changeable = not proc["stat"].is_bound_to_cpu() + except (OSError, IOError): + raise ThreadNoLongerExists() + if not changeable: + raise AffinityNotChangeable() + log.error("Failed to set affinity of PID %d to '%s': %s" % (pid, affinity, e)) + raise e + + def _get_affinity(self, pid): + """Get CPU affinity of a thread""" + try: + return self._scheduler_utils.get_affinity(pid) + except OSError as e: + if hasattr(e, "errno") and e.errno == errno.ESRCH: + log.debug("Failed to get affinity of PID %d, the task vanished." % pid) + raise ThreadNoLongerExists() + else: + log.error("Failed to get affinity of PID %d: %s" % (pid, e)) + raise e + + def _set_schedopts(self, pid, policy, priority): + """Set scheduling policy and priority of a thread""" + try: + self._scheduler_utils.set_scheduler(pid, policy, priority) + except OSError as e: + if hasattr(e, "errno") and e.errno == errno.ESRCH: + log.debug("Failed to set scheduling of kthread %d, the task vanished." % pid) + raise ThreadNoLongerExists() + else: + log.error("Failed to set scheduling of kthread %d: %s" % (pid, e)) + raise e + + def _get_schedopts(self, pid): + """Get scheduling policy and priority of a thread""" + try: + return self._scheduler_utils.get_scheduler(pid), self._scheduler_utils.get_priority(pid) + except OSError as e: + if hasattr(e, "errno") and e.errno == errno.ESRCH: + log.debug("Failed to get scheduling of kthread %d, the task vanished." % pid) + raise ThreadNoLongerExists() + else: + log.error("Failed to get scheduling of kthread %d: %s" % (pid, e)) + raise e + + def _format_schedopts(self, policy, priority): + """Format scheduling policy and priority as string""" + return "%s:%d" % (self._scheduler_utils.sched_num_to_const(policy), priority) + + # + # "high-level" methods that work on KthreadInfo objects: + # + def _apply_kthread_tuning(self, kthread, opts): + """apply tuning while saving original settings""" + if kthread.sched_orig is not None: + return self._change_kthread_tuning(kthread, opts) + + current_affinity = self._get_affinity(kthread.pid) + current_policy, current_priority = self._get_schedopts(kthread.pid) + + affinity_changed = False + affinity_changeable = None + sched_changed = False + + if opts.affinity is not None and opts.affinity != current_affinity: + try: + self._set_affinity(kthread.pid, opts.affinity) + affinity_changed = True + affinity_changeable = True + log.debug("Set CPU affinity of kthread %s to '%s'" % (kthread, opts.affinity)) + except AffinityNotChangeable: + affinity_changeable = False + log.debug("The CPU affinity of kthread %s is not changeable" % kthread) + + if opts.policy is not None or opts.priority is not None: + if opts.policy != current_policy or opts.priority != current_priority: + try: + self._set_schedopts(kthread.pid, opts.policy, opts.priority) + sched_changed = True + log.debug("Set scheduling of kthread %s to '%s'" + % (kthread, self._format_schedopts(opts.policy, opts.priority))) + except Exception: + if affinity_changed: + try: + self._set_affinity(kthread.pid, current_affinity) + except Exception: + pass + raise + + kthread.sched_orig = SchedOpts(policy=current_policy, priority=current_priority, affinity=current_affinity) + if affinity_changeable is not None: + kthread.affinity_changeable = affinity_changeable + kthread.tuned_affinity = affinity_changed + kthread.tuned_sched = sched_changed + + def _change_kthread_tuning(self, kthread, opts): + """Change tuning of a kthread""" + prev_affinity = self._get_affinity(kthread.pid) + prev_tuned_affinity = kthread.tuned_affinity + affinity_changed = False + + if opts.affinity is None and kthread.tuned_affinity: + self._set_affinity(kthread.pid, kthread.sched_orig.affinity) + log.debug("Changed (restored) CPU affinity of kthread %s to '%s'" % (kthread, kthread.sched_orig.affinity)) + kthread.tuned_affinity = False + affinity_changed = True + elif opts.affinity is not None and kthread.affinity_changeable != False: + try: + self._set_affinity(kthread.pid, opts.affinity) + kthread.affinity_changeable = True + log.debug("Changed CPU affinity of kthread %s to '%s'" % (kthread, opts.affinity)) + kthread.tuned_affinity = True + affinity_changed = True + except AffinityNotChangeable: + kthread.affinity_changeable = False + log.debug("The CPU affinity of kthread %s is not changeable" % kthread) + + if opts.policy is None and opts.priority is None and kthread.tuned_sched: + self._set_schedopts(kthread.pid, kthread.sched_orig.policy, kthread.sched_orig.priority) + log.debug("Changed (restored) scheduling of kthread %s to '%s'" + % (kthread, self._format_schedopts(kthread.sched_orig.policy, kthread.sched_orig.priority))) + kthread.tuned_sched = False + elif opts.policy is not None or opts.priority is not None: + try: + self._set_schedopts(kthread.pid, opts.policy, opts.priority) + log.debug("Changed scheduling of kthread %s to '%s'" + % (kthread, self._format_schedopts(opts.policy, opts.priority))) + kthread.tuned_sched = True + except Exception: + if affinity_changed: + try: + self._set_affinity(kthread.pid, prev_affinity) + kthread.tuned_affinity = prev_tuned_affinity + except Exception: + pass + raise + + def _restore_kthread_tuning(self, kthread): + """restore tuning of a kthread to its original settings""" + opts = kthread.sched_orig + current_affinity = self._get_affinity(kthread.pid) + current_policy, current_priority = self._get_schedopts(kthread.pid) + if kthread.affinity_changeable and opts.affinity != current_affinity: + try: + self._set_affinity(kthread.pid, opts.affinity) + log.debug("Restored CPU affinity of kthread %s to '%s'" + % (kthread, opts.affinity)) + kthread.tuned_affinity = False + except AffinityNotChangeable: + log.debug("Failed to restore CPU affinity of kthread %s to '%s'" + % (kthread, opts.affinity)) + if opts.policy != current_policy or opts.priority != current_priority: + self._set_schedopts(kthread.pid, opts.policy, opts.priority) + log.debug("Restored scheduling of kthread %s to '%s'" + % (kthread, self._format_schedopts(opts.policy, opts.priority))) + kthread.tuned_sched = False + kthread.sched_orig = None + + def _verify_kthread_tuning(self, kthread, opts): + """verify that the tuning of a kthread matches the desired settings""" + affinity_ok, priority_ok = True, True + current_affinity = self._get_affinity(kthread.pid) + current_policy, current_priority = self._get_schedopts(kthread.pid) + if opts.affinity is not None and kthread.affinity_changeable: + desc = "CPU affinity of kthread %s" % kthread + current = self._cmd.cpulist2string(self._cmd.cpulist_pack(current_affinity)) + if opts.affinity == current_affinity: + log.info(consts.STR_VERIFY_PROFILE_VALUE_OK % (desc, current)) + else: + desired = self._cmd.cpulist2string(self._cmd.cpulist_pack(opts.affinity)) + log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired)) + affinity_ok = False + if opts.policy is not None or opts.priority is not None: + desc = "scheduling of kthread %s" % kthread + current = self._format_schedopts(current_policy, current_priority) + if opts.policy == current_policy and opts.priority == current_priority: + log.info(consts.STR_VERIFY_PROFILE_VALUE_OK % (desc, current)) + else: + desired = self._format_schedopts(opts.policy, opts.priority) + log.error(consts.STR_VERIFY_PROFILE_VALUE_FAIL % (desc, current, desired)) + priority_ok = False + return affinity_ok and priority_ok