Skip to content

Commit 0ec0541

Browse files
committed
handle sigterm gracefully by adding status to metadata
1 parent d95e7b6 commit 0ec0541

File tree

1 file changed

+38
-0
lines changed

1 file changed

+38
-0
lines changed

metaflow/task.py

+38
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sys
55
import os
66
import time
7+
import signal
78

89
from types import MethodType, FunctionType
910

@@ -39,6 +40,42 @@
3940
MAX_FOREACH_PATH_LENGTH = 256
4041

4142

43+
class SystemSignalHandler:
44+
def __init__(self, run_id, step_name, task_id, metadata_service, metadata_tags):
45+
self.run_id = run_id
46+
self.step_name = (step_name,)
47+
self.task_id = task_id
48+
self.metadata_service = metadata_service
49+
self.metadata_tags = metadata_tags
50+
51+
signal.signal(signal.SIGINT, self.exit_sigint_gracefully)
52+
signal.signal(signal.SIGTERM, self.exit_sigterm_gracefully)
53+
print("SystemSignalHandler initialized...")
54+
55+
def log_exit_signal(self, signal):
56+
metadata_info = [
57+
MetaDatum(
58+
field="signal",
59+
value=signal,
60+
type="signal",
61+
tags=self.metadata_tags,
62+
)
63+
]
64+
self.metadata_service.register_metadata(
65+
self.run_id, self.step_name, self.task_id, metadata_info
66+
)
67+
68+
def exit_sigint_gracefully(self, signum, frame):
69+
print("SIGINT received... ")
70+
self.log_exit_signal("SIGINT")
71+
self.status = signal.SIGINT
72+
73+
def exit_sigterm_gracefully(self, signum, frame):
74+
print("SIGTERM received... ")
75+
self.log_exit_signal("SIGTERM")
76+
self.status = signal.SIGTERM
77+
78+
4279
class MetaflowTask(object):
4380
"""
4481
MetaflowTask prepares a Flow instance for execution of a single step.
@@ -412,6 +449,7 @@ def run_step(
412449
)
413450

414451
metadata_tags = ["attempt_id:{0}".format(retry_count)]
452+
SystemSignalHandler(run_id, step_name, task_id, self.metadata, metadata_tags)
415453

416454
metadata = [
417455
MetaDatum(

0 commit comments

Comments
 (0)