2121from google .api_core .exceptions import NotFound
2222import google .oauth2 .credentials as oauth2
2323import aiofiles
24+ import json
2425
2526import aiohttp
2627import pendulum
3536 PACKAGE_NAME ,
3637 WRAPPER_PAPPERMILL_FILE ,
3738 UTF8 ,
39+ PAYLOAD_JSON_FILE_PATH ,
3840)
3941from scheduler_jupyter_plugin .models .models import DescribeJob
4042from scheduler_jupyter_plugin .services import airflow
@@ -290,8 +292,7 @@ async def check_package_in_env(self, composer_environment_name):
290292 else :
291293 decoded_output = stdout .decode (UTF8 )
292294 installed_packages = set (
293- line .split ()[0 ].lower ()
294- for line in decoded_output .splitlines ()[2 :]
295+ line .split ()[0 ].lower () for line in decoded_output .splitlines ()[2 :]
295296 )
296297 for package in packages :
297298 if package .lower () not in installed_packages :
@@ -303,7 +304,6 @@ async def check_package_in_env(self, composer_environment_name):
303304 self .log .exception (f"Error checking packages: { error } " )
304305 raise IOError (f"Error checking packages: { error } " )
305306
306-
307307 async def install_to_composer_environment (
308308 self , local_kernel , composer_environment_name , packages_to_install
309309 ):
@@ -320,9 +320,7 @@ async def install_to_composer_environment(
320320 stderr = subprocess .PIPE ,
321321 shell = True ,
322322 )
323- install_stdout , install_stderr = (
324- install_process .communicate ()
325- )
323+ install_stdout , install_stderr = install_process .communicate ()
326324 if install_process .returncode == 0 :
327325 self .log .info (f"{ package } installed successfully." )
328326 else :
@@ -338,6 +336,12 @@ async def install_to_composer_environment(
338336 self .log .exception (f"error installing { package } : { install_stderr } " )
339337 return {"error" : str (e )}
340338
339+ def create_payload (self , file_path , project_id , region , input_data ):
340+ payload = {"projectId" : project_id , "region" : region , "job" : input_data }
341+
342+ with open (file_path , "w" ) as f :
343+ json .dump (payload , f , indent = 4 )
344+
341345 async def execute (self , input_data , project_id = None , region_id = None ):
342346 try :
343347 job = DescribeJob (** input_data )
@@ -354,7 +358,9 @@ async def execute(self, input_data, project_id=None, region_id=None):
354358
355359 if job .packages_to_install != None :
356360 install_packages = await self .install_to_composer_environment (
357- job .local_kernel , job .composer_environment_name , job .packages_to_install
361+ job .local_kernel ,
362+ job .composer_environment_name ,
363+ job .packages_to_install ,
358364 )
359365 if install_packages and install_packages .get ("error" ):
360366 raise Exception (install_packages )
@@ -374,12 +380,25 @@ async def execute(self, input_data, project_id=None, region_id=None):
374380 print (
375381 f"The file gs://{ gcs_dag_bucket } /{ wrapper_pappermill_file_path } does not exist."
376382 )
383+ # uploading input file while creating the job
377384 if not job .input_filename .startswith (GCS ):
378385 await self .upload_to_gcs (
379386 gcs_dag_bucket ,
380387 file_path = f"./{ job .input_filename } " ,
381388 destination_dir = f"dataproc-notebooks/{ job_name } /input_notebooks" ,
382389 )
390+ # creating a json file for payload
391+ self .create_payload (
392+ PAYLOAD_JSON_FILE_PATH , project_id , region_id , input_data
393+ )
394+
395+ # uploading payload JSON file to GCS
396+ await self .upload_to_gcs (
397+ gcs_dag_bucket ,
398+ file_path = PAYLOAD_JSON_FILE_PATH ,
399+ destination_dir = f"dataproc-notebooks/{ job_name } /dag_details" ,
400+ )
401+
383402 file_path = self .prepare_dag (job , gcs_dag_bucket , dag_file )
384403 await self .upload_to_gcs (
385404 gcs_dag_bucket , file_path = file_path , destination_dir = "dags"
0 commit comments