|
1 | 1 | import logging
|
2 | 2 | import os
|
| 3 | +import tempfile |
3 | 4 | import time
|
4 | 5 | import requests
|
5 | 6 | import pandas as pd
|
6 | 7 | import numpy as np
|
7 | 8 | from datetime import datetime
|
8 | 9 | from functools import wraps
|
9 |
| -from io import StringIO |
| 10 | +from io import BytesIO |
10 | 11 | from typing import get_type_hints, Union, Any
|
11 | 12 |
|
12 | 13 | from azure.identity import DefaultAzureCredential
|
@@ -130,7 +131,7 @@ def check_reservation_transaction(self) -> bool:
|
130 | 131 | return True
|
131 | 132 |
|
132 | 133 | def list_reservation_transactions_by_billing_profile_id(
|
133 |
| - self, query_filter: str |
| 134 | + self, query_filter: str |
134 | 135 | ) -> list:
|
135 | 136 | transactions = []
|
136 | 137 | try:
|
@@ -165,12 +166,12 @@ def list_billing_accounts(self) -> list:
|
165 | 166 | return billing_accounts_info
|
166 | 167 |
|
167 | 168 | def query_usage_http(
|
168 |
| - self, |
169 |
| - secret_data: dict, |
170 |
| - start: datetime, |
171 |
| - end: datetime, |
172 |
| - account_agreement_type: str, |
173 |
| - options=None, |
| 169 | + self, |
| 170 | + secret_data: dict, |
| 171 | + start: datetime, |
| 172 | + end: datetime, |
| 173 | + account_agreement_type: str, |
| 174 | + options=None, |
174 | 175 | ):
|
175 | 176 | try:
|
176 | 177 | billing_account_id = secret_data["billing_account_id"]
|
@@ -250,25 +251,24 @@ def begin_create_operation(self, scope: str, parameters: dict) -> list:
|
250 | 251 |
|
251 | 252 | def get_cost_data(self, blobs: list, options: dict) -> list:
|
252 | 253 | _LOGGER.debug(f"[get_cost_data] options: {options}")
|
253 |
| - |
254 | 254 | total_cost_count = 0
|
255 | 255 | for blob in blobs:
|
256 |
| - response = self._download_cost_data(blob) |
| 256 | + with tempfile.TemporaryFile() as temp_file: |
| 257 | + self._download_cost_data(blob, temp_file) |
257 | 258 |
|
258 |
| - df_chunk = pd.read_csv( |
259 |
| - StringIO(response.text), |
260 |
| - low_memory=False, |
261 |
| - chunksize=_PAGE_SIZE, |
262 |
| - ) |
| 259 | + df_chunk = pd.read_csv( |
| 260 | + BytesIO(temp_file.read()), |
| 261 | + low_memory=False, |
| 262 | + chunksize=_PAGE_SIZE, |
| 263 | + ) |
263 | 264 |
|
264 |
| - for df in df_chunk: |
265 |
| - df = df.replace({np.nan: None}) |
| 265 | + for df in df_chunk: |
| 266 | + df = df.replace({np.nan: None}) |
266 | 267 |
|
267 |
| - costs_data = df.to_dict("records") |
268 |
| - total_cost_count += len(costs_data) |
269 |
| - yield costs_data |
270 |
| - del df_chunk |
271 |
| - del response |
| 268 | + costs_data = df.to_dict("records") |
| 269 | + total_cost_count += len(costs_data) |
| 270 | + yield costs_data |
| 271 | + del df_chunk |
272 | 272 | _LOGGER.debug(f"[get_cost_data] total_cost_count: {total_cost_count}")
|
273 | 273 |
|
274 | 274 | def list_by_billing_account(self):
|
@@ -300,13 +300,13 @@ def _make_request_headers(self, client_type=None):
|
300 | 300 | def convert_nested_dictionary(self, cloud_svc_object):
|
301 | 301 | cloud_svc_dict = {}
|
302 | 302 | if hasattr(
|
303 |
| - cloud_svc_object, "__dict__" |
| 303 | + cloud_svc_object, "__dict__" |
304 | 304 | ): # if cloud_svc_object is not a dictionary type but has dict method
|
305 | 305 | cloud_svc_dict = cloud_svc_object.__dict__
|
306 | 306 | elif isinstance(cloud_svc_object, dict):
|
307 | 307 | cloud_svc_dict = cloud_svc_object
|
308 | 308 | elif not isinstance(
|
309 |
| - cloud_svc_object, list |
| 309 | + cloud_svc_object, list |
310 | 310 | ): # if cloud_svc_object is one of type like int, float, char, ...
|
311 | 311 | return cloud_svc_object
|
312 | 312 |
|
@@ -356,13 +356,15 @@ def _retry_request(self, response, url, headers, json, retry_count, method="post
|
356 | 356 | raise e
|
357 | 357 |
|
358 | 358 | @staticmethod
|
359 |
| - def _download_cost_data(blob: dict) -> requests.Response: |
| 359 | + def _download_cost_data(blob: dict, temp_file) -> None: |
360 | 360 | try:
|
361 |
| - response = requests.get(blob.get("blob_link")) |
362 |
| - if response.status_code != 200: |
363 |
| - raise ERROR_CONNECTOR_CALL_API(reason=f"{response.reason}") |
364 |
| - _LOGGER.debug(f"[_download_cost_data] response: {response}") |
365 |
| - return response |
| 361 | + with requests.get(blob.get("blob_link"), stream=True) as response: |
| 362 | + response.raise_for_status() |
| 363 | + |
| 364 | + for chunk in response.iter_content(chunk_size=_PAGE_SIZE): |
| 365 | + temp_file.write(chunk) |
| 366 | + temp_file.seek(0) |
| 367 | + |
366 | 368 | except Exception as e:
|
367 | 369 | _LOGGER.error(f"[_download_cost_data] download error: {e}", exc_info=True)
|
368 | 370 | raise e
|
@@ -392,8 +394,8 @@ def _get_access_token():
|
392 | 394 | @staticmethod
|
393 | 395 | def _check_secret_data(secret_data):
|
394 | 396 | if (
|
395 |
| - "billing_account_id" not in secret_data |
396 |
| - and "subscription_id" not in secret_data |
| 397 | + "billing_account_id" not in secret_data |
| 398 | + and "subscription_id" not in secret_data |
397 | 399 | ):
|
398 | 400 | raise ERROR_REQUIRED_PARAMETER(
|
399 | 401 | key="secret_data.billing_account_id or secret_data.subscription_id"
|
|
0 commit comments