Conversation
DanielElisenberg
left a comment
I really like the state manager class 👍🏻 Easy to read solution to our issue. Good stuff 💯
job_executor/app.py
Outdated
```python
if job and job.status not in ["queued", "built"]:
    logger.info(f"Worker died and did not finish job {job.job_id}")
    fix_interrupted_job(job)
    manager_state.unregister_job(job.job_id)
```
`job` would be `None` if it is completed here. I think we should refactor this solution to iterate through each dead worker and use the dead worker first and foremost. Something akin to:
```python
def clean_up_after_dead_workers(
    dead_workers: List[Worker], manager_state
) -> None:
    if len(dead_workers) == 0:
        return
    for dead_worker in dead_workers:
        # only query for the job in question
        # the number of requests here is at most 4, so of no consequence
        job = job_service.get_job(dead_worker.job_id)
        if job and job.status not in ["queued", "built"]:
            logger.info(f"Worker died and did not finish job {job.job_id}")
            fix_interrupted_job(job)
        manager_state.unregister_job(dead_worker.job_id)
```
job_executor/app.py
Outdated
```python
logger.info(
    f"{job.job_id} Failed to get the size of the dataset."
)
raise LocalStorageError(
```
If we do it like this, the local storage module itself might as well raise the error, so we don't need to do the check here.
Another option would be to fail the job:
```python
job_service.update_job_status(
    job_id, "failed", log=f"No such dataset available for import"
)
```
This is preferable to crashing the whole executor if one dataset should be tampered with in some unexpected way.
rename get_size to get_input_tar_size_in_bytes
fix unregister for dead_workers
Fails job instead of crashing if size isn't found
Took docstrings for a run and a diet
job_executor/worker/manager_state.py
Outdated
```python
else:
    self.current_max_workers = self.default_max_workers
    can_spawn = True
self.update_worker_limit(new_job_size)
```
Nice to refactor this, but I think `can_spawn_new_worker` should be a pure "get" method without any extra "setting" going on, so refactoring that out will improve readability.
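A minimal sketch of that split, with guessed field names (the real `ManagerState` API in this PR may differ): `can_spawn_new_worker` only reads state, while all mutation, including the worker-limit update, happens in `register_job`.

```python
from typing import Dict


class ManagerState:
    """Simplified sketch of the state class; field names are assumptions."""

    def __init__(self, default_max_workers: int, max_bytes_all_workers: int):
        self.default_max_workers = default_max_workers
        self.current_max_workers = default_max_workers
        self.max_bytes_all_workers = max_bytes_all_workers
        self.current_total_size = 0
        self.registered_jobs: Dict[str, int] = {}

    def can_spawn_new_worker(self, new_job_size: int) -> bool:
        # Pure "get": reads state, never mutates it.
        return (
            len(self.registered_jobs) < self.current_max_workers
            and self.current_total_size + new_job_size
            <= self.max_bytes_all_workers
        )

    def update_worker_limit(self, new_job_size: int) -> None:
        # Placeholder for the real limit logic; the point is that this
        # mutation is called from register_job, not from the getter.
        self.current_max_workers = self.default_max_workers

    def register_job(self, job_id: str, job_size: int) -> None:
        self.registered_jobs[job_id] = job_size
        self.current_total_size += job_size
        self.update_worker_limit(job_size)

    def unregister_job(self, job_id: str) -> None:
        self.current_total_size -= self.registered_jobs.pop(job_id, 0)
```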
Register job handles update current worker limit
Just food for thought: Could it be beneficial to also keep the active workers list inside the ManagerState? That is also part of the statefulness of the manager process, so it might be more readable for as much of it as possible to exist in this new state class?
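One way that could look, as a hypothetical sketch (the `Worker` fields and method names below are invented for illustration): the manager state owns the worker list and hands back the dead ones for cleanup.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Worker:
    # Stand-in for the real worker handle; `alive` would likely be
    # a process liveness check in the real code.
    job_id: str
    alive: bool = True


@dataclass
class ManagerState:
    active_workers: List[Worker] = field(default_factory=list)

    def register_worker(self, worker: Worker) -> None:
        self.active_workers.append(worker)

    def pop_dead_workers(self) -> List[Worker]:
        # Remove and return dead workers so the caller can clean up
        # after them (e.g. fix interrupted jobs).
        dead = [w for w in self.active_workers if not w.alive]
        self.active_workers = [w for w in self.active_workers if w.alive]
        return dead
```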
Co-authored-by: pawbu <pawbu@users.noreply.github.com>
Co-authored-by: Daniel Elisenberg <33904479+DanielElisenberg@users.noreply.github.com>
DanielElisenberg
left a comment
Let's send it to integration tests and QA 💯
Uses a class to store state about workers.
The class stores:
How it works:
2.1 If alive workers >= default_max_workers or current_total_size + new_job_size >= max size in pipeline, we do not allow a new worker.
2.2 Otherwise, we allow the new job!
Note:
This can cause large jobs to be prioritized later than small jobs, e.g.:

default_max_workers=4
max_gb_all_workers=100GB

JOB_1=20GB --> len(workers) is 0 && current_size is 0GB --> Can spawn!
JOB_2=90GB --> len(workers) is 1 && current_size is 20GB --> Not allowed!
JOB_3=20GB --> len(workers) is 1 && current_size is 20GB --> Can spawn!
JOB_4=20GB --> len(workers) is 2 && current_size is 40GB --> Can spawn!

If we start these 4 jobs, JOB_2 won't run until the others are done (except if it gets picked up first, then all the other jobs will wait).
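The walkthrough above can be sketched in a few lines, assuming the spawn rule is "spawn only if alive workers < default_max_workers and current_size + new_job_size does not exceed max_gb_all_workers" (all names here are illustrative, not the real ManagerState API):

```python
DEFAULT_MAX_WORKERS = 4
MAX_GB_ALL_WORKERS = 100


def can_spawn(num_workers: int, current_size_gb: int, new_job_gb: int) -> bool:
    # Assumed rule: worker slot free AND total size stays within the cap.
    return (
        num_workers < DEFAULT_MAX_WORKERS
        and current_size_gb + new_job_gb <= MAX_GB_ALL_WORKERS
    )


def simulate(jobs):
    """Run the spawn decision for a list of (name, size_gb) jobs in order."""
    workers, current_size, results = 0, 0, []
    for name, size in jobs:
        if can_spawn(workers, current_size, size):
            workers += 1
            current_size += size
            results.append((name, "Can spawn!"))
        else:
            results.append((name, "Not allowed!"))
    return results


# Reproduces the example: JOB_2 (90GB) is skipped while the small jobs run.
for name, verdict in simulate(
    [("JOB_1", 20), ("JOB_2", 90), ("JOB_3", 20), ("JOB_4", 20)]
):
    print(name, verdict)
```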