Skip to content

fix: cancel scheduler tasks on shutdown #2130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

booxter
Copy link
Collaborator

@booxter booxter commented May 9, 2025

What does this PR do?

Scheduler: cancel tasks on shutdown.

Otherwise the currently running tasks will never exit (before they
actually complete), which means the process can't be properly shut down
(only with SIGKILL).

Ideally, we let tasks know that they are about to shutdown and give them
some time to do so; but in the lack of the mechanism, it's better to
cancel than linger forever.

Test Plan

Start a long running task (e.g. torchtune or external kfp-provider training).
Ctr-C the process in TTY. Confirm it exits in reasonable time.

^CINFO:     Shutting down
INFO:     Waiting for application shutdown.
13:32:26.187 - INFO - Shutting down
13:32:26.187 - INFO - Shutting down DatasetsRoutingTable
13:32:26.187 - INFO - Shutting down DatasetIORouter
13:32:26.187 - INFO - Shutting down TorchtuneKFPPostTrainingImpl
    Traceback (most recent call last):
      File "/opt/homebrew/Cellar/[email protected]/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/runners.py", line 118, in run
        return self._loop.run_until_complete(task)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/opt/homebrew/Cellar/[email protected]/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
        return future.result()
               ^^^^^^^^^^^^^^^
    asyncio.exceptions.CancelledError

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "<frozen runpy>", line 198, in _run_module_as_main
      File "<frozen runpy>", line 88, in _run_code
      File "/Users/ihrachys/src/llama-stack-provider-kfp-trainer/.venv/lib/python3.12/site-packages/kfp/dsl/executor_main.py", line 109, in <module>
        executor_main()
      File "/Users/ihrachys/src/llama-stack-provider-kfp-trainer/.venv/lib/python3.12/site-packages/kfp/dsl/executor_main.py", line 101, in executor_main
        output_file = executor.execute()
                      ^^^^^^^^^^^^^^^^^^
      File "/Users/ihrachys/src/llama-stack-provider-kfp-trainer/.venv/lib/python3.12/site-packages/kfp/dsl/executor.py", line 361, in execute
        result = self.func(**func_kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^
      File "/var/folders/45/1q1rx6cn7jbcn2ty852w0g_r0000gn/T/tmp.RKpPrvTWDD/ephemeral_component.py", line 118, in component
        asyncio.run(recipe.setup())
      File "/opt/homebrew/Cellar/[email protected]/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/runners.py", line 194, in run
        return runner.run(main)
               ^^^^^^^^^^^^^^^^
      File "/opt/homebrew/Cellar/[email protected]/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/asyncio/runners.py", line 123, in run
        raise KeyboardInterrupt()
    KeyboardInterrupt


13:32:31.219 - ERROR - Task 'component' finished with status FAILURE
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
INFO     2025-05-09 13:32:31,221 llama_stack.providers.utils.scheduler:221 scheduler: Job
         test-jobc3c2e1e4-859c-4852-a41d-ef29e55e3efa: Pipeline [1m[95m'test-jobc3c2e1e4-859c-4852-a41d-ef29e55e3efa'[1m[0m
         finished with status [1m[91mFAILURE[1m[0m. Inner task failed: [1m[96m'component'[1m[0m.
ERROR    2025-05-09 13:32:31,223 llama_stack_provider_kfp_trainer.scheduler:54 scheduler: Job
         test-jobc3c2e1e4-859c-4852-a41d-ef29e55e3efa failed.
         ╭───────────────────────────────────── Traceback (most recent call last) ─────────────────────────────────────╮
         │ /Users/ihrachys/src/llama-stack-provider-kfp-trainer/src/llama_stack_provider_kfp_trainer/scheduler.py:45   │
         │ in do                                                                                                       │
         │                                                                                                             │
         │    42 │   │   │                                                                                             │
         │    43 │   │   │   job.status = JobStatus.running                                                            │
         │    44 │   │   │   try:                                                                                      │
         │ ❱  45 │   │   │   │   artifacts = self._to_artifacts(job.handler().output)                                  │
         │    46 │   │   │   │   for artifact in artifacts:                                                            │
         │    47 │   │   │   │   │   on_artifact_collected_cb(artifact)                                                │
         │    48                                                                                                       │
         │                                                                                                             │
         │ /Users/ihrachys/src/llama-stack-provider-kfp-trainer/.venv/lib/python3.12/site-packages/kfp/dsl/base_compon │
         │ ent.py:101 in __call__                                                                                      │
         │                                                                                                             │
         │    98 │   │   │   │   f'{self.name}() missing {len(missing_arguments)} required '                           │
         │    99 │   │   │   │   f'{argument_or_arguments}: {arguments}.')                                             │
         │   100 │   │                                                                                                 │
         │ ❱ 101 │   │   return pipeline_task.PipelineTask(                                                            │
         │   102 │   │   │   component_spec=self.component_spec,                                                       │
         │   103 │   │   │   args=task_inputs,                                                                         │
         │   104 │   │   │   execute_locally=pipeline_context.Pipeline.get_default_pipeline() is                       │
         │                                                                                                             │
         │ /Users/ihrachys/src/llama-stack-provider-kfp-trainer/.venv/lib/python3.12/site-packages/kfp/dsl/pipeline_ta │
         │ sk.py:187 in __init__                                                                                       │
         │                                                                                                             │
         │   184 │   │   ])                                                                                            │
         │   185 │   │                                                                                                 │
         │   186 │   │   if execute_locally:                                                                           │
         │ ❱ 187 │   │   │   self._execute_locally(args=args)                                                          │
         │   188 │                                                                                                     │
         │   189 │   def _execute_locally(self, args: Dict[str, Any]) -> None:                                         │
         │   190 │   │   """Execute the pipeline task locally.                                                         │
         │                                                                                                             │
         │ /Users/ihrachys/src/llama-stack-provider-kfp-trainer/.venv/lib/python3.12/site-packages/kfp/dsl/pipeline_ta │
         │ sk.py:197 in _execute_locally                                                                               │
         │                                                                                                             │
         │   194 │   │   from kfp.local import task_dispatcher                                                         │
         │   195 │   │                                                                                                 │
         │   196 │   │   if self.pipeline_spec is not None:                                                            │
         │ ❱ 197 │   │   │   self._outputs = pipeline_orchestrator.run_local_pipeline(                                 │
         │   198 │   │   │   │   pipeline_spec=self.pipeline_spec,                                                     │
         │   199 │   │   │   │   arguments=args,                                                                       │
         │   200 │   │   │   )                                                                                         │
         │                                                                                                             │
         │ /Users/ihrachys/src/llama-stack-provider-kfp-trainer/.venv/lib/python3.12/site-packages/kfp/local/pipeline_ │
         │ orchestrator.py:43 in run_local_pipeline                                                                    │
         │                                                                                                             │
         │    40 │                                                                                                     │
         │    41 │   # validate and access all global state in this function, not downstream                           │
         │    42 │   config.LocalExecutionConfig.validate()                                                            │
         │ ❱  43 │   return _run_local_pipeline_implementation(                                                        │
         │    44 │   │   pipeline_spec=pipeline_spec,                                                                  │
         │    45 │   │   arguments=arguments,                                                                          │
         │    46 │   │   raise_on_error=config.LocalExecutionConfig.instance.raise_on_error,                           │
         │                                                                                                             │
         │ /Users/ihrachys/src/llama-stack-provider-kfp-trainer/.venv/lib/python3.12/site-packages/kfp/local/pipeline_ │
         │ orchestrator.py:108 in _run_local_pipeline_implementation                                                   │
         │                                                                                                             │
         │   105 │   │   │   )                                                                                         │
         │   106 │   │   return outputs                                                                                │
         │   107 │   elif dag_status == status.Status.FAILURE:                                                         │
         │ ❱ 108 │   │   log_and_maybe_raise_for_failure(                                                              │
         │   109 │   │   │   pipeline_name=pipeline_name,                                                              │
         │   110 │   │   │   fail_stack=fail_stack,                                                                    │
         │   111 │   │   │   raise_on_error=raise_on_error,                                                            │
         │                                                                                                             │
         │ /Users/ihrachys/src/llama-stack-provider-kfp-trainer/.venv/lib/python3.12/site-packages/kfp/local/pipeline_ │
         │ orchestrator.py:137 in log_and_maybe_raise_for_failure                                                      │
         │                                                                                                             │
         │   134 │   │   logging_utils.format_task_name(task_name) for task_name in fail_stack)                        │
         │   135 │   msg = f'Pipeline {pipeline_name_with_color} finished with status                                  │
         │       {status_with_color}. Inner task failed: {task_chain_with_color}.'                                     │
         │   136 │   if raise_on_error:                                                                                │
         │ ❱ 137 │   │   raise RuntimeError(msg)                                                                       │
         │   138 │   with logging_utils.local_logger_context():                                                        │
         │   139 │   │   logging.error(msg)                                                                            │
         │   140                                                                                                       │
         ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
         RuntimeError: Pipeline [1m[95m'test-jobc3c2e1e4-859c-4852-a41d-ef29e55e3efa'[1m[0m finished with status
         [1m[91mFAILURE[1m[0m. Inner task failed: [1m[96m'component'[1m[0m.
INFO     2025-05-09 13:32:31,266 llama_stack.distribution.server.server:136 server: Shutting down
         DistributionInspectImpl
INFO     2025-05-09 13:32:31,266 llama_stack.distribution.server.server:136 server: Shutting down ProviderImpl
INFO:     Application shutdown complete.
INFO:     Finished server process [26648]

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Meta Open Source bot. label May 9, 2025
@booxter booxter marked this pull request as draft May 9, 2025 17:39
@booxter booxter force-pushed the shutdown-cancel-scheduler branch from 9403798 to b4af9f4 Compare May 9, 2025 17:42
@booxter booxter changed the title scheduler: cancel tasks on shutdown fix: cancel scheduler tasks on shutdown May 9, 2025
Otherwise the currently running tasks will never exit (before they
actually complete), which means the process can't be properly shut down
(only with SIGKILL).

Ideally, we let tasks know that they are about to shutdown and give them
some time to do so; but in the lack of the mechanism, it's better to
cancel than linger forever.

Signed-off-by: Ihar Hrachyshka <[email protected]>
@booxter booxter force-pushed the shutdown-cancel-scheduler branch from b4af9f4 to 1ceebdc Compare May 9, 2025 17:53
@booxter booxter marked this pull request as ready for review May 9, 2025 18:00
@booxter
Copy link
Collaborator Author

booxter commented May 9, 2025

Testing with several providers...

@booxter booxter marked this pull request as draft May 9, 2025 19:13
@booxter booxter marked this pull request as ready for review May 14, 2025 01:15
@booxter
Copy link
Collaborator Author

booxter commented May 14, 2025

OK I checked the shutdown problem reported by @cdoern for #2132 ; I tested the fix with torchtune / kfp provider and it's helpful. For hftrainer implementation, it's not but this is because hftrainer train() function is non-cooperative. In asyncio world, there's nothing you can do about a coroutine never yielding control to other coroutines. HFTrainer implementation would have to change train() so that it periodically yields.

The fix is correct for providers that are more cooperative, e.g. torchtune-based that yields on each batch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Meta Open Source bot.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants