Skip to content

Commit d03faf5

Browse files
committed
add on_disconnected_event
1 parent 4435aa5 commit d03faf5

14 files changed

Lines changed: 87 additions & 120 deletions

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_auto_connect.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def test_auto_connect(self, webpubsubclient_endpoint):
2525
reconnect_retry_backoff_factor=0.1,
2626
)
2727
name = "test_auto_connect"
28-
connected_event, message_event = self.setup_events(client)
28+
connected_event, _, message_event = self.setup_events(client)
2929
with client:
3030
assert connected_event.wait(timeout=30), "Timed out waiting for initial connection"
3131
conn_id0 = client._connection_id

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_auto_connect_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async def test_auto_connect_async(self, webpubsubclient_endpoint):
2626
reconnect_retry_backoff_factor=0.1,
2727
)
2828
name = "test_auto_connect_async"
29-
connected_event, message_event = await self.setup_events(client)
29+
connected_event, _, message_event = await self.setup_events(client)
3030
async with client:
3131
await asyncio.wait_for(connected_event.wait(), timeout=30)
3232
conn_id0 = client._connection_id

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_no_recovery_no_connect.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,10 @@ def test_disable_recovery_and_autoconnect(self, webpubsubclient_endpoint):
3434
name = "test_disable_recovery_and_autoconnect"
3535
with client:
3636
group_name = name
37-
client.subscribe("group-message", on_group_message)
37+
_, disconnected_event, _ = self.setup_events(client)
3838
client.join_group(group_name)
3939
client._ws.sock.close(1001) # close connection
40-
# wait for client to detect disconnection so send raises SendMessageError
41-
for _ in range(30):
42-
if not client.is_connected():
43-
break
44-
time.sleep(1)
40+
assert disconnected_event.wait(timeout=30), "Timed out waiting for disconnection"
4541
with pytest.raises(SendMessageError):
4642
client.send_to_group(group_name, name, "text")
4743
time.sleep(3) # wait to confirm message was NOT received
@@ -63,12 +59,10 @@ def test_disable_recovery_and_autoconnect_send_concurrently(
6359

6460
with client:
6561
group_name = "test_disable_recovery_and_autoconnect_send_concurrently"
62+
_, disconnected_event, _ = self.setup_events(client)
6663
client.join_group(group_name)
6764
client._ws.sock.close(1001) # close connection
68-
for _ in range(30):
69-
if not client.is_connected():
70-
break
71-
time.sleep(1)
65+
assert disconnected_event.wait(timeout=30), "Timed out waiting for disconnection"
7266
assert not client.is_connected()
7367

7468
def send(idx):

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_no_recovery_no_connect_async.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,10 @@ async def test_disable_recovery_and_autoconnect_async(self, webpubsubclient_endp
2929
name = "test_disable_recovery_and_autoconnect_async"
3030
async with client:
3131
group_name = name
32-
await client.subscribe("group-message", on_group_message)
32+
_, disconnected_event, _ = await self.setup_events(client)
3333
await client.join_group(group_name)
3434
await client._ws.session.close() # close connection
35-
# wait for client to detect disconnection so send raises SendMessageError
36-
for _ in range(30):
37-
if not client.is_connected():
38-
break
39-
await asyncio.sleep(1)
35+
await asyncio.wait_for(disconnected_event.wait(), timeout=30)
4036
with pytest.raises(SendMessageError):
4137
await client.send_to_group(group_name, name, "text")
4238
await asyncio.sleep(3) # wait to confirm message was NOT received
@@ -58,13 +54,10 @@ async def test_disable_recovery_and_autoconnect_send_concurrently_async(
5854

5955
async with client:
6056
group_name = "test_disable_recovery_and_autoconnect_send_concurrently_async"
57+
_, disconnected_event, _ = await self.setup_events(client)
6158
await client.join_group(group_name)
6259
await client._ws.session.close() # close connection
63-
# wait for client to detect disconnection
64-
for _ in range(30):
65-
if not client.is_connected():
66-
break
67-
await asyncio.sleep(1)
60+
await asyncio.wait_for(disconnected_event.wait(), timeout=30)
6861
assert not client.is_connected()
6962

7063
tasks = [client.send_to_group(group_name, "hello", "text") for _ in range(10)]

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_recovery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class TestWebpubsubClientRecovery(WebpubsubClientTest):
1717
def test_recovery(self, webpubsubclient_endpoint):
1818
client = self.create_client(endpoint=webpubsubclient_endpoint, message_retry_total=10)
1919
name = "test_recovery"
20-
connected_event, message_event = self.setup_events(client)
20+
connected_event, _, message_event = self.setup_events(client)
2121
with client:
2222
assert connected_event.wait(timeout=30), "Timed out waiting for connection"
2323
conn_id0 = client._connection_id

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_recovery_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class TestWebpubsubClientRecoveryAsync(WebpubsubClientTestAsync):
1919
async def test_recovery_async(self, webpubsubclient_endpoint):
2020
client = await self.create_client(endpoint=webpubsubclient_endpoint, message_retry_total=10)
2121
name = "test_recovery_async"
22-
connected_event, message_event = await self.setup_events(client)
22+
connected_event, _, message_event = await self.setup_events(client)
2323
async with client:
2424
await asyncio.wait_for(connected_event.wait(), timeout=30)
2525
conn_id0 = client._connection_id

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_send_concurrently.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
# Licensed under the MIT License. See License.txt in the project root for
55
# license information.
66
# -------------------------------------------------------------------------
7-
import time
87
import pytest
98
from devtools_testutils import recorded_by_proxy
109
from testcase import WebpubsubClientTest, WebpubsubClientPowerShellPreparer, SafeThread
@@ -16,14 +15,11 @@ class TestWebpubsubClientSendConcurrently(WebpubsubClientTest):
1615
@recorded_by_proxy
1716
def test_send_concurrently(self, webpubsubclient_endpoint):
1817
client = self.create_client(endpoint=webpubsubclient_endpoint)
18+
connected_event, _, _ = self.setup_events(client)
1919
with client:
20+
assert connected_event.wait(timeout=30), "Timed out waiting for connection"
2021
group_name = "test_send_concurrently"
2122
client.join_group(group_name)
22-
# wait for connection to stabilize before concurrent sends
23-
for _ in range(30):
24-
if client.is_connected():
25-
break
26-
time.sleep(1)
2723

2824
def send(idx):
2925
client.send_to_group(group_name, f"hello_{idx}", "text")

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_send_concurrently_async.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@ class TestWebpubsubClientSendConcurrentlyAsync(WebpubsubClientTestAsync):
1717
@recorded_by_proxy_async
1818
async def test_send_concurrently_async(self, webpubsubclient_endpoint):
1919
client = await self.create_client(endpoint=webpubsubclient_endpoint)
20+
connected_event, _, _ = await self.setup_events(client)
2021
async with client:
22+
await asyncio.wait_for(connected_event.wait(), timeout=30)
2123
group_name = "test_send_concurrently_async"
2224
await client.join_group(group_name)
23-
for _ in range(30):
24-
if client.is_connected():
25-
break
26-
await asyncio.sleep(1)
2725
await asyncio.gather(*[client.send_to_group(group_name, f"hello_{idx}", "text") for idx in range(100)])

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_smoke.py

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,38 +29,44 @@ class TestWebpubsubClientSmoke(WebpubsubClientTest):
2929
def test_call_back_deadlock(self, webpubsubclient_endpoint):
3030
client = self.create_client(endpoint=webpubsubclient_endpoint)
3131
group_name = "test_call_back_deadlock"
32+
callback_completed = threading.Event()
33+
callback_count = 0
34+
callback_count_lock = threading.Lock()
3235

3336
def on_group_message(msg: OnGroupDataMessageArgs):
37+
nonlocal callback_count
3438
client.send_to_group(group_name, msg.data, "text", no_echo=True)
39+
with callback_count_lock:
40+
callback_count += 1
41+
if callback_count >= 3:
42+
callback_completed.set()
3543

3644
with client:
3745
client.join_group(group_name)
3846
client.subscribe("group-message", on_group_message)
3947
client.send_to_group(group_name, "hello test_call_back_deadlock1", "text")
4048
client.send_to_group(group_name, "hello test_call_back_deadlock2", "text")
4149
client.send_to_group(group_name, "hello test_call_back_deadlock3", "text")
42-
# sleep to make sure the callback has enough time to execute before close
43-
time.sleep(1)
50+
assert callback_completed.wait(timeout=30), "Timed out waiting for callbacks to finish"
4451

4552
@WebpubsubClientPowerShellPreparer()
4653
@recorded_by_proxy
4754
def test_context_manager(self, webpubsubclient_endpoint):
4855
client = self.create_client(endpoint=webpubsubclient_endpoint)
56+
_, _, message_event = self.setup_events(client)
4957
with client:
5058
group_name = "test_context_manager"
5159
client.join_group(group_name)
5260
client.send_to_group(group_name, "test_context_manager", "text")
53-
for _ in range(30):
54-
if client._sequence_id.sequence_id > 0:
55-
break
56-
time.sleep(1)
61+
assert message_event.wait(timeout=30), "Timed out waiting for context manager message"
5762
assert client._sequence_id.sequence_id > 0
5863

5964
# test on_stop
6065
@WebpubsubClientPowerShellPreparer()
6166
@recorded_by_proxy
6267
def test_on_stop(self, webpubsubclient_endpoint):
6368
client = self.create_client(endpoint=webpubsubclient_endpoint)
69+
connected_event, disconnected_event, _ = self.setup_events(client)
6470
reopen_error = None
6571
reopen_complete = threading.Event()
6672

@@ -77,10 +83,8 @@ def on_stop():
7783
else:
7884
raise RuntimeError("Failed to reopen client in stopped callback")
7985

80-
for _ in range(30):
81-
if client.is_connected():
82-
break
83-
time.sleep(1)
86+
if not connected_event.wait(timeout=30):
87+
raise RuntimeError("Timed out waiting for client to reconnect in stopped callback")
8488
assert client.is_connected()
8589
except Exception as e:
8690
reopen_error = e
@@ -90,11 +94,9 @@ def on_stop():
9094
with client:
9195
# open client again after close
9296
client.subscribe("stopped", on_stop)
93-
for _ in range(30):
94-
if client.is_connected():
95-
break
96-
time.sleep(1)
97+
assert connected_event.wait(timeout=30), "Timed out waiting for initial connection"
9798
assert client.is_connected()
99+
connected_event.clear()
98100
client.close()
99101
# wait for on_stop callback to finish reopening
100102
if not reopen_complete.wait(timeout=60):
@@ -103,12 +105,9 @@ def on_stop():
103105

104106
# remove stopped event and close again
105107
client.unsubscribe("stopped", on_stop)
108+
disconnected_event.clear()
106109
client.close()
107-
# wait for disconnect to finalize
108-
for _ in range(30):
109-
if not client.is_connected():
110-
break
111-
time.sleep(1)
110+
assert disconnected_event.wait(timeout=30), "Timed out waiting for client to disconnect"
112111
assert not client.is_connected()
113112

114113
@WebpubsubClientPowerShellPreparer()
@@ -148,7 +147,7 @@ def _test(enable_auto_rejoin, test_group_name, assert_func):
148147
auto_rejoin_groups=enable_auto_rejoin,
149148
message_retry_total=10,
150149
)
151-
connected_event, message_event = self.setup_events(client)
150+
connected_event, _, message_event = self.setup_events(client)
152151
with client:
153152
client.join_group(test_group_name)
154153

sdk/webpubsub/azure-messaging-webpubsubclient/tests/test_smoke_async.py

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,38 +28,42 @@ class TestWebpubsubClientSmokeAsync(WebpubsubClientTestAsync):
2828
async def test_call_back_deadlock_async(self, webpubsubclient_endpoint):
2929
client = await self.create_client(endpoint=webpubsubclient_endpoint)
3030
group_name = "test_call_back_deadlock_async"
31+
callback_completed = asyncio.Event()
32+
callback_count = 0
3133

3234
async def on_group_message(msg: OnGroupDataMessageArgs):
35+
nonlocal callback_count
3336
await client.send_to_group(group_name, msg.data, "text", no_echo=True)
37+
callback_count += 1
38+
if callback_count >= 3:
39+
callback_completed.set()
3440

3541
async with client:
3642
await client.join_group(group_name)
3743
await client.subscribe("group-message", on_group_message)
3844
await client.send_to_group(group_name, "hello test_call_back_deadlock1", "text")
3945
await client.send_to_group(group_name, "hello test_call_back_deadlock2", "text")
4046
await client.send_to_group(group_name, "hello test_call_back_deadlock3", "text")
41-
# sleep to make sure the callback has enough time to execute before close
42-
await asyncio.sleep(1)
47+
await asyncio.wait_for(callback_completed.wait(), timeout=30)
4348

4449
@WebpubsubClientPowerShellPreparer()
4550
@recorded_by_proxy_async
4651
async def test_context_manager_async(self, webpubsubclient_endpoint):
4752
client = await self.create_client(endpoint=webpubsubclient_endpoint)
53+
_, _, message_event = await self.setup_events(client)
4854
async with client:
4955
group_name = "test_context_manager_async"
5056
await client.join_group(group_name)
5157
await client.send_to_group(group_name, "test_context_manager", "text")
52-
for _ in range(30):
53-
if client._sequence_id.sequence_id > 0:
54-
break
55-
await asyncio.sleep(1)
58+
await asyncio.wait_for(message_event.wait(), timeout=30)
5659
assert client._sequence_id.sequence_id > 0
5760

5861
# test on_stop
5962
@WebpubsubClientPowerShellPreparer()
6063
@recorded_by_proxy_async
6164
async def test_on_stop_async(self, webpubsubclient_endpoint):
6265
client = await self.create_client(endpoint=webpubsubclient_endpoint)
66+
connected_event, disconnected_event, _ = await self.setup_events(client)
6367
reopen_error = None
6468
reopen_complete = asyncio.Event()
6569

@@ -76,10 +80,7 @@ async def on_stop():
7680
else:
7781
raise RuntimeError("Failed to reopen client in stopped callback")
7882

79-
for _ in range(30):
80-
if client.is_connected():
81-
break
82-
await asyncio.sleep(1)
83+
await asyncio.wait_for(connected_event.wait(), timeout=30)
8384
assert client.is_connected()
8485
except Exception as e:
8586
reopen_error = e
@@ -89,24 +90,19 @@ async def on_stop():
8990
async with client:
9091
# open client again after close
9192
await client.subscribe("stopped", on_stop)
92-
for _ in range(30):
93-
if client.is_connected():
94-
break
95-
await asyncio.sleep(1)
93+
await asyncio.wait_for(connected_event.wait(), timeout=30)
9694
assert client.is_connected()
95+
connected_event.clear()
9796
await client.close()
9897
# wait for on_stop callback to finish reopening
9998
await asyncio.wait_for(reopen_complete.wait(), timeout=60)
10099
assert reopen_error is None, f"on_stop callback failed: {reopen_error}"
101100

102101
# remove stopped event and close again
103102
await client.unsubscribe("stopped", on_stop)
103+
disconnected_event.clear()
104104
await client.close()
105-
# wait for disconnect to finalize
106-
for _ in range(30):
107-
if not client.is_connected():
108-
break
109-
await asyncio.sleep(1)
105+
await asyncio.wait_for(disconnected_event.wait(), timeout=30)
110106
assert not client.is_connected()
111107

112108
@WebpubsubClientPowerShellPreparer()
@@ -145,7 +141,7 @@ async def _test(enable_auto_rejoin, test_group_name, assert_func):
145141
endpoint=webpubsubclient_endpoint,
146142
auto_rejoin_groups=enable_auto_rejoin,
147143
)
148-
connected_event, message_event = await self.setup_events(client)
144+
connected_event, _, message_event = await self.setup_events(client)
149145
async with client:
150146
await client.join_group(test_group_name)
151147

0 commit comments

Comments
 (0)