|
4 | 4 | import sys
|
5 | 5 | import os
|
6 | 6 | import time
|
| 7 | +import signal |
7 | 8 |
|
8 | 9 | from types import MethodType, FunctionType
|
9 | 10 |
|
|
39 | 40 | MAX_FOREACH_PATH_LENGTH = 256
|
40 | 41 |
|
41 | 42 |
|
| 43 | +class SystemSignalHandler: |
| 44 | + def __init__(self, run_id, step_name, task_id, metadata_service, metadata_tags): |
| 45 | + self.run_id = run_id |
| 46 | + self.step_name = (step_name,) |
| 47 | + self.task_id = task_id |
| 48 | + self.metadata_service = metadata_service |
| 49 | + self.metadata_tags = metadata_tags |
| 50 | + |
| 51 | + signal.signal(signal.SIGINT, self.exit_sigint_gracefully) |
| 52 | + signal.signal(signal.SIGTERM, self.exit_sigterm_gracefully) |
| 53 | + print("SystemSignalHandler initialized...") |
| 54 | + |
| 55 | + def log_exit_signal(self, signal): |
| 56 | + metadata_info = [ |
| 57 | + MetaDatum( |
| 58 | + field="signal", |
| 59 | + value=signal, |
| 60 | + type="signal", |
| 61 | + tags=self.metadata_tags, |
| 62 | + ) |
| 63 | + ] |
| 64 | + self.metadata_service.register_metadata( |
| 65 | + self.run_id, self.step_name, self.task_id, metadata_info |
| 66 | + ) |
| 67 | + |
| 68 | + def exit_sigint_gracefully(self, signum, frame): |
| 69 | + print("SIGINT received... ") |
| 70 | + self.status = signal.SIGINT |
| 71 | + |
| 72 | + def exit_sigterm_gracefully(self, signum, frame): |
| 73 | + print("SIGTERM received... ") |
| 74 | + self.status = signal.SIGTERM |
| 75 | + |
| 76 | + |
42 | 77 | class MetaflowTask(object):
|
43 | 78 | """
|
44 | 79 | MetaflowTask prepares a Flow instance for execution of a single step.
|
@@ -412,6 +447,7 @@ def run_step(
|
412 | 447 | )
|
413 | 448 |
|
414 | 449 | metadata_tags = ["attempt_id:{0}".format(retry_count)]
|
| 450 | + SystemSignalHandler(run_id, step_name, task_id, self.metadata, metadata_tags) |
415 | 451 |
|
416 | 452 | metadata = [
|
417 | 453 | MetaDatum(
|
|
0 commit comments