Skip to content

Cache CloudInfo / CloudSettings by authority #583

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 21, 2025
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed

- CloudInfo / CloudSettings are now cached by authority (scheme, host and port) instead of by the full URL

## [5.0.3] - 2025-05-04

### Fixed
Expand Down
77 changes: 39 additions & 38 deletions azure-kusto-data/azure/kusto/data/_cloud_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,22 @@ class CloudSettings:
first_party_authority_url=DEFAULT_FIRST_PARTY_AUTHORITY_URL,
)

@classmethod
@distributed_trace(name_of_span="CloudSettings.get_cloud_info", kind=SpanKind.CLIENT)
def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, str]] = None) -> CloudInfo:
kusto_uri = cls._normalize_uri(kusto_uri)
# tracing attributes for cloud info
Span.set_cloud_info_attributes(kusto_uri)
if kusto_uri in cls._cloud_cache: # Double-checked locking to avoid unnecessary lock access
return cls._cloud_cache[kusto_uri]
with cls._cloud_cache_lock:
if kusto_uri in cls._cloud_cache:
return cls._cloud_cache[kusto_uri]
url_parts = urlparse(kusto_uri)
@classmethod
@distributed_trace(name_of_span="CloudSettings.get_cloud_info", kind=SpanKind.CLIENT)
def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, str]] = None) -> CloudInfo:
normalized_authority = cls._normalize_uri(kusto_uri)

# tracing attributes for cloud info
Span.set_cloud_info_attributes(kusto_uri)

if normalized_authority in cls._cloud_cache: # Double-checked locking to avoid unnecessary lock access
return cls._cloud_cache[normalized_authority]

with cls._cloud_cache_lock:
if normalized_authority in cls._cloud_cache:
return cls._cloud_cache[normalized_authority]

url_parts = urlparse(kusto_uri)
url = f"{url_parts.scheme}://{url_parts.netloc}/{METADATA_ENDPOINT}"

try:
Expand All @@ -87,31 +87,32 @@ def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str,
if content is None or content == {}:
raise KustoServiceError("Kusto returned an invalid cloud metadata response", result)
root = content["AzureAD"]
if root is not None:
cls._cloud_cache[kusto_uri] = CloudInfo(
login_endpoint=root["LoginEndpoint"],
login_mfa_required=root["LoginMfaRequired"],
kusto_client_app_id=root["KustoClientAppId"],
kusto_client_redirect_uri=root["KustoClientRedirectUri"],
kusto_service_resource_id=root["KustoServiceResourceId"],
first_party_authority_url=root["FirstPartyAuthorityUrl"],
)
else:
cls._cloud_cache[kusto_uri] = cls.DEFAULT_CLOUD
elif result.status_code == 404:
# For now, as long as not all proxies implement the metadata endpoint, return public cloud data if no endpoint exists
cls._cloud_cache[kusto_uri] = cls.DEFAULT_CLOUD
else:
raise KustoServiceError("Kusto returned an invalid cloud metadata response", result)
return cls._cloud_cache[kusto_uri]
if root is not None:
cls._cloud_cache[normalized_authority] = CloudInfo(
login_endpoint=root["LoginEndpoint"],
login_mfa_required=root["LoginMfaRequired"],
kusto_client_app_id=root["KustoClientAppId"],
kusto_client_redirect_uri=root["KustoClientRedirectUri"],
kusto_service_resource_id=root["KustoServiceResourceId"],
first_party_authority_url=root["FirstPartyAuthorityUrl"],
)
else:
cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD
elif result.status_code == 404:
# For now, as long as not all proxies implement the metadata endpoint, return public cloud data if no endpoint exists
cls._cloud_cache[normalized_authority] = cls.DEFAULT_CLOUD
else:
raise KustoServiceError("Kusto returned an invalid cloud metadata response", result)
return cls._cloud_cache[normalized_authority]

@classmethod
def add_to_cache(cls, url: str, cloud_info: CloudInfo):
    """Store ``cloud_info`` in the class-level cache for ``url``.

    The entry is keyed by the result of ``_normalize_uri(url)`` and the
    write happens while holding ``_cloud_cache_lock``.

    :param url: Cluster URL the cloud info belongs to.
    :param cloud_info: The ``CloudInfo`` to cache for that URL.
    """
    cache_key = cls._normalize_uri(url)
    with cls._cloud_cache_lock:
        cls._cloud_cache[cache_key] = cloud_info

