Skip to content

fix: Fix error when virtual_machine is None in get_virtual_machine_name #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 37 additions & 27 deletions src/plugin/manager/base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import os
import abc
import logging
import datetime
import time
import logging
import os
import re
import time
from typing import Union

from spaceone.core.manager import BaseManager
from spaceone.core import utils
from spaceone.core.manager import BaseManager
from spaceone.inventory.plugin.collector.lib import *

_LOGGER = logging.getLogger("spaceone")
Expand Down Expand Up @@ -47,7 +47,10 @@ def list_managers_by_cloud_service_groups(cls, cloud_service_groups: list):
yield manager
elif cloud_service_groups:
for manager in cls.__subclasses__():
if manager.cloud_service_group and manager.cloud_service_group in cloud_service_groups:
if (
manager.cloud_service_group
and manager.cloud_service_group in cloud_service_groups
):
yield manager

@classmethod
Expand All @@ -62,21 +65,23 @@ def collect_metrics(cls, cloud_service_group: str):
if not os.path.exists(os.path.join(_METRIC_DIR, cloud_service_group)):
os.mkdir(os.path.join(_METRIC_DIR, cloud_service_group))
for dirname in os.listdir(os.path.join(_METRIC_DIR, cloud_service_group)):
for filename in os.listdir(os.path.join(_METRIC_DIR, cloud_service_group, dirname)):
for filename in os.listdir(
os.path.join(_METRIC_DIR, cloud_service_group, dirname)
):
if filename.endswith(".yaml"):
file_path = os.path.join(_METRIC_DIR, cloud_service_group, dirname, filename)
file_path = os.path.join(
_METRIC_DIR, cloud_service_group, dirname, filename
)
info = utils.load_yaml_from_file(file_path)
if filename == "namespace.yaml":
yield make_response(
namespace=info,
resource_type="inventory.Namespace",
match_keys=[]
match_keys=[],
)
else:
yield make_response(
metric=info,
resource_type="inventory.Metric",
match_keys=[]
metric=info, resource_type="inventory.Metric", match_keys=[]
)

def collect_resources(self, options: dict, secret_data: dict, schema: str):
Expand All @@ -89,17 +94,21 @@ def collect_resources(self, options: dict, secret_data: dict, schema: str):
try:
yield from self.collect_cloud_service_type()

cloud_services, total_count = self.collect_cloud_service(options, secret_data, schema
)
cloud_services, total_count = self.collect_cloud_service(
options, secret_data, schema
)
for cloud_service in cloud_services:
yield cloud_service
success_count, error_count = total_count

subscriptions_manager = AzureBaseManager.get_managers_by_cloud_service_group("SubscriptionsManager")
subscriptions_manager = (
AzureBaseManager.get_managers_by_cloud_service_group(
"SubscriptionsManager"
)
)
location_info = subscriptions_manager().list_location_info(secret_data)

yield from self.collect_region(location_info)
# yield from self.collect_region(secret_data)

except Exception as e:
yield make_error_response(
Expand Down Expand Up @@ -166,7 +175,7 @@ def collect_cloud_service(self, options: dict, secret_data: dict, schema: str):
"provider",
"cloud_service_type",
"cloud_service_group",
"account"
"account",
]
],
)
Expand All @@ -188,18 +197,22 @@ def get_metadata_path(self):
_cloud_service_group = self._camel_to_snake(self.cloud_service_group)
_cloud_service_type = self._camel_to_snake(self.cloud_service_type)

return os.path.join(_METADATA_DIR, _cloud_service_group, f"{_cloud_service_type}.yaml")
return os.path.join(
_METADATA_DIR, _cloud_service_group, f"{_cloud_service_type}.yaml"
)

def convert_nested_dictionary(self, cloud_svc_object: object) -> Union[object, dict]:
def convert_nested_dictionary(
self, cloud_svc_object: object
) -> Union[object, dict]:
cloud_svc_dict = {}
if hasattr(
cloud_svc_object, "__dict__"
cloud_svc_object, "__dict__"
): # if cloud_svc_object is not a dictionary type but has dict method
cloud_svc_dict = cloud_svc_object.__dict__
elif isinstance(cloud_svc_object, dict):
cloud_svc_dict = cloud_svc_object
elif not isinstance(
cloud_svc_object, list
cloud_svc_object, list
): # if cloud_svc_object is one of type like int, float, char, ...
return cloud_svc_object

Expand Down Expand Up @@ -232,15 +245,12 @@ def make_reference(resource_id: str, external_link_format: str = None) -> dict:
external_link = external_link_format.format(resource_id=resource_id)
else:
external_link = f"https://portal.azure.com/#@.onmicrosoft.com/resource{resource_id}/overview"
return {
"resource_id": resource_id,
"external_link": external_link
}
return {"resource_id": resource_id, "external_link": external_link}

@staticmethod
def _camel_to_snake(name):
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower()

@staticmethod
def get_resource_group_from_id(dict_id):
Expand All @@ -249,7 +259,7 @@ def get_resource_group_from_id(dict_id):

