Skip to content

Commit 0a8e900

Browse files
authored
Add logs to investigate slow stream pod termination (#602)
[ML-11919](https://iguazio.atlassian.net/browse/ML-11919)
1 parent d71bf78 commit 0a8e900

File tree

3 files changed

+65
-3
lines changed

3 files changed

+65
-3
lines changed

storey/flow.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,9 @@ async def _do_downstream(self, event, outlets=None, select_outlets: bool = True)
294294
if not outlets:
295295
return
296296
if event is _termination_obj:
297+
if self.logger:
298+
outlet_names = ", ".join([outlet.name for outlet in outlets])
299+
self.logger.info(f"Forwarding termination signal from step '{self.name}' to steps: {outlet_names}")
297300
# Only propagate the termination object once we received one per inlet
298301
outlets[0]._termination_received += 1
299302
if outlets[0]._should_terminate():
@@ -937,7 +940,15 @@ async def _worker(self):
937940
# be 1 higher than requested. Hence, we peek.
938941
job = await self._q.peek()
939942
if job is _termination_obj:
943+
if self.logger:
944+
self.logger.info(
945+
f"Terminating ConcurrentJobExecution worker belonging to step '{self.name}'"
946+
)
940947
await self._q.get()
948+
if self.logger:
949+
self.logger.info(
950+
f"Terminated ConcurrentJobExecution worker belonging to step '{self.name}'"
951+
)
941952
break
942953
event = job[0]
943954
completed = await job[1]
@@ -1016,10 +1027,22 @@ async def _do(self, event):
10161027

10171028
if event is _termination_obj:
10181029
if self._queue_size > 0:
1030+
if self.logger:
1031+
self.logger.info(
1032+
f"Sending termination signal to ConcurrentJobExecution worker belonging to step '{self.name}'"
1033+
)
10191034
await self._q.put(_termination_obj)
1035+
if self.logger:
1036+
self.logger.info(
1037+
f"Awaiting termination of ConcurrentJobExecution worker belonging to step '{self.name}'"
1038+
)
10201039
await self._worker_awaitable
10211040
else:
1041+
if self.logger:
1042+
self.logger.info(f"Terminating ConcurrentJobExecution step '{self.name}' without a worker")
10221043
await self._cleanup()
1044+
if self.logger:
1045+
self.logger.info(f"Terminated ConcurrentJobExecution step '{self.name}' without a worker")
10231046
return await self._do_downstream(_termination_obj)
10241047
else:
10251048
coroutine = self._process_event_with_retries(event)
@@ -1219,8 +1242,14 @@ async def _terminate(self):
12191242

12201243
async def _do(self, event):
12211244
if event is _termination_obj:
1245+
if self.logger:
1246+
self.logger.info(f"Terminating Batching step '{self.name}': emitting all remaining batches")
12221247
await self._emit_all()
1248+
if self.logger:
1249+
self.logger.info(f"Terminating Batching step '{self.name}': running custom termination code")
12231250
await self._terminate()
1251+
if self.logger:
1252+
self.logger.info(f"Terminated Batching step '{self.name}'")
12241253
return await self._do_downstream(_termination_obj)
12251254

12261255
key = self._extract_key(event)

storey/targets.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,12 +1198,29 @@ async def _worker(self):
11981198
continue
11991199
event = await self._q.get()
12001200
if event is _termination_obj: # handle outstanding batches and in flight requests on termination
1201+
if self.logger:
1202+
self.logger.info(
1203+
f"Terminating StreamTarget worker belonging to step '{self.name}': "
1204+
f"awaiting {len(in_flight_reqs)} in-flight batches"
1205+
)
12011206
for req in in_flight_reqs:
12021207
await self._handle_response(req)
1208+
if self.logger:
1209+
self.logger.info(
1210+
f"Terminating StreamTarget worker belonging to step '{self.name}': "
1211+
f"sending {self._shards} final batches"
1212+
)
12031213
for shard_id in range(self._shards):
12041214
self._send_batch(buffers, in_flight_reqs, buffer_events, in_flight_events, shard_id)
1215+
if self.logger:
1216+
self.logger.info(
1217+
f"Terminating StreamTarget worker belonging to step '{self.name}': "
1218+
f"awaiting {len(in_flight_reqs)} final batches"
1219+
)
12051220
for req in in_flight_reqs:
12061221
await self._handle_response(req)
1222+
if self.logger:
1223+
self.logger.info(f"Terminated StreamTarget worker belonging to step '{self.name}'")
12071224
break
12081225
sharding_func_result = self._sharding_func(event)
12091226
if isinstance(sharding_func_result, int):
@@ -1281,8 +1298,12 @@ async def _do(self, event):
12811298
raise AssertionError("StreamTarget worker has already terminated")
12821299

12831300
if event is _termination_obj:
1301+
if self.logger:
1302+
self.logger.info(f"Terminating StreamTarget step '{self.name}'")
12841303
await self._q.put(_termination_obj)
12851304
await self._worker_awaitable
1305+
if self.logger:
1306+
self.logger.info(f"Terminated StreamTarget step '{self.name}'")
12861307
return await self._do_downstream(_termination_obj)
12871308
else:
12881309
await self._q.put(event)
@@ -1365,8 +1386,14 @@ async def _do(self, event):
13651386
await self._lazy_init()
13661387

13671388
if event is _termination_obj:
1389+
if self.logger:
1390+
self.logger.info(f"Terminating KafkaTarget step '{self.name}': flushing producer")
13681391
self._producer.flush()
1392+
if self.logger:
1393+
self.logger.info(f"Terminating KafkaTarget step '{self.name}': closing producer")
13691394
self._producer.close()
1395+
if self.logger:
1396+
self.logger.info(f"Terminated KafkaTarget step '{self.name}': closing producer")
13701397
return await self._do_downstream(_termination_obj)
13711398
else:
13721399
key = event.key
@@ -1442,7 +1469,11 @@ async def _handle_completed(self, event, response):
14421469

14431470
async def _do(self, event):
14441471
if event is _termination_obj:
1472+
if self.logger:
1473+
self.logger.info(f"Terminating NoSqlTarget step '{self.name}': terminating table")
14451474
await self._table._terminate()
1475+
if self.logger:
1476+
self.logger.info(f"Terminated NoSqlTarget step '{self.name}'")
14461477
return await self._do_downstream(_termination_obj)
14471478

14481479
if event.key is None:

tests/test_flow.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4678,14 +4678,16 @@ def test_verbose_logs():
46784678
controller.terminate()
46794679
controller.await_termination()
46804680

4681-
assert len(logger.logs) == 2
4681+
debug_logs = [log for log in logger.logs if log[0] == "debug"]
46824682

4683-
level, args, kwargs = logger.logs[0]
4683+
assert len(debug_logs) == 2
4684+
4685+
level, args, kwargs = debug_logs[0]
46844686
assert level == "debug"
46854687
assert args == ("SyncEmitSource -> Map1 | Event(id=myid, path=/, body={})",)
46864688
assert kwargs == {}
46874689

4688-
level, args, kwargs = logger.logs[1]
4690+
level, args, kwargs = debug_logs[1]
46894691
assert level == "debug"
46904692
assert args == ("Map1 -> Map2 | Event(id=myid, path=/, body={})",)
46914693
assert kwargs == {}

0 commit comments

Comments
 (0)