@@ -782,6 +782,22 @@ def _delete_old_cost_data(self, data_source_id: str, domain_id: str):
782
782
)
783
783
monthly_cost_vos .delete ()
784
784
785
+ def _distinct_job_id (
786
+ self , query_filter : list , domain_id : str , data_source_id : str
787
+ ) -> list :
788
+ query = {
789
+ "distinct" : "job_id" ,
790
+ "filter" : query_filter ,
791
+ "target" : "PRIMARY" , # Execute a query to primary DB
792
+ }
793
+ _LOGGER .debug (f"[_distinct_job_id] query: { query } " )
794
+ response = self .cost_mgr .stat_costs (query , domain_id , data_source_id )
795
+ values = response .get ("results" , [])
796
+
797
+ _LOGGER .debug (f"[_distinct_job_id] job_ids: { values } " )
798
+
799
+ return values
800
+
785
801
def _delete_changed_cost_data (
786
802
self , job_vo : Job , start , end , change_filter , domain_id
787
803
):
@@ -790,7 +806,7 @@ def _delete_changed_cost_data(
790
806
{"k" : "billed_month" , "v" : start , "o" : "gte" },
791
807
{"k" : "data_source_id" , "v" : job_vo .data_source_id , "o" : "eq" },
792
808
{"k" : "domain_id" , "v" : job_vo .domain_id , "o" : "eq" },
793
- {"k" : "job_id" , "v" : job_vo .job_id , "o" : "not" },
809
+ # {"k": "job_id", "v": job_vo.job_id, "o": "not"},
794
810
],
795
811
"hint" : "COMPOUND_INDEX_FOR_SYNC_JOB_2" ,
796
812
}
@@ -801,24 +817,33 @@ def _delete_changed_cost_data(
801
817
for key , value in change_filter .items ():
802
818
query ["filter" ].append ({"k" : key , "v" : value , "o" : "eq" })
803
819
804
- _LOGGER .debug (f"[_delete_changed_cost_data] query: { query } " )
805
-
806
- cost_vos , total_count = self .cost_mgr .list_costs (
807
- copy .deepcopy (query ), domain_id , job_vo .data_source_id
808
- )
809
- cost_vos .delete ()
810
- _LOGGER .debug (
811
- f"[_delete_changed_cost_data] delete costs (count = { total_count } )"
820
+ job_ids = self ._distinct_job_id (
821
+ query ["filter" ], domain_id , job_vo .data_source_id
812
822
)
823
+ for job_id in job_ids :
824
+ if job_vo .job_id == job_id :
825
+ continue
813
826
814
- query ["hint" ] = "COMPOUND_INDEX_FOR_SYNC_JOB"
815
- monthly_cost_vos , total_count = self .cost_mgr .list_monthly_costs (
816
- copy .deepcopy (query ), domain_id
817
- )
818
- monthly_cost_vos .delete ()
819
- _LOGGER .debug (
820
- f"[_delete_changed_cost_data] delete monthly costs (count = { total_count } )"
821
- )
827
+ query ["filter" ].append ({"k" : "job_id" , "v" : job_id , "o" : "eq" })
828
+
829
+ _LOGGER .debug (f"[_delete_changed_cost_data] query: { query } " )
830
+
831
+ cost_vos , total_count = self .cost_mgr .list_costs (
832
+ copy .deepcopy (query ), domain_id , job_vo .data_source_id
833
+ )
834
+ cost_vos .delete ()
835
+ _LOGGER .debug (
836
+ f"[_delete_changed_cost_data] delete costs (count = { total_count } )"
837
+ )
838
+
839
+ query ["hint" ] = "COMPOUND_INDEX_FOR_SYNC_JOB"
840
+ monthly_cost_vos , total_count = self .cost_mgr .list_monthly_costs (
841
+ copy .deepcopy (query ), domain_id
842
+ )
843
+ monthly_cost_vos .delete ()
844
+ _LOGGER .debug (
845
+ f"[_delete_changed_cost_data] delete monthly costs (count = { total_count } )"
846
+ )
822
847
823
848
def _aggregate_cost_data (
824
849
self , job_vo : Job , data_keys : list , additional_info_keys : list , tag_keys : list
0 commit comments