Skip to content

Commit 1bd1f53

Browse files
committed
Add logs to investigate slow stream pod termination [1.10.x]
[ML-11919](https://iguazio.atlassian.net/browse/ML-11919)
1 parent ace5a42 commit 1bd1f53

File tree

3 files changed

+65
-3
lines changed

3 files changed

+65
-3
lines changed

storey/flow.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,9 @@ async def _do_downstream(self, event, outlets=None):
261261
if not outlets:
262262
return
263263
if event is _termination_obj:
264+
if self.logger:
265+
outlet_names = ", ".join([outlet.name for outlet in outlets])
266+
self.logger.info(f"Forwarding termination signal from step '{self.name}' to steps: {outlet_names}")
264267
# Only propagate the termination object once we received one per inlet
265268
outlets[0]._termination_received += 1
266269
if outlets[0]._should_terminate():
@@ -856,7 +859,15 @@ async def _worker(self):
856859
# be 1 higher than requested. Hence, we peek.
857860
job = await self._q.peek()
858861
if job is _termination_obj:
862+
if self.logger:
863+
self.logger.info(
864+
f"Terminating ConcurrentJobExecution worker belonging to step '{self.name}'"
865+
)
859866
await self._q.get()
867+
if self.logger:
868+
self.logger.info(
869+
f"Terminated ConcurrentJobExecution worker belonging to step '{self.name}'"
870+
)
860871
break
861872
event = job[0]
862873
completed = await job[1]
@@ -935,10 +946,22 @@ async def _do(self, event):
935946

936947
if event is _termination_obj:
937948
if self._queue_size > 0:
949+
if self.logger:
950+
self.logger.info(
951+
f"Sending termination signal to ConcurrentJobExecution worker belonging to step '{self.name}'"
952+
)
938953
await self._q.put(_termination_obj)
954+
if self.logger:
955+
self.logger.info(
956+
f"Awaiting termination of ConcurrentJobExecution worker belonging to step '{self.name}'"
957+
)
939958
await self._worker_awaitable
940959
else:
960+
if self.logger:
961+
self.logger.info(f"Terminating ConcurrentJobExecution step '{self.name}' without a worker")
941962
await self._cleanup()
963+
if self.logger:
964+
self.logger.info(f"Terminated ConcurrentJobExecution step '{self.name}' without a worker")
942965
return await self._do_downstream(_termination_obj)
943966
else:
944967
coroutine = self._process_event_with_retries(event)
@@ -1138,8 +1161,14 @@ async def _terminate(self):
11381161

11391162
async def _do(self, event):
11401163
if event is _termination_obj:
1164+
if self.logger:
1165+
self.logger.info(f"Terminating Batching step '{self.name}': emitting all remaining batches")
11411166
await self._emit_all()
1167+
if self.logger:
1168+
self.logger.info(f"Terminating Batching step '{self.name}': running customer termination code")
11421169
await self._terminate()
1170+
if self.logger:
1171+
self.logger.info(f"Terminated Batching step '{self.name}'")
11431172
return await self._do_downstream(_termination_obj)
11441173

11451174
key = self._extract_key(event)

storey/targets.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,12 +1198,29 @@ async def _worker(self):
11981198
continue
11991199
event = await self._q.get()
12001200
if event is _termination_obj: # handle outstanding batches and in flight requests on termination
1201+
if self.logger:
1202+
self.logger.info(
1203+
f"Terminating StreamTarget worker belonging to step '{self.name}': "
1204+
f"awaiting {len(in_flight_reqs)} in-flight batches"
1205+
)
12011206
for req in in_flight_reqs:
12021207
await self._handle_response(req)
1208+
if self.logger:
1209+
self.logger.info(
1210+
f"Terminating StreamTarget worker belonging to step '{self.name}': "
1211+
f"sending {self._shards} final batches"
1212+
)
12031213
for shard_id in range(self._shards):
12041214
self._send_batch(buffers, in_flight_reqs, buffer_events, in_flight_events, shard_id)
1215+
if self.logger:
1216+
self.logger.info(
1217+
f"Terminating StreamTarget worker belonging to step '{self.name}': "
1218+
f"awaiting {len(in_flight_reqs)} final batches"
1219+
)
12051220
for req in in_flight_reqs:
12061221
await self._handle_response(req)
1222+
if self.logger:
1223+
self.logger.info(f"Terminated StreamTarget worker belonging to step '{self.name}'")
12071224
break
12081225
sharding_func_result = self._sharding_func(event)
12091226
if isinstance(sharding_func_result, int):
@@ -1281,8 +1298,12 @@ async def _do(self, event):
12811298
raise AssertionError("StreamTarget worker has already terminated")
12821299

12831300
if event is _termination_obj:
1301+
if self.logger:
1302+
self.logger.info(f"Terminating StreamTarget step '{self.name}'")
12841303
await self._q.put(_termination_obj)
12851304
await self._worker_awaitable
1305+
if self.logger:
1306+
self.logger.info(f"Terminated StreamTarget step '{self.name}'")
12861307
return await self._do_downstream(_termination_obj)
12871308
else:
12881309
await self._q.put(event)
@@ -1365,8 +1386,14 @@ async def _do(self, event):
13651386
await self._lazy_init()
13661387

13671388
if event is _termination_obj:
1389+
if self.logger:
1390+
self.logger.info(f"Terminating KafkaTarget step '{self.name}': flushing producer")
13681391
self._producer.flush()
1392+
if self.logger:
1393+
self.logger.info(f"Terminating KafkaTarget step '{self.name}': closing producer")
13691394
self._producer.close()
1395+
if self.logger:
1396+
self.logger.info(f"Terminated KafkaTarget step '{self.name}'")  # producer already flushed and closed above
13701397
return await self._do_downstream(_termination_obj)
13711398
else:
13721399
key = event.key
@@ -1442,7 +1469,11 @@ async def _handle_completed(self, event, response):
14421469

14431470
async def _do(self, event):
14441471
if event is _termination_obj:
1472+
if self.logger:
1473+
self.logger.info(f"Terminating NoSqlTarget step '{self.name}': terminating table")
14451474
await self._table._terminate()
1475+
if self.logger:
1476+
self.logger.info(f"Terminated NoSqlTarget step '{self.name}'")
14461477
return await self._do_downstream(_termination_obj)
14471478

14481479
if event.key is None:

tests/test_flow.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4677,14 +4677,16 @@ def test_verbose_logs():
46774677
controller.terminate()
46784678
controller.await_termination()
46794679

4680-
assert len(logger.logs) == 2
4680+
debug_logs = [log for log in logger.logs if log[0] == "debug"]
46814681

4682-
level, args, kwargs = logger.logs[0]
4682+
assert len(debug_logs) == 2
4683+
4684+
level, args, kwargs = debug_logs[0]
46834685
assert level == "debug"
46844686
assert args == ("SyncEmitSource -> Map1 | Event(id=myid, path=/, body={})",)
46854687
assert kwargs == {}
46864688

4687-
level, args, kwargs = logger.logs[1]
4689+
level, args, kwargs = debug_logs[1]
46884690
assert level == "debug"
46894691
assert args == ("Map1 -> Map2 | Event(id=myid, path=/, body={})",)
46904692
assert kwargs == {}

0 commit comments

Comments
 (0)