11#!/usr/bin/env python
22# coding: utf-8
33# %%
4+ import uuid
5+
46from dargs .dargs import Argument
57from dpdispatcher .base_context import BaseContext
68from typing import List
79import os
8- # from dpdispatcher import dlog
10+ from dpdispatcher import dlog
911# from dpdispatcher.submission import Machine
12+ # from . import dlog
1013from .dpcloudserver .api import API
1114from .dpcloudserver import zip_file
15+ import shutil
1216# from zip_file import zip_files
1317DP_CLOUD_SERVER_HOME_DIR = os .path .join (
1418 os .path .expanduser ('~' ),
@@ -66,6 +70,18 @@ def bind_submission(self, submission):
6670 # file_uuid = uuid.uuid1().hex
6771 # oss_task_dir = os.path.join()
6872
73+ def _gen_oss_path (self , job , zip_filename ):
74+ if hasattr (job , 'upload_path' ) and job .upload_path :
75+ return job .upload_path
76+ else :
77+ program_id = self .remote_profile .get ('program_id' )
78+ if program_id is None :
79+ program_id = 0
80+ uid = uuid .uuid4 ()
81+ path = os .path .join ("program" , str (program_id ), str (uid ), zip_filename )
82+ setattr (job , 'upload_path' , path )
83+ return path
84+
6985 def upload (self , submission ):
7086 # oss_task_dir = os.path.join('%s/%s/%s.zip' % ('indicate', file_uuid, file_uuid))
7187 # zip_filename = submission.submission_hash + '.zip'
@@ -77,7 +93,7 @@ def upload(self, submission):
7793 for job in submission .belonging_jobs :
7894 self .machine .gen_local_script (job )
7995 zip_filename = job .job_hash + '.zip'
80- oss_task_zip = 'indicate/' + job . job_hash + '/' + zip_filename
96+ oss_task_zip = self . _gen_oss_path ( job , zip_filename )
8197 zip_task_file = os .path .join (self .local_root , zip_filename )
8298
8399 upload_file_list = [job .script_file_name , ]
@@ -97,6 +113,7 @@ def upload(self, submission):
97113 file_list = upload_file_list
98114 )
99115 result = self .api .upload (oss_task_zip , upload_zip , ENDPOINT , BUCKET_NAME )
116+ self ._backup (self .local_root , upload_zip , keep_backup = self .remote_profile .get ('keep_backup' , True ))
100117 return result
101118 # return oss_task_zip
102119 # api.upload(self.oss_task_dir, zip_task_file)
@@ -110,23 +127,42 @@ def download(self, submission):
110127 if isinstance (job .job_id , str ) and ':job_group_id:' in job .job_id :
111128 ids = job .job_id .split (":job_group_id:" )
112129 jid , gid = int (ids [0 ]), int (ids [1 ])
113- job_hashs [jid ] = job .job_hash
130+ job_hashs [jid ] = job .job_hash
114131 group_id = gid
115132 else :
116- job_infos [job .job_hash ] = self .get_tasks (job .job_id )[0 ]
133+ job_infos [job .job_hash ] = self .api . get_tasks (job .job_id )[0 ]
117134 if group_id is not None :
118135 job_result = self .api .get_tasks_v2_list (group_id )
119136 for each in job_result :
120137 if 'result_url' in each and each ['result_url' ] != '' and each ['status' ] == 2 :
121- job_hash = job_hashs [each ['task_id' ]]
138+ job_hash = ''
139+ if each ['task_id' ] not in job_hashs :
140+ dlog .info (f"find unexpect job_hash, but task { each ['task_id' ]} still been download." )
141+ dlog .debug (str (job_hashs ))
142+ job_hash = str (each ['task_id' ])
143+ else :
144+ job_hash = job_hashs [each ['task_id' ]]
122145 job_infos [job_hash ] = each
123- for hash , info in job_infos .items ():
124- result_filename = hash + '_back.zip'
146+ for job_hash , info in job_infos .items ():
147+ result_filename = job_hash + '_back.zip'
125148 target_result_zip = os .path .join (self .local_root , result_filename )
126149 self .api .download_from_url (info ['result_url' ], target_result_zip )
127150 zip_file .unzip_file (target_result_zip , out_dir = self .local_root )
151+ self ._backup (self .local_root , target_result_zip , keep_backup = self .remote_profile .get ('keep_backup' , True ))
128152 return True
129153
154+ def _backup (self , local_root , target , keep_backup = True ):
155+ try :
156+ if keep_backup :
157+ # move to backup directory
158+ os .makedirs (os .path .join (local_root , 'backup' ), exist_ok = True )
159+ shutil .move (target ,
160+ os .path .join (local_root , 'backup' , os .path .split (target )[1 ]))
161+ else :
162+ os .remove (target )
163+ except (OSError , shutil .Error ) as e :
164+ dlog .exception ("unable to backup file, " + str (e ))
165+
130166 def write_file (self , fname , write_str ):
131167 result = self .write_home_file (fname , write_str )
132168 return result
@@ -214,4 +250,9 @@ def machine_subfields(cls) -> List[Argument]:
214250 'None or empty.' )
215251 ], optional = False , doc = "Configuration of job" ),
216252 ], doc = doc_remote_profile )]
253+
254+
255+ class LebesgueContext (DpCloudServerContext ):
256+ pass
257+
217258#%%
0 commit comments