Stop the worker if the connection to zeebe is lost #604
-
|
I am working with a Kubernetes cluster where I have a pod that deploys a FastAPI application. On FastAPI startup, I deploy the `run_zeebe_worker` function, which allows me to execute Zeebe jobs like this:

```python
def init_zeebe_connection(worker_configuration: WorkerConfiguration) -> Tuple[Channel, ZeebeClient, CustomZeebeAdapter]:
    grpc_channel = create_insecure_channel(worker_configuration.zeebe_address + ":" + str(worker_configuration.zeebe_grpc_port))
    zeebe_client = ZeebeClient(grpc_channel)
    zeebe_adapter = CustomZeebeAdapter(grpc_channel, max_connection_retries=10)
    logger.info("Zeebe connection established")
    return grpc_channel, zeebe_client, zeebe_adapter
```
```python
async def run_zeebe_worker(configuration: WorkerConfiguration) -> None:
    global health_status
    logger.info(f"Zeebe worker connecting to {configuration.zeebe_address}:{configuration.zeebe_grpc_port}")
    grpc_channel_zeebe, zeebe_client, zeebe_adapter = init_zeebe_connection(configuration)
    worker = ZeebeWorker(grpc_channel_zeebe, max_connection_retries=1, poll_retry_delay=configuration.zeebe_poll_interval)
    logger.info("Zeebe worker started")

    @worker.task(task_type="service_jobexecution_PythonProcessingTaskBackground",  # type: ignore
                 variables_to_fetch=['taskInput', 'lastOutput', 'inputMetadata', 'restartFromInfo', 'transactionId'],
                 variable_name='output',
                 timeout_ms=31556926000,  # timeout for Zeebe is one year, we handle it ourselves internally
                 max_running_jobs=1,
                 max_jobs_to_activate=1,
                 exception_handler=exception_handler)
    async def background_processing(job: Job, taskInput: Dict[str, Any], lastOutput: Dict[str, Any], inputMetadata: Dict[str, Any], restartFromInfo: Optional[Dict[str, Any]], transactionId: str) -> Dict[str, Any]:
        # ... some code ...

    try:
        await worker.work()
    except Exception:
        logger.warning("Python Processing will restart soon")
        logger.warning(traceback.format_exc())
        health_status.is_healthy = False

    if grpc_channel_zeebe.get_state() == grpc.ChannelConnectivity.SHUTDOWN:
        logger.warning("Python Processing will restart soon")
        health_status.is_healthy = False
```

Sometimes the pod loses connection to the Zeebe gateway. Occasionally, 10 retries are not enough, so I want the pod's health check to fail and restart the pod. However, the problem I face now is that `await worker.work()` gets stuck forever after the 10 retries, preventing me from reaching the next lines of code. The only workaround I found was to edit `worker.py` and add this function, then use it in the `work()` function:

```python
async def _monitor_connection(self) -> None:
    while True:
        if not self.zeebe_adapter.connected:
            logger.error("Connection to Zeebe lost")
            await self.stop()
            break
        await asyncio.sleep(5)
```
```python
async def work(self) -> None:
    """
    Start the worker. The worker will poll zeebe for jobs of each task in a different asyncio task.

    Raises:
        ActivateJobsRequestInvalidError: If one of the worker's tasks has invalid types
        ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
        ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
        ZeebeInternalError: If Zeebe experiences an internal error
        UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
    """
    self._init_tasks()
    async with anyio.create_task_group() as tg:
        tg.start_soon(self._monitor_connection)
        ...
```

I might be missing something here — do you think there is a better solution?
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
|
Hi. Maybe something like this?

```python
async def _monitor_connection(worker: pyzeebe.ZeebeWorker):
    while worker.zeebe_adapter.connected:
        await asyncio.sleep(0.1)
    await worker.stop()


async def run_worker():
    worker = pyzeebe.ZeebeWorker(...)
    async with anyio.create_task_group() as tg:  # or asyncio.TaskGroup()
        tg.start_soon(worker.work)
        tg.start_soon(_monitor_connection, worker)
```
Beta Was this translation helpful? Give feedback.
In fact, your PR #526 would solve the problem, so I'll close the topic.