@@ -59,13 +59,15 @@ class ActivationWithPendingFutures:
5959 execution_start_time: Timestamp of task execution start.
6060 futures_start_time: Timestamp of when the pending futures were enqueued.
6161 pending_futures: Set of pending futures generated by executing this activation.
62+ task_func: The Task object related to this activation.
6263 """
6364
6465 inflight : InflightTaskActivation
6566 status : TaskActivationStatus .ValueType
6667 execution_start_time : float
6768 futures_start_time : float
6869 pending_futures : set [ProducerFuture [BrokerValue [KafkaPayload ]]]
70+ task_func : Task [Any , Any ]
6971
7072
7173@contextlib .contextmanager
@@ -261,24 +263,23 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None:
261263 and submitted for retry (if the policy allows).
262264 """
263265 RESULT_TIMEOUT_SEC = 1
264- task_func = _get_known_task (task .inflight .activation )
265266 try :
266267 # We don't care about the actual result value,
267268 # we just care if result() raises or not
268269 [f .result (RESULT_TIMEOUT_SEC ) for f in task .pending_futures ]
269270 # If any pending producer futures failed, retry the task
270271 except Exception :
271272 task .status = TASK_ACTIVATION_STATUS_FAILURE
272- if task_func and task_func .retry :
273+ if task . task_func .retry :
273274 retry_state = task .inflight .activation .retry_state
274- if not task_func .retry .max_attempts_reached (retry_state ):
275+ if not task . task_func .retry .max_attempts_reached (retry_state ):
275276 task .status = TASK_ACTIVATION_STATUS_RETRY
276277 pending_task_futures .remove (task )
277278 _task_execution_complete (
278279 inflight = task .inflight ,
279280 next_state = task .status ,
280281 execution_start_time = task .execution_start_time ,
281- task_func = task_func ,
282+ task_func = task . task_func ,
282283 futures_start_time = task .futures_start_time ,
283284 )
284285
@@ -460,6 +461,7 @@ def check_task_future_completion() -> None:
460461 execution_start_time = execution_start_time ,
461462 futures_start_time = time .time (),
462463 pending_futures = task_produced_futures ,
464+ task_func = task_func ,
463465 )
464466 pending_task_futures .append (pending_task )
465467
0 commit comments