@classmethod
def _normalize_uri(cls, kusto_uri):
if not kusto_uri.endswith("/"):
kusto_uri += "/"
return kusto_uri
@classmethod
def _normalize_uri(cls, kusto_uri):
"""Extracts and returns the authority part of the URI (schema, host, port)"""
url_parts = urlparse(kusto_uri)
# Return only the scheme and netloc (which contains host and port if present)
return f"{url_parts.scheme}://{url_parts.netloc}"
105 changes: 105 additions & 0 deletions azure-kusto-data/tests/test_cloud_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import pytest

from azure.kusto.data._cloud_settings import CloudSettings, CloudInfo


@pytest.fixture
def clear_cache():
    """Give the test an empty CloudSettings cache and empty it again afterwards."""

    def _reset():
        # The cache is class-level state; hold its lock while wiping it.
        with CloudSettings._cloud_cache_lock:
            CloudSettings._cloud_cache.clear()

    _reset()
    yield
    _reset()


def test_normalize_uri_extracts_authority():
    """_normalize_uri keeps only the scheme, host and port of a URI."""
    # Maps each input URI to the authority it is expected to normalize to.
    expected_by_uri = {
        "https://cluster.kusto.windows.net": "https://cluster.kusto.windows.net",
        "https://cluster.kusto.windows.net/": "https://cluster.kusto.windows.net",
        "https://cluster.kusto.windows.net/v1/rest": "https://cluster.kusto.windows.net",
        "https://cluster.kusto.windows.net:443/v1/rest": "https://cluster.kusto.windows.net:443",
        "http://localhost:8080/v1/rest/query": "http://localhost:8080",
        "https://cluster.kusto.windows.net/database": "https://cluster.kusto.windows.net",
    }

    for uri, authority in expected_by_uri.items():
        assert CloudSettings._normalize_uri(uri) == authority


def test_cloud_info_cached_by_authority(clear_cache):
    """An entry added for a bare cluster URL is found for any path on that cluster."""
    authority = "https://cluster.kusto.windows.net"
    cached_info = CloudInfo(
        login_endpoint="https://login.test.com",
        login_mfa_required=False,
        kusto_client_app_id="test-app-id",
        kusto_client_redirect_uri="http://localhost/redirect",
        kusto_service_resource_id="https://test.kusto.windows.net",
        first_party_authority_url="https://login.test.com/tenant-id",
    )

    # Register the entry under the path-less URL only.
    CloudSettings.add_to_cache(authority, cached_info)

    # Every path below shares the authority, so each must resolve to the same key.
    for suffix in ("/", "/database", "/v1/rest/query", "/some/other/path"):
        cache_key = CloudSettings._normalize_uri(authority + suffix)
        assert cache_key == "https://cluster.kusto.windows.net"
        assert cache_key in CloudSettings._cloud_cache

        # The cache must hand back the very object that was stored.
        assert CloudSettings._cloud_cache[cache_key] is cached_info


def test_cloud_info_cached_with_port(clear_cache):
    """An authority with an explicit port gets a cache entry separate from the port-less one."""
    plain_key = "https://cluster.kusto.windows.net"
    port_key = "https://cluster.kusto.windows.net:443"

    plain_info = CloudInfo(
        login_endpoint="https://login.default.com",
        login_mfa_required=False,
        kusto_client_app_id="default-app-id",
        kusto_client_redirect_uri="http://localhost/redirect",
        kusto_service_resource_id="https://default.kusto.windows.net",
        first_party_authority_url="https://login.default.com/tenant-id",
    )
    port_info = CloudInfo(
        login_endpoint="https://login.withport.com",
        login_mfa_required=True,
        kusto_client_app_id="port-app-id",
        kusto_client_redirect_uri="http://localhost/redirect",
        kusto_service_resource_id="https://port.kusto.windows.net",
        first_party_authority_url="https://login.withport.com/tenant-id",
    )

    # Two authorities that differ only by the explicit port.
    CloudSettings.add_to_cache(plain_key, plain_info)
    CloudSettings.add_to_cache(port_key, port_info)

    cache = CloudSettings._cloud_cache

    # Both keys exist side by side.
    assert plain_key in cache
    assert port_key in cache

    # Each key maps to the object stored under it.
    assert cache[plain_key] is plain_info
    assert cache[port_key] is port_info

    # URLs with a path resolve to the entry of their own authority.
    assert cache[CloudSettings._normalize_uri("https://cluster.kusto.windows.net/database")] is plain_info
    assert cache[CloudSettings._normalize_uri("https://cluster.kusto.windows.net:443/database")] is port_info
Loading