@@ -260,7 +260,7 @@ def get_cost_data(self, params):
260
260
is_canceled = False
261
261
262
262
for costs_data in self .ds_plugin_mgr .get_cost_data (
263
- options , secret_data , schema , task_options , domain_id
263
+ options , secret_data , schema , task_options , domain_id
264
264
):
265
265
results = costs_data .get ("results" , [])
266
266
for cost_data in results :
@@ -434,11 +434,11 @@ def create_cost_job(self, data_source_vo: DataSource, job_options):
434
434
return job_vo
435
435
436
436
def _list_secret_ids_from_secret_type (
437
- self ,
438
- data_source_vo : DataSource ,
439
- secret_type : str ,
440
- workspace_id : str ,
441
- domain_id : str ,
437
+ self ,
438
+ data_source_vo : DataSource ,
439
+ secret_type : str ,
440
+ workspace_id : str ,
441
+ domain_id : str ,
442
442
):
443
443
secret_ids = []
444
444
@@ -459,7 +459,7 @@ def _list_secret_ids_from_secret_type(
459
459
return secret_ids
460
460
461
461
def _list_secret_ids_from_secret_filter (
462
- self , secret_filter , provider : str , workspace_id : str , domain_id : str
462
+ self , secret_filter , provider : str , workspace_id : str , domain_id : str
463
463
):
464
464
secret_manager : SecretManager = self .locator .get_manager (SecretManager )
465
465
@@ -474,7 +474,7 @@ def _list_secret_ids_from_secret_filter(
474
474
475
475
@staticmethod
476
476
def _set_secret_filter (
477
- secret_filter , provider : str , workspace_id : str , domain_id : str
477
+ secret_filter , provider : str , workspace_id : str , domain_id : str
478
478
):
479
479
_filter = [{"k" : "domain_id" , "v" : domain_id , "o" : "eq" }]
480
480
@@ -489,8 +489,8 @@ def _set_secret_filter(
489
489
{"k" : "secret_id" , "v" : secret_filter ["secrets" ], "o" : "in" }
490
490
)
491
491
if (
492
- "service_accounts" in secret_filter
493
- and secret_filter ["service_accounts" ]
492
+ "service_accounts" in secret_filter
493
+ and secret_filter ["service_accounts" ]
494
494
):
495
495
_filter .append (
496
496
{
@@ -586,10 +586,10 @@ def _create_cost_data(self, cost_data, job_task_vo, cost_options):
586
586
self .cost_mgr .create_cost (cost_data , execute_rollback = False )
587
587
588
588
def _is_job_failed (
589
- self ,
590
- job_id : str ,
591
- domain_id : str ,
592
- workspace_id : str ,
589
+ self ,
590
+ job_id : str ,
591
+ domain_id : str ,
592
+ workspace_id : str ,
593
593
):
594
594
job_vo : Job = self .job_mgr .get_job (job_id , domain_id , workspace_id )
595
595
@@ -599,12 +599,12 @@ def _is_job_failed(
599
599
return False
600
600
601
601
def _close_job (
602
- self ,
603
- job_id : str ,
604
- data_source_id : str ,
605
- domain_id : str ,
606
- data_keys : list ,
607
- workspace_id : str = None ,
602
+ self ,
603
+ job_id : str ,
604
+ data_source_id : str ,
605
+ domain_id : str ,
606
+ data_keys : list ,
607
+ workspace_id : str = None ,
608
608
) -> None :
609
609
job_vo : Job = self .job_mgr .get_job (job_id , domain_id , workspace_id )
610
610
no_preload_cache = job_vo .options .get ("no_preload_cache" , False )
@@ -754,7 +754,7 @@ def _delete_old_cost_data(self, data_source_id, domain_id):
754
754
monthly_cost_vos .delete ()
755
755
756
756
def _delete_changed_cost_data (
757
- self , job_vo : Job , start , end , change_filter , domain_id
757
+ self , job_vo : Job , start , end , change_filter , domain_id
758
758
):
759
759
query = {
760
760
"filter" : [
@@ -800,7 +800,7 @@ def _aggregate_cost_data(self, job_vo: Job, data_keys: list):
800
800
801
801
for job_task_id in job_task_ids :
802
802
for billed_month in self ._distinct_billed_month (
803
- data_source_id , domain_id , job_id , job_task_id
803
+ data_source_id , domain_id , job_id , job_task_id
804
804
):
805
805
self ._aggregate_monthly_cost_data (
806
806
data_source_id ,
@@ -832,14 +832,14 @@ def _distinct_billed_month(self, data_source_id, domain_id, job_id, job_task_id)
832
832
return values
833
833
834
834
def _aggregate_monthly_cost_data (
835
- self ,
836
- data_source_id : str ,
837
- domain_id : str ,
838
- job_id : str ,
839
- job_task_id : str ,
840
- billed_month : str ,
841
- data_keys : list ,
842
- workspace_id : str = None ,
835
+ self ,
836
+ data_source_id : str ,
837
+ domain_id : str ,
838
+ job_id : str ,
839
+ job_task_id : str ,
840
+ billed_month : str ,
841
+ data_keys : list ,
842
+ workspace_id : str = None ,
843
843
):
844
844
query = {
845
845
"group_by" : [
@@ -906,7 +906,7 @@ def _get_all_data_sources(self):
906
906
)
907
907
908
908
def _check_duplicate_job (
909
- self , data_source_id : str , domain_id : str , this_job_vo : Job
909
+ self , data_source_id : str , domain_id : str , this_job_vo : Job
910
910
):
911
911
query = {
912
912
"filter" : [
@@ -932,14 +932,6 @@ def _check_duplicate_job(
932
932
933
933
return False
934
934
935
- @staticmethod
936
- def _get_start_last_synchronized_at (params ):
937
- start = params .get ("start" )
938
- last_synchronized_at = utils .datetime_to_iso8601 (
939
- params .get ("last_synchronized_at" )
940
- )
941
- return start , last_synchronized_at
942
-
943
935
def _get_job_task_ids (self , job_id , domain_id ):
944
936
job_task_ids = []
945
937
job_task_vos = self .job_task_mgr .filter_job_tasks (
@@ -952,11 +944,11 @@ def _get_job_task_ids(self, job_id, domain_id):
952
944
return job_task_ids
953
945
954
946
def _get_data_source_account_map (
955
- self ,
956
- data_source_id : str ,
957
- domain_id : str ,
958
- workspace_id : str ,
959
- resource_group : str ,
947
+ self ,
948
+ data_source_id : str ,
949
+ domain_id : str ,
950
+ workspace_id : str ,
951
+ resource_group : str ,
960
952
) -> Dict [str , DataSourceAccount ]:
961
953
data_source_account_map = {}
962
954
conditions = {
@@ -978,11 +970,11 @@ def _get_data_source_account_map(
978
970
return data_source_account_map
979
971
980
972
def _get_linked_accounts_from_data_source_vo (
981
- self ,
982
- data_source_vo : DataSource ,
983
- options : dict ,
984
- secret_data : dict ,
985
- schema : dict = None ,
973
+ self ,
974
+ data_source_vo : DataSource ,
975
+ options : dict ,
976
+ secret_data : dict ,
977
+ schema : dict = None ,
986
978
) -> list :
987
979
linked_accounts = []
988
980
@@ -1044,6 +1036,14 @@ def _update_data_source_is_synced(self, job_vo: Job) -> None:
1044
1036
f"[_update_data_source_account_sync_status] synced_account_ids: { synced_accounts } / { data_source_id } { domain_id } "
1045
1037
)
1046
1038
1039
+ @staticmethod
1040
+ def _get_start_last_synchronized_at (params ):
1041
+ start = params .get ("start" )
1042
+ last_synchronized_at = utils .datetime_to_iso8601 (
1043
+ params .get ("last_synchronized_at" )
1044
+ )
1045
+ return start , last_synchronized_at
1046
+
1047
1047
@staticmethod
1048
1048
def _check_use_account_routing (data_source_vo : DataSource ) -> bool :
1049
1049
plugin_info = data_source_vo .plugin_info .to_dict () or {}
0 commit comments