Skip to content

Commit acfab40

Browse files
author
zhixiangli
committed
feat: add timeout to queue.put in _StreamMultiplexer to prevent hang
1 parent 774d691 commit acfab40

File tree

1 file changed

+30
-6
lines changed

1 file changed

+30
-6
lines changed

packages/google-cloud-storage/google/cloud/storage/asyncio/_stream_multiplexer.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
logger = logging.getLogger(__name__)
2727

2828
_DEFAULT_QUEUE_MAX_SIZE = 100
29+
_DEFAULT_PUT_TIMEOUT = 20.0
2930

3031

3132
class _StreamError:
@@ -84,6 +85,17 @@ def unregister(self, read_ids: Set[int]) -> None:
8485
def _get_unique_queues(self) -> Set[asyncio.Queue]:
8586
return set(self._queues.values())
8687

88+
async def _put_with_timeout(self, queue: asyncio.Queue, item) -> None:
89+
try:
90+
await asyncio.wait_for(queue.put(item), timeout=_DEFAULT_PUT_TIMEOUT)
91+
except asyncio.TimeoutError:
92+
if queue not in self._get_unique_queues():
93+
logger.debug("Dropped item for unregistered queue.")
94+
else:
95+
logger.warning(
96+
"Queue full for too long. Dropping item to prevent multiplexer hang."
97+
)
98+
8799
def _ensure_recv_loop(self) -> None:
88100
if self._recv_task is None or self._recv_task.done():
89101
self._recv_task = asyncio.create_task(self._recv_loop())
@@ -109,8 +121,12 @@ async def _recv_loop(self) -> None:
109121
response = await self._stream.recv()
110122
if response is None:
111123
sentinel = _StreamEnd()
112-
for queue in self._get_unique_queues():
113-
await queue.put(sentinel)
124+
await asyncio.gather(
125+
*(
126+
self._put_with_timeout(queue, sentinel)
127+
for queue in self._get_unique_queues()
128+
)
129+
)
114130
return
115131

116132
if response.object_data_ranges:
@@ -120,11 +136,19 @@ async def _recv_loop(self) -> None:
120136
queue = self._queues.get(read_id)
121137
if queue:
122138
queues_to_notify.add(queue)
123-
for queue in queues_to_notify:
124-
await queue.put(response)
139+
await asyncio.gather(
140+
*(
141+
self._put_with_timeout(queue, response)
142+
for queue in queues_to_notify
143+
)
144+
)
125145
else:
126-
for queue in self._get_unique_queues():
127-
await queue.put(response)
146+
await asyncio.gather(
147+
*(
148+
self._put_with_timeout(queue, response)
149+
for queue in self._get_unique_queues()
150+
)
151+
)
128152
except asyncio.CancelledError:
129153
raise
130154
except Exception as e:

0 commit comments

Comments
 (0)