# test_integration_boto3.py
# Forked from scylladb/alternator-load-balancing.
from unittest.mock import patch
import urllib3.connection
from alternator_lb import AlternatorLB, Config
class TestAlternatorBotocore:
initial_nodes = ['172.43.0.2']
http_port = 9998
https_port = 9999
def test_check_if_rack_datacenter_feature_is_supported(self):
lb = AlternatorLB(Config(nodes=self.initial_nodes,
port=self.http_port, datacenter="fake_dc"))
lb.check_if_rack_datacenter_feature_is_supported()
def test_check_if_rack_and_datacenter_set_correctly_wrong_dc(self):
lb = AlternatorLB(Config(nodes=self.initial_nodes,
port=self.http_port, datacenter="fake_dc"))
try:
lb.check_if_rack_and_datacenter_set_correctly()
assert False, "Expected ValueError"
except ValueError:
pass
def test_http_connection_persistent(self):
self._test_connection_persistent("http")
def test_https_connection_persistent(self):
self._test_connection_persistent("https")
def _test_connection_persistent(self, schema: str):
cnt = 0
if schema == "http":
original_init = urllib3.connection.HTTPConnection.__init__
else:
original_init = urllib3.connection.HTTPSConnection.__init__
def wrapper(self, *args, **kwargs):
nonlocal cnt
nonlocal original_init
print(f'Wrapper: args={args}, kwargs={kwargs}')
cnt += 1
return original_init(self, *args, **kwargs)
if schema == "http":
patched = patch.object(
urllib3.connection.HTTPConnection, '__init__', new=wrapper)
else:
patched = patch.object(
urllib3.connection.HTTPSConnection, '__init__', new=wrapper)
with patched:
lb = AlternatorLB(Config(
schema=schema,
nodes=self.initial_nodes,
port=self.http_port if schema == "http" else self.https_port,
datacenter="fake_dc",
update_interval=0,
))
dynamodb = lb.new_boto3_dynamodb_client()
try:
dynamodb.delete_table(TableName="FakeTable")
except Exception as e:
if e.__class__.__name__ != "ResourceNotFoundException":
raise
assert cnt == 1
try:
dynamodb.delete_table(TableName="FakeTable")
except Exception as e:
if e.__class__.__name__ != "ResourceNotFoundException":
raise
assert cnt == 1 # Connection should be carried over to another request
lb._update_live_nodes()
assert cnt == 2 # AlternatorLB uses different connection pool, so one more connection will be created
lb._update_live_nodes()
assert cnt == 2 # And it should be carried over to another attempt of pulling nodes
def test_check_if_rack_and_datacenter_set_correctly_correct_dc(self):
lb = AlternatorLB(Config(nodes=self.initial_nodes,
port=self.http_port, datacenter="datacenter1"))
lb.check_if_rack_and_datacenter_set_correctly()
@staticmethod
def _run_create_add_delete_test(dynamodb):
TABLE_NAME = "TestTable"
ITEM_KEY = {'UserID': {'S': '123'}}
try:
dynamodb.delete_table(TableName=TABLE_NAME)
except Exception as e:
if e.__class__.__name__ != "ResourceNotFoundException":
raise
print("Creating table...")
dynamodb.create_table(
TableName=TABLE_NAME,
KeySchema=[{'AttributeName': 'UserID',
'KeyType': 'HASH'}], # Primary Key
AttributeDefinitions=[
{'AttributeName': 'UserID', 'AttributeType': 'S'}], # String Key
ProvisionedThroughput={
'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}
)
print(f"Table '{TABLE_NAME}' creation started.")
# 2️⃣ Add an Item
print("Adding item to the table...")
dynamodb.put_item(
TableName=TABLE_NAME,
Item={
'UserID': {'S': '123'},
'Name': {'S': 'Alice'},
'Age': {'N': '25'}
}
)
print("Item added.")
# 3️⃣ Get the Item
print("Retrieving item...")
response = dynamodb.get_item(TableName=TABLE_NAME, Key=ITEM_KEY)
if 'Item' in response:
print("Retrieved Item:", response['Item'])
else:
print("Item not found.")
# 4️⃣ Delete the Item
print("Deleting item...")
dynamodb.delete_item(TableName=TABLE_NAME, Key=ITEM_KEY)
def test_botocore_create_add_delete(self):
lb = AlternatorLB(Config(nodes=self.initial_nodes,
port=self.http_port, datacenter="datacenter1"))
self._run_create_add_delete_test(lb.new_botocore_dynamodb_client())
def test_boto3_create_add_delete(self):
lb = AlternatorLB(Config(nodes=self.initial_nodes,
port=self.http_port, datacenter="datacenter1"))
lb = AlternatorLB(Config(nodes=self.initial_nodes, port=self.http_port, datacenter="datacenter1"))
self._run_create_add_delete_test(lb.new_boto3_dynamodb_client())