99import itertools as it
1010from argparse import Namespace , ArgumentParser
1111from pathlib import Path
12+ import tempfile
1213import shutil
1314import subprocess as sp
1415import re
@@ -113,16 +114,14 @@ def _spark_log_filter(self, line: str) -> bool:
113114 return False
114115
115116 @staticmethod
116- def _print_filter (line : str , log_filter : Union [Callable , None ] = None ) -> bool :
117+ def _filter (line : str , time_begin , log_filter : Union [Callable , None ] = None ) -> str :
117118 if not line :
118- return False
119- if log_filter is None :
120- print (line )
121- return True
122- if log_filter (line ):
123- print (line )
124- return True
125- return False
119+ return ""
120+ if log_filter is None or log_filter (line ):
121+ if "final status:" in line or " (state: " in line :
122+ line = line + f" (Time elapsed: { datetime .datetime .now () - time_begin } )"
123+ return line
124+ return ""
126125
127126 def submit (self , cmd : str , attachments : Union [None , list [str ]] = None ) -> bool :
128127 """Submit a Spark job.
@@ -131,19 +130,25 @@ def submit(self, cmd: str, attachments: Union[None, list[str]] = None) -> bool:
131130 :param attachments: Attachments to send with the notification email.
132131 :return: True if the Spark application succeeds and False otherwise.
133132 """
134- logger .info ("Submitting Spark job...\n {}" , cmd )
133+ time_begin = datetime .datetime .now ()
134+ logger .info ("Submitting Spark job ...\n {}" , cmd )
135135 stdout = []
136136 self ._spark_submit_log .clear ()
137137 process = sp .Popen (cmd , shell = True , stderr = sp .PIPE )
138138 while True :
139139 if process .poll () is None :
140140 line = process .stderr .readline ().decode ().rstrip () # pytype: disable=attribute-error
141- if self ._print_filter (line , self ._spark_log_filter ):
141+ line = self ._filter (line , time_begin , self ._spark_log_filter )
142+ if line :
143+ print (line )
142144 stdout .append (line )
143145 else :
144146 for line in process .stderr .readlines (): # pytype: disable=attribute-error
145- line = line .decode ().rstrip ()
146- if self ._print_filter (line , self ._spark_log_filter ):
147+ line = self ._filter (
148+ line .decode ().rstrip (), time_begin , self ._spark_log_filter
149+ )
150+ if line :
151+ print (line )
147152 stdout .append (line )
148153 break
149154 # status
@@ -168,14 +173,21 @@ def submit(self, cmd: str, attachments: Union[None, list[str]] = None) -> bool:
168173 attachments = [attachments ]
169174 if not isinstance (attachments , list ):
170175 attachments = list (attachments )
171- param ["attachments" ] = attachments
176+ param ["attachments" ] = self . _attach_txt ( attachments )
172177 notifiers .get_notifier ("email" ).notify (** param )
173178 if status == "FAILED" :
174179 if self .email :
175180 self ._notify_log (app_id , "Re: " + subject )
176181 return False
177182 return True
178183
184+ @staticmethod
185+ def _attach_txt (attachments : list [str ]) -> list [str ]:
186+ dir_ = Path (tempfile .mkdtemp ())
187+ for attach in attachments :
188+ shutil .copy2 (attach , dir_ )
189+ return [str (path ) + ".txt" for path in dir_ .iterdir ()]
190+
179191 def _notify_log (self , app_id , subject ):
180192 logger .info ("Waiting for 300 seconds for the log to be available..." )
181193 time .sleep (300 )
0 commit comments