1
- import os
2
1
import abc
3
- import logging
4
2
import datetime
5
- import time
3
+ import logging
4
+ import os
6
5
import re
6
+ import time
7
7
from typing import Union
8
8
9
- from spaceone .core .manager import BaseManager
10
9
from spaceone .core import utils
10
+ from spaceone .core .manager import BaseManager
11
11
from spaceone .inventory .plugin .collector .lib import *
12
12
13
13
_LOGGER = logging .getLogger ("spaceone" )
@@ -47,7 +47,10 @@ def list_managers_by_cloud_service_groups(cls, cloud_service_groups: list):
47
47
yield manager
48
48
elif cloud_service_groups :
49
49
for manager in cls .__subclasses__ ():
50
- if manager .cloud_service_group and manager .cloud_service_group in cloud_service_groups :
50
+ if (
51
+ manager .cloud_service_group
52
+ and manager .cloud_service_group in cloud_service_groups
53
+ ):
51
54
yield manager
52
55
53
56
@classmethod
@@ -62,21 +65,23 @@ def collect_metrics(cls, cloud_service_group: str):
62
65
if not os .path .exists (os .path .join (_METRIC_DIR , cloud_service_group )):
63
66
os .mkdir (os .path .join (_METRIC_DIR , cloud_service_group ))
64
67
for dirname in os .listdir (os .path .join (_METRIC_DIR , cloud_service_group )):
65
- for filename in os .listdir (os .path .join (_METRIC_DIR , cloud_service_group , dirname )):
68
+ for filename in os .listdir (
69
+ os .path .join (_METRIC_DIR , cloud_service_group , dirname )
70
+ ):
66
71
if filename .endswith (".yaml" ):
67
- file_path = os .path .join (_METRIC_DIR , cloud_service_group , dirname , filename )
72
+ file_path = os .path .join (
73
+ _METRIC_DIR , cloud_service_group , dirname , filename
74
+ )
68
75
info = utils .load_yaml_from_file (file_path )
69
76
if filename == "namespace.yaml" :
70
77
yield make_response (
71
78
namespace = info ,
72
79
resource_type = "inventory.Namespace" ,
73
- match_keys = []
80
+ match_keys = [],
74
81
)
75
82
else :
76
83
yield make_response (
77
- metric = info ,
78
- resource_type = "inventory.Metric" ,
79
- match_keys = []
84
+ metric = info , resource_type = "inventory.Metric" , match_keys = []
80
85
)
81
86
82
87
def collect_resources (self , options : dict , secret_data : dict , schema : str ):
@@ -89,17 +94,21 @@ def collect_resources(self, options: dict, secret_data: dict, schema: str):
89
94
try :
90
95
yield from self .collect_cloud_service_type ()
91
96
92
- cloud_services , total_count = self .collect_cloud_service (options , secret_data , schema
93
- )
97
+ cloud_services , total_count = self .collect_cloud_service (
98
+ options , secret_data , schema
99
+ )
94
100
for cloud_service in cloud_services :
95
101
yield cloud_service
96
102
success_count , error_count = total_count
97
103
98
- subscriptions_manager = AzureBaseManager .get_managers_by_cloud_service_group ("SubscriptionsManager" )
104
+ subscriptions_manager = (
105
+ AzureBaseManager .get_managers_by_cloud_service_group (
106
+ "SubscriptionsManager"
107
+ )
108
+ )
99
109
location_info = subscriptions_manager ().list_location_info (secret_data )
100
110
101
111
yield from self .collect_region (location_info )
102
- # yield from self.collect_region(secret_data)
103
112
104
113
except Exception as e :
105
114
yield make_error_response (
@@ -166,7 +175,7 @@ def collect_cloud_service(self, options: dict, secret_data: dict, schema: str):
166
175
"provider" ,
167
176
"cloud_service_type" ,
168
177
"cloud_service_group" ,
169
- "account"
178
+ "account" ,
170
179
]
171
180
],
172
181
)
@@ -188,18 +197,22 @@ def get_metadata_path(self):
188
197
_cloud_service_group = self ._camel_to_snake (self .cloud_service_group )
189
198
_cloud_service_type = self ._camel_to_snake (self .cloud_service_type )
190
199
191
- return os .path .join (_METADATA_DIR , _cloud_service_group , f"{ _cloud_service_type } .yaml" )
200
+ return os .path .join (
201
+ _METADATA_DIR , _cloud_service_group , f"{ _cloud_service_type } .yaml"
202
+ )
192
203
193
- def convert_nested_dictionary (self , cloud_svc_object : object ) -> Union [object , dict ]:
204
+ def convert_nested_dictionary (
205
+ self , cloud_svc_object : object
206
+ ) -> Union [object , dict ]:
194
207
cloud_svc_dict = {}
195
208
if hasattr (
196
- cloud_svc_object , "__dict__"
209
+ cloud_svc_object , "__dict__"
197
210
): # if cloud_svc_object is not a dictionary type but has dict method
198
211
cloud_svc_dict = cloud_svc_object .__dict__
199
212
elif isinstance (cloud_svc_object , dict ):
200
213
cloud_svc_dict = cloud_svc_object
201
214
elif not isinstance (
202
- cloud_svc_object , list
215
+ cloud_svc_object , list
203
216
): # if cloud_svc_object is one of type like int, float, char, ...
204
217
return cloud_svc_object
205
218
@@ -232,15 +245,12 @@ def make_reference(resource_id: str, external_link_format: str = None) -> dict:
232
245
external_link = external_link_format .format (resource_id = resource_id )
233
246
else :
234
247
external_link = f"https://portal.azure.com/#@.onmicrosoft.com/resource{ resource_id } /overview"
235
- return {
236
- "resource_id" : resource_id ,
237
- "external_link" : external_link
238
- }
248
+ return {"resource_id" : resource_id , "external_link" : external_link }
239
249
240
250
@staticmethod
241
251
def _camel_to_snake (name ):
242
- name = re .sub (' (.)([A-Z][a-z]+)' , r' \1_\2' , name )
243
- return re .sub (' ([a-z0-9])([A-Z])' , r' \1_\2' , name ).lower ()
252
+ name = re .sub (" (.)([A-Z][a-z]+)" , r" \1_\2" , name )
253
+ return re .sub (" ([a-z0-9])([A-Z])" , r" \1_\2" , name ).lower ()
244
254
245
255
@staticmethod
246
256
def get_resource_group_from_id (dict_id ):
@@ -249,7 +259,7 @@ def get_resource_group_from_id(dict_id):
249
259
250
260
@staticmethod
251
261
def update_tenant_id_from_secret_data (
252
- cloud_service_data : dict , secret_data : dict
262
+ cloud_service_data : dict , secret_data : dict
253
263
) -> dict :
254
264
if tenant_id := secret_data .get ("tenant_id" ):
255
265
cloud_service_data .update ({"tenant_id" : tenant_id })
0 commit comments