Skip to content

[DO NOT MERGE] Cosmos fabric native integration testing #40372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sdk/cosmos/azure-cosmos/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.

import fabric_token_credential
import test_config
from azure.cosmos import CosmosClient as CosmosSyncClient

cosmos_sync_client = CosmosSyncClient(test_config.TestConfig.host, test_config.TestConfig.masterKey)
credential = fabric_token_credential.FabricTokenCredential()
cosmos_sync_client = CosmosSyncClient(test_config.TestConfig.fabric_host, credential=credential)


def pytest_configure(config):
Expand Down Expand Up @@ -32,7 +34,7 @@ def pytest_sessionfinish(session, exitstatus):
returning the exit status to the system.
"""
config = test_config.TestConfig
config.try_delete_database(cosmos_sync_client)
# config.try_delete_database(cosmos_sync_client)


def pytest_unconfigure(config):
Expand Down
20 changes: 20 additions & 0 deletions sdk/cosmos/azure-cosmos/tests/fabric_token_credential.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
from typing import Optional, Any

from azure.core.credentials import TokenCredential, AccessToken
from azure.identity import InteractiveBrowserCredential


class FabricTokenCredential(TokenCredential):

def __init__(self):
self.token_credential = InteractiveBrowserCredential()
self.token_credential.authority = ''

def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
enable_cae: bool = False, **kwargs: Any) -> AccessToken:
scopes = ["https://cosmos.azure.com/.default"]
return self.token_credential.get_token(*scopes, claims=claims, tenant_id=tenant_id, enable_cae=enable_cae,
**kwargs)
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class TestConfig(object):
masterKey = os.getenv('ACCOUNT_KEY',
'C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==')
host = os.getenv('ACCOUNT_HOST', local_host)
fabric_host = ""
connection_str = os.getenv('ACCOUNT_CONNECTION_STR', 'AccountEndpoint={};AccountKey={};'.format(host, masterKey))

connectionPolicy = documents.ConnectionPolicy()
Expand All @@ -54,7 +55,7 @@ class TestConfig(object):
THROUGHPUT_FOR_5_PARTITIONS = 30000
THROUGHPUT_FOR_1_PARTITION = 400

TEST_DATABASE_ID = os.getenv('COSMOS_TEST_DATABASE_ID', "Python SDK Test Database " + str(uuid.uuid4()))
TEST_DATABASE_ID = os.getenv('COSMOS_TEST_DATABASE_ID', 'dkunda-fabric-cdb')

TEST_SINGLE_PARTITION_CONTAINER_ID = "Single Partition Test Container " + str(uuid.uuid4())
TEST_MULTI_PARTITION_CONTAINER_ID = "Multi Partition Test Container " + str(uuid.uuid4())
Expand Down
253 changes: 253 additions & 0 deletions sdk/cosmos/azure-cosmos/tests/test_fabric_change_feed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.

import unittest
import uuid
from datetime import datetime, timedelta, timezone
from time import sleep

import pytest
from _pytest.outcomes import fail

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
import test_config
from azure.cosmos.partition_key import PartitionKey
from tests import fabric_token_credential


@pytest.fixture(scope="class")
def setup():
config = test_config.TestConfig()
credential = fabric_token_credential.FabricTokenCredential()
test_client = cosmos_client.CosmosClient(config.fabric_host, credential=credential),
return {
"created_db": test_client[0].get_database_client(config.TEST_DATABASE_ID),
"is_emulator": config.is_emulator
}

def round_time():
utc_now = datetime.now(timezone.utc)
return utc_now - timedelta(microseconds=utc_now.microsecond)

@pytest.mark.cosmosFabric
@pytest.mark.unittest
@pytest.mark.usefixtures("setup")
class TestChangeFeed:
"""Test to ensure escaping of non-ascii characters from partition key"""

def test_get_feed_ranges(self, setup):
created_collection = setup["created_db"].create_container("get_feed_ranges_" + str(uuid.uuid4()),
PartitionKey(path="/pk"))
result = list(created_collection.read_feed_ranges())
assert len(result) == 1

@pytest.mark.parametrize("change_feed_filter_param", ["partitionKey", "partitionKeyRangeId", "feedRange"])
# @pytest.mark.parametrize("change_feed_filter_param", ["partitionKeyRangeId"])
def test_query_change_feed_with_different_filter(self, change_feed_filter_param, setup):
created_collection = setup["created_db"].create_container(f"change_feed_test_{change_feed_filter_param}_{str(uuid.uuid4())}",
PartitionKey(path="/pk"))
# Read change feed without passing any options
query_iterable = created_collection.query_items_change_feed()
iter_list = list(query_iterable)
assert len(iter_list) == 0

if change_feed_filter_param == "partitionKey":
filter_param = {"partition_key": "pk"}
elif change_feed_filter_param == "partitionKeyRangeId":
filter_param = {"partition_key_range_id": "0"}
elif change_feed_filter_param == "feedRange":
feed_ranges = list(created_collection.read_feed_ranges())
assert len(feed_ranges) == 1
filter_param = {"feed_range": feed_ranges[0]}
else:
filter_param = None

# Read change feed from current should return an empty list
query_iterable = created_collection.query_items_change_feed(**filter_param)
iter_list = list(query_iterable)
assert len(iter_list) == 0
assert 'etag' in created_collection.client_connection.last_response_headers
assert created_collection.client_connection.last_response_headers['etag'] !=''

# Read change feed from beginning should return an empty list
query_iterable = created_collection.query_items_change_feed(
is_start_from_beginning=True,
**filter_param
)
iter_list = list(query_iterable)
assert len(iter_list) == 0
assert 'etag' in created_collection.client_connection.last_response_headers
continuation1 = created_collection.client_connection.last_response_headers['etag']
assert continuation1 != ''

# Create a document. Read change feed should return be able to read that document
document_definition = {'pk': 'pk', 'id': 'doc1'}
created_collection.create_item(body=document_definition)
query_iterable = created_collection.query_items_change_feed(
is_start_from_beginning=True,
**filter_param
)
iter_list = list(query_iterable)
assert len(iter_list) == 1
assert iter_list[0]['id'] == 'doc1'
assert 'etag' in created_collection.client_connection.last_response_headers
continuation2 = created_collection.client_connection.last_response_headers['etag']
assert continuation2 != ''
assert continuation2 != continuation1

# Create two new documents. Verify that change feed contains the 2 new documents
# with page size 1 and page size 100
document_definition = {'pk': 'pk', 'id': 'doc2'}
created_collection.create_item(body=document_definition)
document_definition = {'pk': 'pk3', 'id': 'doc3'}
created_collection.create_item(body=document_definition)

for pageSize in [1, 100]:
# verify iterator
query_iterable = created_collection.query_items_change_feed(
continuation=continuation2,
max_item_count=pageSize,
**filter_param
)
it = query_iterable.__iter__()
expected_ids = 'doc2.doc3.'
if "partition_key" in filter_param:
expected_ids = 'doc2.'
actual_ids = ''
for item in it:
actual_ids += item['id'] + '.'
assert actual_ids == expected_ids

# verify by_page
# the options is not copied, therefore it need to be restored
query_iterable = created_collection.query_items_change_feed(
continuation=continuation2,
max_item_count=pageSize,
**filter_param
)
count = 0
expected_count = 2
if "partition_key" in filter_param:
expected_count = 1
all_fetched_res = []
for page in query_iterable.by_page():
fetched_res = list(page)
assert len(fetched_res) == min(pageSize, expected_count - count)
count += len(fetched_res)
all_fetched_res.extend(fetched_res)

actual_ids = ''
for item in all_fetched_res:
actual_ids += item['id'] + '.'
assert actual_ids == expected_ids

# verify reading change feed from the beginning
query_iterable = created_collection.query_items_change_feed(
is_start_from_beginning=True,
**filter_param
)
expected_ids = 'doc1.doc2.doc3.'
if "partition_key" in filter_param:
expected_ids = 'doc1.doc2.'
it = query_iterable.__iter__()
actual_ids = ''
for item in it:
actual_ids += item['id'] + '.'
assert actual_ids == expected_ids
assert 'etag' in created_collection.client_connection.last_response_headers
continuation3 = created_collection.client_connection.last_response_headers['etag']

# verify reading empty change feed
query_iterable = created_collection.query_items_change_feed(
continuation=continuation3,
is_start_from_beginning=True,
**filter_param
)
iter_list = list(query_iterable)
assert len(iter_list) == 0
setup["created_db"].delete_container(created_collection.id)

def test_query_change_feed_with_start_time(self, setup):
created_collection = setup["created_db"].create_container_if_not_exists("query_change_feed_start_time_test",
PartitionKey(path="/pk"))
batchSize = 50

def create_random_items(container, batch_size):
for _ in range(batch_size):
# Generate a Random partition key
partition_key = 'pk' + str(uuid.uuid4())

# Generate a random item
item = {
'id': 'item' + str(uuid.uuid4()),
'partitionKey': partition_key,
'content': 'This is some random content',
}

try:
# Create the item in the container
container.upsert_item(item)
except exceptions.CosmosHttpResponseError as e:
fail(e)

# Create first batch of random items
create_random_items(created_collection, batchSize)

# wait for 1 second and record the time, then wait another second
sleep(1)
start_time = round_time()
not_utc_time = datetime.now()
sleep(1)

# now create another batch of items
create_random_items(created_collection, batchSize)

# now query change feed based on start time
change_feed_iter = list(created_collection.query_items_change_feed(start_time=start_time))
totalCount = len(change_feed_iter)

# now check if the number of items that were changed match the batch size
assert totalCount == batchSize

# negative test: pass in a valid time in the future
future_time = start_time + timedelta(hours=1)
change_feed_iter = list(created_collection.query_items_change_feed(start_time=future_time))
totalCount = len(change_feed_iter)
# A future time should return 0
assert totalCount == 0

# test a date that is not utc, will be converted to utc by sdk
change_feed_iter = list(created_collection.query_items_change_feed(start_time=not_utc_time))
totalCount = len(change_feed_iter)
# Should equal batch size
assert totalCount == batchSize

setup["created_db"].delete_container(created_collection.id)

def test_query_change_feed_with_multi_partition(self, setup):
created_collection = setup["created_db"].create_container("change_feed_test_" + str(uuid.uuid4()),
PartitionKey(path="/pk"),
offer_throughput=11000)

# create one doc and make sure change feed query can return the document
new_documents = [
{'pk': 'pk', 'id': 'doc1'},
{'pk': 'pk2', 'id': 'doc2'},
{'pk': 'pk3', 'id': 'doc3'},
{'pk': 'pk4', 'id': 'doc4'}]
expected_ids = ['doc1', 'doc2', 'doc3', 'doc4']

for document in new_documents:
created_collection.create_item(body=document)

query_iterable = created_collection.query_items_change_feed(start_time="Beginning")
it = query_iterable.__iter__()
actual_ids = []
for item in it:
actual_ids.append(item['id'])

assert actual_ids == expected_ids

if __name__ == "__main__":
unittest.main()
Loading
Loading