@@ -869,16 +869,20 @@ def _aggregate_monthly_cost_data(
869
869
{"k" : "job_task_id" , "v" : job_task_id , "o" : "eq" },
870
870
],
871
871
"allow_disk_use" : True , # Allow disk use for large data
872
+ "return_type" : "cursor" , # Return type is cursor
872
873
}
873
874
874
875
for data_key in data_keys :
875
876
query ["fields" ].update (
876
877
{f"data_{ data_key } " : {"key" : f"data.{ data_key } " , "operator" : "sum" }}
877
878
)
878
879
879
- response = self .cost_mgr .analyze_costs (query , domain_id , target = "PRIMARY" )
880
- results = response .get ("results" , [])
881
- for aggregated_cost_data in results :
880
+ # response = self.cost_mgr.analyze_costs(query, domain_id, target="PRIMARY")
881
+ # results = response.get("results", [])
882
+ cursor = self .cost_mgr .analyze_costs (query , domain_id , target = "PRIMARY" )
883
+
884
+ row_count = 0
885
+ for aggregated_cost_data in cursor :
882
886
aggregated_cost_data ["data_source_id" ] = data_source_id
883
887
aggregated_cost_data ["billed_month" ] = billed_month
884
888
aggregated_cost_data ["job_id" ] = job_id
@@ -891,9 +895,10 @@ def _aggregate_monthly_cost_data(
891
895
f"data_{ data_key } " , None
892
896
)
893
897
self .cost_mgr .create_monthly_cost (aggregated_cost_data )
898
+ row_count += 1
894
899
895
900
_LOGGER .debug (
896
- f"[_aggregate_monthly_cost_data] create monthly costs ({ billed_month } ): { job_id } (count = { len ( results ) } )"
901
+ f"[_aggregate_monthly_cost_data] create monthly costs ({ billed_month } ): { job_id } (count = { row_count } )"
897
902
)
898
903
899
904
def _get_all_data_sources (self ):
0 commit comments