
Conversation

@khsrali
Contributor

@khsrali khsrali commented Oct 10, 2025

Edit:

Problem

The JobsList class had a race condition where:

  1. Job A requests a status update, triggering a scheduler query
  2. While the scheduler is being queried (with only Job A), Job B also requests an update
  3. After the scheduler returns (with only Job A's status), both futures were resolved
  4. Job B's future was resolved as DONE, because AiiDA assumes any job ID that disappeared from the scheduler query has completed

This premature "DONE" status causes several critical issues:

  • Premature retrieval: AiiDA attempts to retrieve output files while the job is still running
  • Corrupted files: Files may be incomplete or still being written when retrieved
  • False failure reports: Jobs still running may be incorrectly marked as failed

This issue only surfaces when using async transport plugins like core.ssh_async or aiida-firecrest, where the timing conditions make the race condition more likely to occur.

Solution

Only resolve futures for jobs that were actually inspected by the scheduler
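For illustration, a minimal sketch of this idea with simplified names (not the literal diff in this PR):

async def _update_job_info(self):
    # Snapshot which job ids this scheduler query will actually cover.
    inspecting_jobs = set(self._job_update_requests)
    self._jobs_cache = await self._get_jobs_from_scheduler()

    racing_requests = {}
    for job_id, future in list(self._job_update_requests.items()):
        if job_id in inspecting_jobs:
            # None means the scheduler no longer lists the job, so it really finished.
            future.set_result(self._jobs_cache.get(job_id))
        else:
            # This request arrived while the query was in flight; keep it for the next round.
            racing_requests[job_id] = future
    self._job_update_requests = racing_requests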

Testing

Added test_prevent_racing_condition which explicitly tests the race condition scenario

@codecov

codecov bot commented Oct 10, 2025

Codecov Report

❌ Patch coverage is 76.92308% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.59%. Comparing base (cd11f08) to head (30d0039).

Files with missing lines Patch % Lines
src/aiida/engine/processes/calcjobs/manager.py 76.93% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #7061      +/-   ##
==========================================
+ Coverage   79.58%   79.59%   +0.02%     
==========================================
  Files         566      566              
  Lines       43517    43519       +2     
==========================================
+ Hits        34629    34636       +7     
+ Misses       8888     8883       -5     


@rikigigi
Member

Does the Slurm project know about this issue?

@mbercx
Member

mbercx commented Oct 14, 2025

Thanks @khsrali! Just testing your solution in the field now:

  • First, I managed to reproduce the issue on Thanos, see calculation job with PK 24690 (project mc3d-relax; profile dev). This happened quite quickly, like the 5th pw.x calculation I started.
  • Using your branch, I ran ~60 calculations with the same setup. None of them had the issue.

So it seems to fix the issue robustly for me. Regarding configuration, my main question is: does await asyncio.sleep(5) block one of the workers?

@khsrali
Contributor Author

khsrali commented Oct 14, 2025

Thanks @mbercx for trying this out,

does await asyncio.sleep(5) block one of the workers?

Not at all, it's placed before the EBM call, and only on individual calls.
So scheduler.get_job happens as before; only the current calcjob is delayed in receiving an update. And that delay applies only to the very first update of that calcjob. So the overhead is very small, and non-blocking.
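As a tiny self-contained illustration (not aiida-core code) of why an awaited sleep does not block the worker's event loop:

import asyncio

async def delayed_update(job_id: str) -> None:
    await asyncio.sleep(5)      # suspends only this coroutine
    print(f'first status update for job {job_id}')

async def other_work() -> None:
    for i in range(5):
        await asyncio.sleep(1)  # other tasks keep progressing in the meantime
        print(f'other task still running ({i})')

async def main() -> None:
    await asyncio.gather(delayed_update('123'), other_work())

asyncio.run(main())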

I'd like to come up with some sort of bulletproof regression test for this solution, to make 100% sure that it resolves the issue.

@rikigigi

Does the Slurm project know about this issue?

I couldn't find any similar reported issue online. Before reporting anything, I'd like to make 100% sure that the lag in Slurm is the issue.

@mbercx
Member

mbercx commented Oct 14, 2025

Great, thanks for the clarification @khsrali! In that case perhaps we can just set this sleep time to safe_interval? Seems appropriate, and we avoid adding another option.

Fully support adding a test for this. From my end: I'll run ~1000 structures with core.ssh_async in production and come back to you in case I still run into this premature retrieval problem.

logger.info(f'scheduled request to update CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)

if not node.get_last_job_info():
Member

Suggest using a while loop to fetch the job info with a time interval and timeout.

Member

A while loop with a timeout is an antipattern for dealing with remote communication. The transport already uses transport.exec_command_wait to wait for the command to finish. The while loop is a tight loop that blocks the event loop and can cause a performance downgrade. Meanwhile, the error handling is hard to standardize inside it. We don't want to shoot ourselves in the foot with such a workaround.

Member

Currently, the code waits a fixed 5 seconds (await asyncio.sleep(5)), even if the job info becomes available earlier. A better approach is to poll periodically with a timeout, like this:

interval = 1   # seconds between polls
timeout = 5    # give up after this many seconds
tstart = time.time()

# Poll until the job info appears or the timeout expires.
while not node.get_last_job_info():
    if time.time() - tstart > timeout:
        break
    await asyncio.sleep(interval)

This approach still uses await, so it doesn’t block the event loop. It just allows earlier exit when the job info is ready, instead of always waiting the full 5 seconds.

I don't really understand your concern about this approach. It’s quite standard in async workflows where you want to wait for a condition to become true without blocking.

Contributor Author

Unfortunately this will not work, because node.get_last_job_info() is only updated in func::do_update.

Member

@superstar54 superstar54 Oct 17, 2025

I see. This becomes more complex. In this case, I would not sleep before the do_update; instead, we can handle it inside the do_update function.
We can use a flag already_had_job_info:

if job_info is None:
    if not already_had_job_info and t < timeout:
        # the job_info has not appeared yet because of the Slurm lag,
        # so we should wait instead of assuming the job is done
    else:
        # do the normal code
else:
    already_had_job_info = True  # record that we have already seen the job
    ...
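One hedged, concrete reading of this idea (hypothetical helper name and state dict, not the actual do_update code): a missing job_info only means "done" once the job has been seen at least once or a grace period has elapsed.

import time

def classify_job(job_info, state: dict, timeout: float = 30.0) -> str:
    """Return 'pending', 'done' or 'running' for one scheduler answer; ``state`` persists per calcjob."""
    if job_info is None:
        if not state.get('already_had_job_info') and time.monotonic() - state['tstart'] < timeout:
            return 'pending'    # likely scheduler lag: keep waiting instead of assuming completion
        return 'done'           # seen before (or grace period over): the job really finished
    state['already_had_job_info'] = True
    return 'running'

Here state would be initialised once per calcjob, e.g. state = {'tstart': time.monotonic()}.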

@rikigigi
Member

This workaround has completely stabilized the AiiDA interactions with the Slurm cluster I'm using, thank you! While the fix itself is brilliant, it's frankly concerning that Slurm has an underlying issue that requires such an unconventional solution...

@unkcpz
Member

unkcpz commented Oct 17, 2025

Did you check the stderr of the squeue command call? In the code base, the stderr was ignored. I believe that if the command times out or doesn't return the expected output, Slurm will give some reasonable information.

You may want to check this function:

    def get_jobs(
        self,
        jobs: list[str] | None = None,
        user: str | None = None,
        as_dict: bool = False,
    ) -> list[JobInfo] | dict[str, JobInfo]:
        """Return the list of currently active jobs.

        :param jobs: A list of jobs to check; only these are checked.
        :param user: A string with a user: only jobs of this user are checked.
        :param as_dict: If ``False`` (default), a list of ``JobInfo`` objects is returned. If ``True``, a dictionary is
            returned, where the ``job_id`` is the key and the values are the ``JobInfo`` objects.
        :returns: List of active jobs.
        """
        with self.transport:
            retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user))

        joblist = self._parse_joblist_output(retval, stdout, stderr)
        if as_dict:
            jobdict = {job.job_id: job for job in joblist}
            if None in jobdict:
                raise SchedulerError('Found at least one job without jobid')
            return jobdict

        return joblist

@rikigigi @mbercx, can you check your daemon log to see whether any of the following error/warning messages appear?

                f"""squeue returned exit code {retval} (_parse_joblist_output function)
stdout='{stdout.strip()}'
stderr='{stderr.strip()}'"""
            )
        if stderr.strip():
            self.logger.warning(
                f"squeue returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'"
            )

@khsrali
Contributor Author

khsrali commented Oct 17, 2025

@unkcpz and @superstar54

I think there's a misunderstanding here: the issue is not that the job is not submitted or that we don't have a job ID.
What we think is the issue is that the update function concludes the job is complete because it cannot find it in the squeue results after submission.

@superstar54
Member

I think there's a misunderstanding here: the issue is not that the job is not submitted or that we don't have a job ID.
What we think is the issue is that the update function concludes the job is complete because it cannot find it in the squeue results after submission.

Yes, no confusion on my side. My comment was only about replacing the hard-coded wait time with a more flexible approach.

@khsrali
Contributor Author

khsrali commented Oct 17, 2025

#4326
🤔

@khsrali khsrali force-pushed the scheduler_prejudice_bug branch from d146b8b to 9f8ddd1 Compare November 3, 2025 15:27
@khsrali khsrali changed the title Fix premature retrieval of output files before job completion in async engine Fix race condition in JobsList Nov 3, 2025
@khsrali khsrali requested review from mbercx and removed request for mbercx November 3, 2025 15:56
@mbercx
Member

mbercx commented Nov 3, 2025

Thanks for the ping @khsrali! I see you've found a more robust/elegant solution? I'll do a bit of field testing to see if I run into any issues. 🚀

@khsrali
Contributor Author

khsrali commented Nov 4, 2025

@mbercx yes! This hopefully is the correct fix.
In the end the problem was not Slurm; it turned out to be a race condition in the JobManager.

@khsrali khsrali force-pushed the scheduler_prejudice_bug branch from 9f8ddd1 to 06a6b17 Compare November 4, 2025 08:54
@danielhollas danielhollas self-requested a review November 4, 2025 12:59
Comment on lines 152 to 155
if str(job_id) in self._inspecting_jobs:
    future.set_result(self._jobs_cache.get(job_id, None))
else:
    racing_requests[job_id] = future
Collaborator

Could you first check whether self._jobs_cache.get returns a non-None result? If so, then presumably you can set the future even if it has been racing? Or is it guaranteed to be None if the job was not in self._inspecting_jobs? But I might be misunderstanding.

Something like this:

Suggested change
if str(job_id) in self._inspecting_jobs:
    future.set_result(self._jobs_cache.get(job_id, None))
else:
    racing_requests[job_id] = future

job_status = self._jobs_cache.get(job_id, None)
if str(job_id) in self._inspecting_jobs or job_status is not None:
    future.set_result(self._jobs_cache.get(job_id, None))
else:
    racing_requests[job_id] = future

Collaborator

Hmm, looking at the code in _get_jobs_from_scheduler it seems like it will always be None?

Contributor Author

I don't understand the question

racing_requests[job_id] = future
finally:
self._job_update_requests = {}
self._job_update_requests = racing_requests
Collaborator

I think there is still a race condition here, since you can overwrite a poorly timed call to self._job_update_requests.setdefault in request_job_info_update.

I think instead of this finally clause we should be popping the dict items as we iterate over them above.

Contributor Author

I used a better pop solution, just to be safe.

P.S. I don't think this ever happens, because the for loop actually blocks the event loop (although it's inside an async function). That means that between
self._jobs_cache = await self._get_jobs_from_scheduler() and the end of the function, self._job_update_requests is really "frozen".
It's only when it reaches the end of the function that it relinquishes control to the event loop (there's nothing async in the function apart from the await that I mentioned).
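A tiny demo of that point (assumed example, not aiida-core code): a coroutine's synchronous section runs up to its next await without interruption, so another task cannot mutate the dict while the plain for loop is running.

import asyncio

shared: dict[str, int] = {}

async def writer() -> None:
    for i in range(3):
        shared[f'job-{i}'] = i
        await asyncio.sleep(0)   # yield control to the event loop after each insert

async def reader() -> None:
    await asyncio.sleep(0)       # let the writer run first
    # No awaits below: the copy and the print happen atomically with respect to the event loop.
    snapshot = dict(shared)
    print('snapshot taken:', snapshot)

async def main() -> None:
    await asyncio.gather(writer(), reader())

asyncio.run(main())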

@mbercx
Member

mbercx commented Nov 5, 2025

I've been doing some more field testing of this version of the PR:

  1. Submitting ~50 more calculations on Eiger, I find no more "early retrieval" issues.
  2. Running (i.e. with engine.run) a test workflow for a project here, I am encountering a few "issues". None of these break the workflow, but quite a few error tracebacks/warnings are reported that detract from the UX. Some I encounter for both core.ssh and core.ssh_async, see 🐛 FileNotFoundError related to monitors #7086. However, one issue (see below) I am only encountering for core.ssh_async, and a cursory glance suggests it could be related to race conditions, so I'm reporting it here.
Transport already closed issue
Traceback (most recent call last):
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/utils.py", line 205, in exponential_backoff_retry
    result = await coro()
             ^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/processes/calcjobs/tasks.py", line 196, in do_update
    job_info = await cancellable.with_interrupt(update_request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/utils.py", line 115, in with_interrupt
    result = await next(wait_iter)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/tasks.py", line 631, in _wait_for_one
    return f.result()  # May raise f.exception().
           ^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/futures.py", line 202, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/processes/calcjobs/manager.py", line 135, in _update_job_info
    self._jobs_cache = await self._get_jobs_from_scheduler()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/processes/calcjobs/manager.py", line 97, in _get_jobs_from_scheduler
    with self._transport_queue.request_transport(self._authinfo) as request:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 144, in __exit__
    next(self.gen)
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/transports.py", line 122, in request_transport
    transport_request.future.result().close()
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/transports/transport.py", line 1838, in close
    return self.run_command_blocking(self.close_async)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/transports/transport.py", line 1832, in run_command_blocking
    return loop.run_until_complete(func(*args, **kwargs))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/engine/runners.py", line 160, in run_until_complete
    return self._loop.run_until_complete(future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mbercx/.aiida_venvs/defect/lib/python3.12/site-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/futures.py", line 202, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/opt/homebrew/Cellar/[email protected]/3.12.12/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/Users/mbercx/project/defect/git/aiida-core/src/aiida/transports/plugins/ssh_async.py", line 190, in close_async
    raise InvalidOperation('Cannot close the transport: it is already closed')
aiida.common.exceptions.InvalidOperation: Cannot close the transport: it is already closed

@khsrali
Contributor Author

khsrali commented Nov 5, 2025

Thanks a lot @mbercx , for confirmation!

I was not aware of #7086, but that also seems to be a separate bug. Thanks for reporting.

I'm tracing back the "Transport already closed" issue, but that certainly has another cause, and I'm resolving it as a separate bug.

Now I'm confident that this PR reasonably resolves the race condition we had in mind. In addition, the test that I wrote simulates that scenario and would clearly fail without this fix.

@khsrali
Contributor Author

khsrali commented Nov 5, 2025

@unkcpz
Yes, it's exactly here: self._jobs_cache = await self._get_jobs_from_scheduler().
I thought overall it's better to fix the racing logic in the code, but of course one could also go for an asyncio.Lock.

An asyncio.Lock might slightly slow down the process, as the pending requests have to wait for it. I don't have an estimate of by how much, though.

I don't have strong opinions, but as it stands, the race is prevented.
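For reference, a hedged sketch of that asyncio.Lock alternative (simplified, hypothetical class, not what this PR implements): racing requests would simply wait until the in-flight scheduler query has finished.

import asyncio

class LockedJobsList:
    def __init__(self) -> None:
        self._update_lock = asyncio.Lock()
        self._jobs_cache: dict[str, object] = {}

    async def _get_jobs_from_scheduler(self) -> dict[str, object]:
        await asyncio.sleep(0.1)   # stand-in for the real squeue query
        return {}

    async def update(self) -> None:
        async with self._update_lock:   # pending requests queue up here instead of racing
            self._jobs_cache = await self._get_jobs_from_scheduler()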

for future in self._job_update_requests.values():
    if not future.done():
        future.set_exception(exception)
for job_id in self._polling_jobs:
Collaborator

Thanks @khsrali for implementing my suggestion, I think the code is now clearer and less error-prone.

I still think we need to ponder one thing: is it safe to iterate over self._polling_jobs? I don't think they necessarily need to be in sync with _job_update_requests. I suspect a safer solution is to still iterate over _job_update_requests as before, and check whether job_id is in self._polling_jobs before popping out the future.

Contributor Author

Hi @danielhollas,
Thanks for your suggestion. self._polling_jobs is indeed a subset of _job_update_requests, and not in sync with it. The two solutions are equivalent; I don't see why this should not work.

@khsrali
Contributor Author

khsrali commented Nov 9, 2025

I'm on holiday; I can only get back to this in two weeks.
I suggest skipping nitpick suggestions and sticking with what's absolutely necessary.
This is an ongoing bug.

@GeigerJ2 GeigerJ2 moved this to In progress in aiida-core v2.7.2 Nov 10, 2025
@GeigerJ2 GeigerJ2 moved this from In progress to In review in aiida-core v2.7.2 Nov 11, 2025
@mbercx
Member

mbercx commented Nov 20, 2025

Just a note while running more in the field: I seem to have to restart my daemon a lot to keep processes going. I.e. I check the Slurm queue on the cluster and see no job, but my process still seems to be running according to AiiDA. I still have to investigate the cause a bit more, and see if I can consistently reproduce it.

EDIT: good and bad news: I'm also running into the same issues with core.ssh. Going to switch to main and continue running with core.ssh. If I still encounter the problem I'll open a separate issue.

EDIT2: more good and bad: switching back to main, the stability is perfect again. Just to be absolutely sure, I switched back to this branch, and am facing the same issue as before with a lot of jobs (see description below). So it's definitely related to the changes here, and we should figure out what is going wrong, because the engine is very fragile in this state. Not sure why I didn't see the issue in my previous tests (maybe I forgot the classic: restart the daemon after updating the branch ^^). I have to keep running now, but @khsrali let me know when you're back and hopefully I have more time to help investigate.


  1. I see a bunch of jobs that are in job state UNKNOWN or RUNNING for some time:

    199644  16h ago    PwCalculation          ⏵ Waiting        Monitoring scheduler: job state UNKNOWN
    199661  16h ago    PwCalculation          ⏵ Waiting        Monitoring scheduler: job state UNKNOWN
    199676  15h ago    PwCalculation          ⏵ Waiting        Monitoring scheduler: job state RUNNING
    
  2. I check on the remote: the jobs are no longer active.

  3. I stop the daemon, which was still running fine.

  4. The transport tasks are now cancelled.

  5. I try to repair the processes, but there are no inconsistencies.

  6. I start the daemon, and the tasks are picked up again.

@danielhollas
Collaborator

@mbercx I have a strong suspicion that the current implementation is wrong in this PR, see my comments above.

@khsrali
Contributor Author

khsrali commented Nov 24, 2025

@mbercx, do you have any logs, error messages, etc.? Or does all of this happen silently?

@khsrali
Contributor Author

khsrali commented Nov 24, 2025

@danielhollas, thanks for reviewing this. I responded to your comments, please let me know what you think.

@khsrali
Contributor Author

khsrali commented Nov 25, 2025

@mbercx I'm having a hard time reproducing this. Without a test case I cannot make more progress on this.
Can you please set your log level to debug and report here?

@mbercx
Member

mbercx commented Nov 26, 2025

Apologies @khsrali, I've been focussing on the project here in Aus the past days. Unfortunately, the project is now being used for production runs. I've done you one better though: I set up an ali-playground project on Thanos - duplicating most of my setup - that you can (hopefully) use to reproduce the issues I've been having. I'll write you on Slack with more details.

EDIT: This is probably relevant:

Task exception was never retrieved
future: <Task finished name='Task-3342' coro=<JobsList._ensure_updating.<locals>.updating() done, defined at /home/aiida/project/ali-playground/git-repos/aiida-core/src/aiida/engine/processes/calcjobs/manager.py:186> exception=KeyError('5989541')>
Traceback (most recent call last):
  File "/usr/lib/python3.10/asyncio/tasks.py", line 232, in __step
    result = coro.send(None)
  File "/home/aiida/project/ali-playground/git-repos/aiida-core/src/aiida/engine/processes/calcjobs/manager.py", line 188, in updating
    await self._update_job_info()
  File "/home/aiida/project/ali-playground/git-repos/aiida-core/src/aiida/engine/processes/calcjobs/manager.py", line 155, in _update_job_info
    future = self._job_update_requests.pop(job_id)
KeyError: '5989541'

See /home/aiida/project/ali-playground/.aiida/daemon/log/aiida-dev.log.

@khsrali khsrali force-pushed the scheduler_prejudice_bug branch from 65e501c to 6e2bd0e Compare November 26, 2025 14:39
@khsrali
Contributor Author

khsrali commented Nov 26, 2025

@danielhollas & @mbercx
Yup! it's the typing issue!
Oh python python python, what can I say...
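For the record, a minimal illustration of the kind of str/int key mismatch behind the KeyError above (simplified, hypothetical data; not the actual aiida-core code):

requests = {5989541: 'some-future'}                   # job id stored under an int key
polling_jobs = [str(job_id) for job_id in requests]   # cast to str for the scheduler call

for job_id in polling_jobs:
    requests.pop(job_id)   # raises KeyError: '5989541' -- the str key is not in the dict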

@unkcpz
Member

unkcpz commented Nov 26, 2025

I had the same issue before when working with hyperqueue (#6542). I was trying to fix it in #6543, but since that part was very confusing, I gave up.

@mbercx
Member

mbercx commented Nov 27, 2025

Great! I'll switch back to this branch locally, and see what else I can break. 😇 Feel free to go ahead and merge whenever you're ready, and I'll keep running on main.

@khsrali khsrali linked an issue Nov 27, 2025 that may be closed by this pull request
@khsrali khsrali requested a review from danielhollas November 27, 2025 13:25
Collaborator

@danielhollas danielhollas left a comment

@khsrali @mbercx amazing work, thanks!

I have some super minor final suggestions (and one less minor). But the bigger "problem" here is that the manager.py file is currently not type checked, and when I run mypy on it I see errors. I think it would be good to fix and enable type checking as part of this PR. @khsrali if you agree I can push a commit with typing fixes if you want.

❯ mypy --pretty src/aiida/engine/processes/calcjobs/manager.py
src/aiida/engine/processes/calcjobs/manager.py:119: error: Item "list[JobInfo]" of "list[JobInfo] | dict[str, JobInfo]" has no attribute
"items"  [union-attr]
                for job_id, job_info in scheduler_response.items():
                                        ^~~~~~~~~~~~~~~~~~~~~~~~
src/aiida/engine/processes/calcjobs/manager.py:195: error: Unused "type: ignore" comment  [unused-ignore]
                        context=contextvars.Context(),  #  type: ignore[call-arg]
                        ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
src/aiida/engine/processes/calcjobs/manager.py:206: error: Unused "type: ignore" comment  [unused-ignore]
                    context=contextvars.Context(),  #  type: ignore[call-arg]
                    ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
src/aiida/engine/processes/calcjobs/manager.py:272: error: Incompatible types in assignment (expression has type "JobsList", target has
type "JobInfo")  [assignment]
                self._job_lists[authinfo.pk] = JobsList(authinfo, self._transport_queue)
                                               ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
src/aiida/engine/processes/calcjobs/manager.py:274: error: Incompatible return value type (got "JobInfo", expected "JobsList") 
[return-value]
            return self._job_lists[authinfo.pk]
                   ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
Found 5 errors in 1 file (checked 1 source file)

return self._last_updated

async def _get_jobs_from_scheduler(self) -> Dict[Hashable, 'JobInfo']:
async def _get_jobs_from_scheduler(self) -> Dict[str, 'JobInfo']:
Collaborator

optional, since you're already touching the type:

Suggested change
async def _get_jobs_from_scheduler(self) -> Dict[str, 'JobInfo']:
async def _get_jobs_from_scheduler(self) -> dict[str, 'JobInfo']:

self._job_update_requests: Dict[str, asyncio.Future] = {} # Mapping: {job_id: Future}
self._last_updated = last_updated
self._update_handle: Optional[asyncio.TimerHandle] = None
self._polling_jobs: List[str] = []
Collaborator

Let's use the list builtin

Suggested change
self._polling_jobs: List[str] = []
self._polling_jobs: list[str] = []

Comment on lines +60 to +61
self._jobs_cache: Dict[str, 'JobInfo'] = {}
self._job_update_requests: Dict[str, asyncio.Future] = {} # Mapping: {job_id: Future}
Collaborator

optional:

Suggested change
self._jobs_cache: Dict[str, 'JobInfo'] = {}
self._job_update_requests: Dict[str, asyncio.Future] = {} # Mapping: {job_id: Future}
self._jobs_cache: dict[str, 'JobInfo'] = {}
self._job_update_requests: dict[str, asyncio.Future] = {} # Mapping: {job_id: Future}

scheduler = self._authinfo.computer.get_scheduler()
scheduler.set_transport(transport)

self._polling_jobs = [str(job_id) for job_id, _ in self._job_update_requests.items()]
Collaborator

Could be slightly simpler

Suggested change
self._polling_jobs = [str(job_id) for job_id, _ in self._job_update_requests.items()]
self._polling_jobs = [str(job_id) for job_id in self._job_update_requests]

Comment on lines +155 to +156
future = self._job_update_requests.pop(job_id)
if future.done():
Collaborator

Despite the discussion below, I'd still do this a bit more defensively

Suggested change
future = self._job_update_requests.pop(job_id)
if future.done():
future = self._job_update_requests.pop(job_id, None)
if future is None or future.done():

@danielhollas
Collaborator

Here's a patch that fixes the typing issues. I can submit it as a follow up, up to you @khsrali

diff --git a/src/aiida/engine/processes/calcjobs/manager.py b/src/aiida/engine/processes/calcjobs/manager.py
index 960b5f0c4..7c9120677 100644
--- a/src/aiida/engine/processes/calcjobs/manager.py
+++ b/src/aiida/engine/processes/calcjobs/manager.py
@@ -7,6 +7,7 @@
 # For further information please visit http://www.aiida.net               #
 ###########################################################################
 """Module containing utilities and classes relating to job calculations running on systems that require transport."""
+from __future__ import annotations
 
 import asyncio
 import contextlib
@@ -116,7 +117,7 @@ class JobsList:
             jobs_cache = {}
             self.logger.info(f'AuthInfo<{self._authinfo.pk}>: successfully retrieved status of active jobs')
 
-            for job_id, job_info in scheduler_response.items():
+            for job_id, job_info in scheduler_response.items():  # type: ignore[union-attr]
                 jobs_cache[job_id] = job_info
 
             return jobs_cache
@@ -192,7 +193,7 @@ class JobsList:
                     self._get_next_update_delay(),
                     asyncio.ensure_future,
                     updating(),
-                    context=contextvars.Context(),  #  type: ignore[call-arg]
+                    context=contextvars.Context(),
                 )
             else:
                 self._update_handle = None
@@ -203,7 +204,7 @@ class JobsList:
                 self._get_next_update_delay(),
                 asyncio.ensure_future,
                 updating(),
-                context=contextvars.Context(),  #  type: ignore[call-arg]
+                context=contextvars.Context(),
             )
 
     @staticmethod
@@ -260,7 +261,7 @@ class JobManager:
 
     def __init__(self, transport_queue: 'TransportQueue') -> None:
         self._transport_queue = transport_queue
-        self._job_lists: Dict[Hashable, 'JobInfo'] = {}
+        self._job_lists: dict[int | None, JobsList] = {}
 
     def get_jobs_list(self, authinfo: AuthInfo) -> JobsList:
         """Get or create a new `JobLists` instance for the given authinfo.

@danielhollas
Collaborator

I've opened #7114 with the typing fixes, we can merge it after this one.


Successfully merging this pull request may close these issues.

🐛 Async Engine: retrieval of files before the job is completed?
job_id partially cast in to str cause wrong job state