@staticmethod
def update_tenant_id_from_secret_data(
cloud_service_data: dict, secret_data: dict
cloud_service_data: dict, secret_data: dict
) -> dict:
if tenant_id := secret_data.get("tenant_id"):
cloud_service_data.update({"tenant_id": tenant_id})
Expand Down
62 changes: 40 additions & 22 deletions src/plugin/manager/network_security_groups/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from spaceone.inventory.plugin.collector.lib import *

from plugin.conf.cloud_service_conf import ICON_URL
from plugin.connector.sql_databases.sql_databases_connector import SqlDatabasesConnector
from plugin.connector.network_security_groups.network_security_groups_connector import NetworkSecurityGroupsConnector
from plugin.connector.subscriptions.subscriptions_connector import SubscriptionsConnector
from plugin.connector.network_security_groups.network_security_groups_connector import (
NetworkSecurityGroupsConnector,
)
from plugin.connector.subscriptions.subscriptions_connector import (
SubscriptionsConnector,
)
from plugin.manager.base import AzureBaseManager

_LOGGER = logging.getLogger(__name__)
Expand All @@ -26,22 +29,26 @@ def create_cloud_service_type(self):
is_primary=True,
is_major=True,
labels=["Networking"],
tags={
"spaceone:icon": f"{ICON_URL}/azure-network-security-groups.svg"
}
tags={"spaceone:icon": f"{ICON_URL}/azure-network-security-groups.svg"},
)

def create_cloud_service(self, options, secret_data, schema):
cloud_services = []
error_responses = []

network_security_groups_conn = NetworkSecurityGroupsConnector(secret_data=secret_data)
network_security_groups_conn = NetworkSecurityGroupsConnector(
secret_data=secret_data
)
subscription_conn = SubscriptionsConnector(secret_data=secret_data)

subscription_obj = subscription_conn.get_subscription(secret_data["subscription_id"])
subscription_obj = subscription_conn.get_subscription(
secret_data["subscription_id"]
)
subscription_info = self.convert_nested_dictionary(subscription_obj)

network_security_groups = network_security_groups_conn.list_all_network_security_groups()
network_security_groups = (
network_security_groups_conn.list_all_network_security_groups()
)
network_interfaces = [
self.convert_nested_dictionary(ni)
for ni in network_security_groups_conn.list_all_network_interfaces()
Expand All @@ -50,7 +57,9 @@ def create_cloud_service(self, options, secret_data, schema):
for network_security_group in network_security_groups:

try:
network_security_group_dict = self.convert_nested_dictionary(network_security_group)
network_security_group_dict = self.convert_nested_dictionary(
network_security_group
)
network_security_group_id = network_security_group_dict["id"]
inbound_rules = []
outbound_rules = []
Expand All @@ -67,8 +76,8 @@ def create_cloud_service(self, options, secret_data, schema):

# update default security rules
if (
network_security_group_dict.get("default_security_rules")
is not None
network_security_group_dict.get("default_security_rules")
is not None
):
inbound, outbound = self.split_security_rules(
network_security_group_dict, "default_security_rules"
Expand Down Expand Up @@ -158,13 +167,17 @@ def create_cloud_service(self, options, secret_data, schema):
data=network_security_group_dict,
account=secret_data["subscription_id"],
region_code=network_security_group_dict["location"],
reference=self.make_reference(network_security_group_dict.get("id")),
data_format="dict"
reference=self.make_reference(
network_security_group_dict.get("id")
),
data_format="dict",
)
)

except Exception as e:
_LOGGER.error(f"[create_cloud_service] Error {self.service} {e}", exc_info=True)
_LOGGER.error(
f"[create_cloud_service] Error {self.service} {e}", exc_info=True
)
error_responses.append(
make_error_response(
error=e,
Expand All @@ -177,7 +190,7 @@ def create_cloud_service(self, options, secret_data, schema):
return cloud_services, error_responses

def get_network_interfaces(
self, network_security_group_conn, network_interfaces_list
self, network_security_group_conn, network_interfaces_list
):
network_interfaces_new_list = []
virtual_machines_display_list = []
Expand Down Expand Up @@ -284,14 +297,19 @@ def get_virtual_network(subnet_id):
return virtual_network

@staticmethod
def get_virtual_machine_name(network_interfaces: list, network_security_group_id: str):
def get_virtual_machine_name(
network_interfaces: list, network_security_group_id: str
):
virtual_machine_name = None
for network_interface_info in network_interfaces:
if _network_security_group := network_interface_info.get("network_security_group"):
if (
_network_security_group["id"].split("/")[-1]
== network_security_group_id.split("/")[-1]
if _network_security_group := network_interface_info.get(
"network_security_group"
):
if _network_security_group["id"].split("/")[
-1
] == network_security_group_id.split("/")[-1] and (
_virtual_machine := network_interface_info.get("virtual_machine")
):
virtual_machine_name = network_interface_info["virtual_machine"]["id"].split("/")[-1]
virtual_machine_name = _virtual_machine["id"].split("/")[-1]
return virtual_machine_name
return virtual_machine_name
4 changes: 2 additions & 2 deletions src/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
f.close()

setup(
name="plugin-azure-inven-v2-collector-migration-test",
name="plugin-azure-inven-collector",
version=VERSION,
description="MS Azure cloud service inventory v2 collector",
description="MS Azure cloud service inventory collector",
long_description="",
url="https://cloudforet.io/",
author="Cloudforet Admin",
Expand Down
Loading