Add PRE_JOB_RANK to CondorClassAdScheduler #25

Open
@HerrHorizontal

The current implementation of the CondorClassAdScheduler

class CondorClassadJobScheduler(JobScheduler):
    """
    Goal of the htcondor job scheduler is to have a scheduler that somehow
    mimics how htcondor does schedule jobs.
    Htcondor does scheduling based on a priority queue. The priorities themselves
    are managed by operators of htcondor.
    So different instances can apparently behave very differently.
    In this case a priority queue that sorts job slots
    by increasing cost is built. The scheduler checks if a job either
    exactly fits a slot or if it does fit into it several times. The cost for
    putting a job at a given slot is given by the amount of resources that
    might remain unallocated.
    """

    def __init__(
        self,
        job_queue,
        machine_ad: str = machine_ad_defaults,
        job_ad: str = job_ad_defaults,
        pre_job_rank: str = "0",
        interval: float = 60,
        autocluster: bool = False,
    ):
        """
        Initializes the CondorClassadJobScheduler

        :param job_queue: queue of jobs that are scheduled in the following simulation
        :param machine_ad: ClassAd that is used with every drone
        :param job_ad: ClassAd that is used with every job
        :param pre_job_rank: ClassAd attribute that all drones are sorted by
        :param interval: time between scheduling cycles
        :param autocluster: could be used to decide whether to use autoclusters
        """
        self._stream_queue = job_queue
        self._drones: RankedClusters[Drone] = RankedNonClusters(
            quantization=quantization_defaults, ranking=parse(pre_job_rank)
        )
        self.interval = interval
        self.job_queue = JobQueue()
        self._collecting = True
        self._processing = Resources(jobs=0)

        # temporary solution
        self._wrapped_classads = WeakKeyDictionary()
        self._machine_classad = parse(machine_ad)
        self._job_classad = parse(job_ad)

    @property
    def drone_list(self) -> Iterator[Drone]:
        """
        Takes an iterator over the WrappedClassAd objects of drones known to the
        scheduler, extracts the drones and returns an iterator over the drone objects.

        :return:
        """
        for cluster in self._drones.clusters():
            for drone in cluster:
                yield drone._wrapped

    def register_drone(self, drone: Drone):
        """
        Provides the drones with the drone ClassAd, combines both into one object and
        adds the resulting WrappedClassAd object to the drones known to the scheduler as
        well as the dictionary containing all WrappedClassAd objects the scheduler
        works with.

        :param drone:
        """
        wrapped_drone = WrappedClassAd(classad=self._machine_classad, wrapped=drone)
        self._drones.add(wrapped_drone)
        self._wrapped_classads[drone] = wrapped_drone

    def unregister_drone(self, drone: Drone):
        """
        Remove a drone's representation from the scheduler's scope.

        :param drone:
        :return:
        """
        drone_wrapper = self._wrapped_classads[drone]
        self._drones.remove(drone_wrapper)

    def update_drone(self, drone: Drone):
        """
        Update a drone's representation in the scheduler scope.

        :param drone:
        :return:
        """
        drone_wrapper = self._wrapped_classads[drone]
        self._drones.update(drone_wrapper)

    async def run(self):
        """
        Runs the scheduler's functionality. Once executed, the scheduler starts up and
        begins to add the jobs that are

        :return:
        """
        async with Scope() as scope:
            scope.do(self._collect_jobs())
            async for _ in interval(self.interval):
                await self._schedule_jobs()
                if (
                    not self._collecting
                    and not self.job_queue
                    and self._processing.levels.jobs == 0
                ):
                    break

    @staticmethod
    def _match_job(
        job: ClassAd, pre_job_clusters: Iterator[List[Set[WrappedClassAd[Drone]]]]
    ):
        """
        Tries to find a match for the transferred job among the available drones.

        :param job: job to match
        :param pre_job_clusters: list of clusters of wrapped drones that are
            presorted by a clustering mechanism of RankedAutoClusters/RankedNonClusters
            that mimics the HTCondor NEGOTIATOR_PRE_JOB_RANK, short prejobrank. The
            clusters contain drones that are considered to be equivalent with respect
            to all Requirements and Ranks that are used during the matchmaking
            process. This mimics the Autoclustering functionality of HTCondor.
            [[highest prejobrank {autocluster}, {autocluster}], ...,
            [lowest prejobrank {autocluster}, {autocluster}]]
        :return: drone that is the best match for the job

        The matching is performed in several steps:
        1. The job's requirements are evaluated and only drones that meet them are
        considered further. A drone of every autocluster is extracted from
        pre_job_clusters and if it meets the job's requirements it is not removed
        from pre_job_clusters.
        2. The autoclusters that are equivalent with respect to the prejobrank are
        then sorted by the job's rank expression. The resulting format of
        pre_job_clusters is
        [[(highest prejobrank, highest jobrank) {autocluster} {autocluster},
        ..., (highest prejobrank, lowest jobrank) {autocluster}], ...]
        3. The resulting pre_job_clusters are then iterated and the drone with the
        highest (prejobrank, jobrank) whose requirements are also compatible with the
        job is returned as best match.
        """

        def debug_evaluate(expr, my, target=None):
            """
            Reimplementation of the classad packages evaluate function. Having it
            here enables developers to inspect the ClassAd evaluation process more
            closely and to add debug output if necessary.

            :param expr:
            :param my:
            :param target:
            :return:
            """
            if type(expr) is str:
                expr = my[expr]
            result = expr.evaluate(my=my, target=target)
            return result

        if job["Requirements"] != Undefined():
            pre_job_clusters_tmp = []
            for cluster_group in pre_job_clusters:
                cluster_group_tmp = []
                for cluster in cluster_group:
                    if debug_evaluate(
                        "Requirements", my=job, target=next(iter(cluster))
                    ):
                        cluster_group_tmp.append(cluster)
                pre_job_clusters_tmp.append(cluster_group_tmp)
            pre_job_clusters = pre_job_clusters_tmp

        if job["Rank"] != Undefined():
            pre_job_clusters_tmp = []
            for cluster_group in pre_job_clusters:
                pre_job_clusters_tmp.append(
                    sorted(
                        cluster_group,
                        key=lambda cluster: (
                            debug_evaluate("Rank", my=job, target=next(iter(cluster))),
                            random.random(),
                        ),
                        reverse=True,
                    )
                )
            pre_job_clusters = pre_job_clusters_tmp

        for cluster_group in pre_job_clusters:
            # TODO: if we have POST_JOB_RANK, collect *all* matches of a group
            for cluster in cluster_group:
                for drone in cluster:
                    if drone["Requirements"] == Undefined() or drone.evaluate(
                        "Requirements", my=drone, target=job
                    ):
                        return drone
        raise NoMatch()

    async def _schedule_jobs(self):
        """
        Handles the scheduling of jobs. Tries to match the jobs in the job queue to
        available resources. This occurs in several steps.
        1. The list of drones known to the scheduler is copied. The copy can then be
        used to keep track of the drones' available resources while matching jobs as
        the jobs allocate resources on the original drones before being processed but
        not during scheduling.
        2. The jobs in the job queue are matched to (the copied) resources iteratively.
        The actual matching is performed by the `_match_job` method that returns the
        most suitable drone unless no drone is compatible with the job's requirements.
        If a match was found, the resources requested by the job are allocated on the
        matched drone. If no resources remain unallocated after the last job's
        allocation, the matching process is ended for this scheduler interval.
        3. After the job matching is finished, the matched jobs are removed from the
        job queue as the index of a job in the job queue changes once a job with a
        lower index is removed from the queue.
        4. The matched jobs' execution is triggered.
        """
        # Pre CachingJob Rank is the same for all jobs
        # Use a copy to allow temporary "remainder after match" estimates
        if self._drones.empty():
            return
        pre_job_drones = self._drones.copy()
        matches: List[
            Tuple[int, WrappedClassAd[CachingJob], WrappedClassAd[Drone]]
        ] = []
        for queue_index, candidate_job in enumerate(self.job_queue):
            try:
                pre_job_drones.lookup(candidate_job._wrapped)
                matched_drone = self._match_job(
                    candidate_job, pre_job_drones.cluster_groups()
                )
            except NoMatch:
                candidate_job._wrapped.failed_matches += 1
                continue
            else:
                matches.append((queue_index, candidate_job, matched_drone))
                for key, value in candidate_job._wrapped.resources.items():
                    matched_drone._temp[key] = (
                        matched_drone._temp.get(
                            key,
                            matched_drone._wrapped.theoretical_available_resources[key],
                        )
                        - value
                    )
                pre_job_drones.update(matched_drone)
                # monitoring/coordination stuff
                if (
                    candidate_job._wrapped._total_input_data
                    and matched_drone._wrapped.cached_data
                ):
                    candidate_job._wrapped._cached_data = (
                        matched_drone._wrapped.cached_data
                    )
            if pre_job_drones.empty():
                break
        if not matches:
            return
        # TODO: optimize for few matches, many matches, all matches
        for queue_index, _, _ in reversed(matches):
            del self.job_queue[queue_index]
        for _, job, drone in matches:
            drone.clear_temporary_resources()
            await self._execute_job(job=job, drone=drone)
        await sampling_required.put(self)
        # NOTE: Is this correct? Triggers once instead of for each job
        await sampling_required.put(self.job_queue)
        await sampling_required.put(UserDemand(len(self.job_queue)))

    async def _execute_job(self, job: WrappedClassAd, drone: WrappedClassAd):
        """
        Schedules a job on a drone by extracting both objects from the
        respective WrappedClassAd and using the drone's scheduling functionality

        :param job:
        :param drone:
        """
        wrapped_job = job._wrapped
        wrapped_drone = drone._wrapped
        await wrapped_drone.schedule_job(wrapped_job)

    async def _collect_jobs(self):
        """
        Combines jobs that are imported from the simulation's job config with a job
        ClassAd and adds the resulting WrappedClassAd objects to the scheduler's job
        queue.
        """
        async for job in self._stream_queue:
            wrapped_job = WrappedClassAd(classad=self._job_classad, wrapped=job)
            self._wrapped_classads[job] = wrapped_job
            self.job_queue.append(wrapped_job)
            await self._processing.increase(jobs=1)
            # TODO: logging happens with each job
            # TODO: job queue to the outside now contains wrapped classads...
            await sampling_required.put(self.job_queue)
            await sampling_required.put(UserDemand(len(self.job_queue)))
        self._collecting = False

    async def job_finished(self, job):
        """
        Handles the impact of finishing jobs on the scheduler. If the job is completed
        successfully, the amount of running jobs matched by the current scheduler
        instance is reduced. If the job is not finished successfully,
        it is resubmitted to the scheduler's job queue.

        :param job:
        """
        if job.successful:
            await self._processing.decrease(jobs=1)
        else:
            self.job_queue.append(self._wrapped_classads[job])

does not consider the PRE_JOB_RANK in the negotiation process. Currently, a PRE_JOB_RANK ClassAd expression that is evaluated in the context of Drones only, instead of CachingJobs, is assumed, as this improves the runtime of the scheduling process.
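
To make the runtime argument concrete, here is a minimal sketch that counts rank evaluations for both variants. It does not use the lapis or classad APIs: plain Python callables stand in for ClassAd expressions, drones and jobs are plain dicts, and all names (`CountingRank`, `order_drone_only`, `order_per_job`) are hypothetical. It also ignores that the real scheduler re-ranks a drone after each match.

```python
from typing import Callable, Dict, List

Drone = Dict[str, float]
Job = Dict[str, float]


class CountingRank:
    """Wraps a rank function and counts how often it is evaluated."""

    def __init__(self, func: Callable[..., float]):
        self.func = func
        self.calls = 0

    def __call__(self, *args) -> float:
        self.calls += 1
        return self.func(*args)


def order_drone_only(drones: List[Drone], rank) -> List[Drone]:
    """Rank depends on the drone alone, so one ordering serves every job."""
    return sorted(drones, key=rank, reverse=True)


def order_per_job(drones: List[Drone], job: Job, rank) -> List[Drone]:
    """Rank depends on (drone, job), so the ordering is rebuilt for each job."""
    return sorted(drones, key=lambda drone: rank(drone, job), reverse=True)


drones = [{"cpus": 4.0}, {"cpus": 8.0}, {"cpus": 2.0}]
jobs = [{"cpus": 1.0}, {"cpus": 2.0}, {"cpus": 4.0}, {"cpus": 8.0}]

# Drone-only rank: evaluated once per drone.
drone_rank = CountingRank(lambda drone: drone["cpus"])
shared_order = order_drone_only(drones, drone_rank)

# Job-aware rank (here: prefer the tightest fit): evaluated per drone and job.
pair_rank = CountingRank(lambda drone, job: -abs(drone["cpus"] - job["cpus"]))
per_job_orders = [order_per_job(drones, job, pair_rank) for job in jobs]

print(drone_rank.calls, pair_rank.calls)
```

With three drones and four jobs, the drone-only rank is evaluated once per drone, while the job-aware rank is evaluated once per drone and job pair.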

Comparable to the implementation of RANK, the ClassAd mechanism for PRE_JOB_RANK could also be evaluated in the context of all available components, including CachingJobs, instead of the current drone-only ranking:

self._drones: RankedClusters[Drone] = RankedNonClusters(
    quantization=quantization_defaults, ranking=parse(pre_job_rank)
)
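
A job-aware PRE_JOB_RANK would have to produce the cluster-group structure that `_match_job` consumes (highest prejobrank first) separately for each job. The following is a rough sketch of that idea, not the `RankedClusters` API: `job_aware_cluster_groups` and `prefer_cached_site` are hypothetical names, ads are plain dicts, and every drone is treated as its own cluster.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Ad = Dict[str, Any]


def job_aware_cluster_groups(
    drones: List[Ad], job: Ad, pre_job_rank: Callable[[Ad, Ad], float]
) -> List[List[Ad]]:
    """Group drones by their rank for this specific job, highest rank first."""
    by_rank: Dict[float, List[Ad]] = defaultdict(list)
    for drone in drones:
        by_rank[pre_job_rank(drone, job)].append(drone)
    return [by_rank[rank] for rank in sorted(by_rank, reverse=True)]


def prefer_cached_site(drone: Ad, job: Ad) -> float:
    """Example rank that needs the job: favour drones where its input is cached."""
    return 1.0 if drone["site"] == job["cached_at"] else 0.0


drones = [
    {"name": "a", "site": "KIT"},
    {"name": "b", "site": "DESY"},
    {"name": "c", "site": "KIT"},
]
groups_kit = job_aware_cluster_groups(drones, {"cached_at": "KIT"}, prefer_cached_site)
groups_desy = job_aware_cluster_groups(drones, {"cached_at": "DESY"}, prefer_cached_site)
```

The same set of drones ends up in a different group order for the two jobs, which a single drone-only ranking cannot express.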

As evaluating PRE_JOB_RANK per job will heavily impact the performance of the simulation, it might be done in a separate class. Renaming the current CondorClassAdScheduler might be reasonable to make clear that it does not implement the full functionality of the original HTCondor scheduler.
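
One possible shape for such a split is sketched below, assuming both variants expose the same `cluster_groups(job)` hook so that the matching code stays unchanged. The class names `DroneRankedScheduler` and `JobRankedScheduler` are placeholders for illustration, not proposed final names, and the dict-based ads again stand in for wrapped ClassAds.

```python
from itertools import groupby
from typing import Any, Callable, Dict, List

Ad = Dict[str, Any]


def group_by_rank(drones: List[Ad], rank_of: Callable[[Ad], float]) -> List[List[Ad]]:
    """Group drones of equal rank, highest rank first."""
    ordered = sorted(drones, key=rank_of, reverse=True)
    return [list(group) for _, group in groupby(ordered, key=rank_of)]


class DroneRankedScheduler:
    """Stand-in for the current behaviour: rank ignores the job."""

    def __init__(self, drones: List[Ad], drone_rank: Callable[[Ad], float]):
        # Computed once, because the result is the same for every job.
        self._groups = group_by_rank(drones, drone_rank)

    def cluster_groups(self, job: Ad) -> List[List[Ad]]:
        return self._groups


class JobRankedScheduler:
    """Stand-in for the proposed variant: rank is evaluated per job."""

    def __init__(self, drones: List[Ad], pair_rank: Callable[[Ad, Ad], float]):
        self._drones = drones
        self._pair_rank = pair_rank

    def cluster_groups(self, job: Ad) -> List[List[Ad]]:
        # Recomputed on every call, which is the expensive part.
        return group_by_rank(
            self._drones, lambda drone: self._pair_rank(drone, job)
        )
```

Keeping the cheap variant as its own class lets simulations that only need a drone-based rank avoid the per-job cost entirely.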

Labels: enhancement (New feature or request)
