11"""Celery task execution."""
22
33import os
4+ import signal
45import subprocess
56from asyncio import sleep
67from datetime import datetime , timezone
78from pathlib import Path
89from typing import List , Optional , Tuple
910from uuid import UUID
1011
12+ import psutil
1113from celery import Celery
1214from celery .backends .base import TaskRevokedError
1315from celery .result import AsyncResult
1921from aqueductcore .backend .context import UserInfo
2022from aqueductcore .backend .errors import AQDDBTaskNonExisting , AQDPermission
2123from aqueductcore .backend .models import orm
22- from aqueductcore .backend .models .task import TaskProcessExecutionResult , TaskRead
24+ from aqueductcore .backend .models .task import (TaskProcessExecutionResult ,
25+ TaskRead )
2326from aqueductcore .backend .services .utils import task_orm_to_model
2427from aqueductcore .backend .settings import settings
2528
3740
3841
3942celery_app .conf .update (result_extended = True )
43+ extension_process = None # pylint: disable=invalid-name
44+
45+ def worker_signal_handler (signo , _ ):
46+ """ Handle SIGINT signal and propagate it to child and grandchild processes. """
47+ global extension_process # pylint: disable=global-statement,global-variable-not-assigned
48+ if extension_process is not None :
49+ psutil_child_process = psutil .Process (extension_process .pid )
50+ for grand_child in psutil_child_process .children (recursive = True ):
51+ grand_child .send_signal (signo )
52+ extension_process .send_signal (signo )
4053
4154
4255@celery_app .task (bind = True )
43- def run_executable (
44- self , # pylint: disable=unused-argument
56+ def run_executable ( # pylint: disable=unused-argument
57+ self ,
4558 extension_directory_name : str ,
4659 shell_script : str ,
4760 ** kwargs ,
@@ -59,6 +72,8 @@ def run_executable(
5972 Returns:
6073 Tuple[int, str, str]: result code, std out, std error.
6174 """
75+ global extension_process # pylint: disable=global-statement
76+ signal .signal (signal .SIGINT , worker_signal_handler )
6277 extensions_dir = os .environ .get ("EXTENSIONS_DIR_PATH" , "" )
6378 if not extensions_dir :
6479 raise FileNotFoundError ("EXTENSIONS_DIR_PATH environment variable should be set." )
@@ -78,6 +93,7 @@ def run_executable(
7893 env = myenv ,
7994 cwd = workdir ,
8095 ) as proc :
96+ extension_process = proc
8197 out , err = proc .communicate (timeout = None )
8298 code = proc .returncode
8399 args = (code , out .decode ("utf-8" ), err .decode ("utf-8" ))
@@ -102,22 +118,25 @@ async def _update_task_info(task_id: str, wait=False) -> TaskProcessExecutionRes
102118 task_result = task .result
103119
104120 if task_result is not None :
105- known_errors = (FileNotFoundError , TaskRevokedError , AttributeError , Exception )
121+ known_exceptions = (FileNotFoundError , KeyboardInterrupt , TaskRevokedError , Exception )
106122 if isinstance (task_result , ChildProcessError ):
107123 if task_result .args is not None and len (task_result .args ) > 0 :
108124 if len (task_result .args [0 ]) == 3 :
109125 code , out , err = task_result .args [0 ]
110126 task_info .result_code = code
111127 task_info .std_out = out
112128 task_info .std_err = err
113- elif isinstance (task_result , known_errors ):
114- err = str (task_result )
115- task_info .std_err = err
129+ elif isinstance (task_result , known_exceptions ):
130+ task_info .std_err = str (task_result )
116131 elif task .ready ():
117- code , out , err = task_result
118- task_info .result_code = code
119- task_info .std_out = out
120- task_info .std_err = err
132+ # in case the result format is incorrect
133+ if len (task_result ) == 3 :
134+ code , out , err = task_result
135+ task_info .result_code = code
136+ task_info .std_out = out
137+ task_info .std_err = err
138+ else :
139+ task_info .std_err = str (task_result )
121140 task_info .ended_at = task .date_done
122141 task_info .kwargs = task .kwargs
123142
@@ -177,11 +196,7 @@ async def revoke_task(
177196 if not user_info .can_cancel_task_owned_by (task_user ):
178197 raise AQDPermission ("User has no permission to cancel tasks of this user." )
179198
180- # note: SIGINT does not lead to task abort. If you send
181- # KeyboardInterupt (SIGINT), it will not stop, and the
182- # exception does not propagate.
183- AsyncResult (db_task .task_id ).revoke (terminate = terminate , signal = "SIGTERM" )
184-
199+ AsyncResult (db_task .task_id ).revoke (terminate = terminate , signal = "SIGINT" )
185200 task_info = await _update_task_info (task_id = db_task .task_id , wait = False )
186201
187202 username = db_task .created_by_user .username
0 commit comments