1818import yaml
1919from loguru import logger
2020import notifiers
21+ import dsutil .filesystem as fs
2122
2223
2324class SparkSubmit :
@@ -194,11 +195,16 @@ def _notify_log(self, app_id, subject):
194195 logger .info ("Waiting for 300 seconds for the log to be available..." )
195196 time .sleep (300 )
196197 sp .run (f"logf fetch { app_id } " , shell = True , check = True )
198+ lines = fs .filter (
199+ path = Path (app_id + "_s" ),
200+ pattern = r"^-+\s+Deduped Error Lines\s+-+$" ,
201+ num_lines = 999
202+ )
197203 notifiers .get_notifier ("email" ).notify (
198204 from_ = self .email ["from" ],
199205 to = self .email ["to" ],
200206 subject = "Re: " + subject ,
201- message = Path ( app_id + "_s" ). read_text ( ),
207+ message = "" . join ( lines [ 0 ] ),
202208 host = self .email ["host" ],
203209 username = "" ,
204210 password = "" ,
@@ -320,10 +326,10 @@ def _submit_local(args, config: dict[str, Any]) -> bool:
320326 python = _python (config )
321327 lines .append (f"--conf spark.pyspark.driver.python={ python } " )
322328 lines .append (f"--conf spark.pyspark.python={ python } " )
323- lines .extend (args .cmd )
329+ lines .extend (args .pyfile )
324330 for idx in range (2 , len (lines )):
325331 lines [idx ] = " " * 4 + lines [idx ]
326- return SparkSubmit ().submit (" \\ \n " .join (lines ) + "\n " , args .cmd [:1 ])
332+ return SparkSubmit ().submit (" \\ \n " .join (lines ) + "\n " , args .pyfile [:1 ])
327333
328334
329335def _submit_cluster (args , config : dict [str , Any ]) -> bool :
@@ -348,11 +354,11 @@ def _submit_cluster(args, config: dict[str, Any]) -> bool:
348354 lines = [config ["spark-submit" ]] + [
349355 f"--{ opt } { config [opt ]} " for opt in opts if opt in config and config [opt ]
350356 ] + [f"--conf { k } ={ v } " for k , v in config ["conf" ].items ()]
351- lines .extend (args .cmd )
357+ lines .extend (args .pyfile )
352358 for idx in range (1 , len (lines )):
353359 lines [idx ] = " " * 4 + lines [idx ]
354360 return SparkSubmit (email = config ["email" ]
355- ).submit (" \\ \n " .join (lines ) + "\n " , args .cmd [:1 ])
361+ ).submit (" \\ \n " .join (lines ) + "\n " , args .pyfile [:1 ])
356362
357363
358364def submit (args : Namespace ) -> None :
@@ -431,8 +437,9 @@ def parse_args(args=None, namespace=None) -> Namespace:
431437 help = "Specify a path for generating a configration example."
432438 )
433439 mutex_group .add_argument (
434- "--cmd" ,
435- dest = "cmd" ,
440+ "--py" ,
441+ "--pyfile" ,
442+ dest = "pyfile" ,
436443 nargs = "+" ,
437444 help = "The command (of PySpark script) to submit to Spark to run."
438445 )
0 commit comments