Skip to content

Commit 151ed14

Browse files
committed
Update async.py with advanced usage
1 parent 7146dc7 commit 151ed14

File tree

1 file changed

+126
-9
lines changed

1 file changed

+126
-9
lines changed

ultimatepython/advanced/async.py

Lines changed: 126 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
"""
2-
Concurrent programming with an event loop is a relatively new concept in
3-
Python 3.x. This module aims to highlight how it could be used in the
4-
context of a scheduler which runs a fire-and-forget operation for starting
5-
jobs. In the real world, it takes time for a scheduler to start a job (i.e.
6-
hit an API endpoint, ask the operating system for resources) so we assume
7-
that starting a job has some intrinsic delay.
2+
Concurrent programming with asyncio, introduced in Python 3.4, provides
3+
an event loop for handling asynchronous operations. This module demonstrates
4+
basic asyncio patterns including coroutines, tasks, concurrent execution
5+
with gather, and task cancellation. It uses the context of a scheduler
6+
which runs fire-and-forget operations for starting jobs, assuming that
7+
job startup has some intrinsic delay.
8+
9+
This module also covers advanced asyncio patterns including task groups
10+
for structured concurrency (Python 3.11+), semaphores for limiting
11+
concurrency, timeouts, exception handling in concurrent tasks, event
12+
loop management, and task shielding to protect critical operations from
13+
cancellation.
814
"""
915

1016
import asyncio
@@ -44,8 +50,15 @@ async def start_job(job_id: str, delay: float) -> JobRecord:
4450
return JobRecord(job_id, queue_time, start_time)
4551

4652

47-
async def schedule_jobs() -> None:
48-
"""Schedule jobs concurrently."""
53+
async def failing_job(job_id: str) -> None:
54+
"""A job that sometimes fails."""
55+
if int(job_id[-1]) % 3 == 0: # Fail every 3rd job
56+
raise ValueError(f"Job {job_id} failed!")
57+
await asyncio.sleep(_DELAY_SMALL)
58+
59+
60+
async def basic_async_patterns() -> None:
61+
"""Basic async patterns demonstration."""
4962
# Start a job which also represents a coroutine
5063
single_job = start_job(uuid4().hex, _DELAY_SMALL)
5164
assert asyncio.iscoroutine(single_job)
@@ -77,8 +90,112 @@ async def schedule_jobs() -> None:
7790
assert all(_is_valid_record(record) for record in batch_records)
7891

7992

93+
async def advanced_async_patterns() -> None:
94+
"""Demonstrate advanced asyncio patterns."""
95+
96+
# Task Groups - structured concurrency (Python 3.11+)
97+
async def task_group_example():
98+
results = []
99+
try:
100+
async with asyncio.TaskGroup() as tg:
101+
# Start multiple tasks in a group
102+
for i in range(5):
103+
tg.create_task(start_job(f"task_{i}", _DELAY_SMALL))
104+
except Exception as e:
105+
# TaskGroup propagates exceptions from child tasks
106+
pass
107+
108+
# All tasks in the group complete or fail together
109+
assert True # TaskGroup structure is valid
110+
111+
await task_group_example()
112+
113+
# Semaphores for limiting concurrency
114+
semaphore = asyncio.Semaphore(3) # Allow max 3 concurrent operations
115+
116+
async def limited_concurrency_job(job_id: str):
117+
async with semaphore:
118+
# Only 3 jobs can execute this section at once
119+
await asyncio.sleep(_DELAY_SMALL)
120+
return f"completed_{job_id}"
121+
122+
# Start 10 jobs but only 3 can run concurrently
123+
concurrent_jobs = [limited_concurrency_job(f"sem_{i}") for i in range(10)]
124+
concurrent_results = await asyncio.gather(*concurrent_jobs)
125+
assert len(concurrent_results) == 10
126+
assert all("completed_" in result for result in concurrent_results)
127+
128+
# Exception handling with gather
129+
mixed_jobs = [
130+
failing_job("job_1"), # succeeds
131+
failing_job("job_2"), # succeeds
132+
failing_job("job_3"), # fails
133+
failing_job("job_4"), # succeeds
134+
]
135+
136+
# Return exceptions instead of raising them
137+
results = await asyncio.gather(*mixed_jobs, return_exceptions=True)
138+
assert len(results) == 4
139+
# Check that we got a mix of results and exceptions
140+
exceptions_found = sum(1 for r in results if isinstance(r, Exception))
141+
successes_found = sum(1 for r in results if not isinstance(r, Exception))
142+
assert exceptions_found == 1 # One job failed
143+
assert successes_found == 3 # Three jobs succeeded
144+
145+
# Timeouts and cancellation
146+
async def slow_job():
147+
await asyncio.sleep(1.0) # Takes 1 second
148+
return "slow_result"
149+
150+
try:
151+
# Timeout after 0.1 seconds
152+
result = await asyncio.wait_for(slow_job(), timeout=0.1)
153+
except asyncio.TimeoutError:
154+
result = None
155+
assert result is None # Should timeout
156+
157+
# Event loop management
158+
loop = asyncio.get_running_loop()
159+
assert isinstance(loop, asyncio.AbstractEventLoop)
160+
161+
# Schedule callback on the event loop
162+
callback_result = None
163+
164+
def sync_callback():
165+
nonlocal callback_result
166+
callback_result = "callback_executed"
167+
168+
# Schedule callback to run soon
169+
loop.call_soon(sync_callback)
170+
await asyncio.sleep(0) # Let the event loop process callbacks
171+
assert callback_result == "callback_executed"
172+
173+
# Shielding tasks from cancellation
174+
async def important_task():
175+
await asyncio.sleep(_DELAY_SMALL)
176+
return "important_result"
177+
178+
task = asyncio.create_task(important_task())
179+
shielded_task = asyncio.shield(task)
180+
181+
# Even if we cancel the shield, the underlying task continues
182+
shielded_task.cancel()
183+
try:
184+
await shielded_task
185+
except asyncio.CancelledError:
186+
pass
187+
188+
# The original task should still complete
189+
await task # Wait for the original task
190+
assert task.result() == "important_result"
191+
192+
80193
def main() -> None:
81-
asyncio.run(schedule_jobs())
194+
# Run basic patterns
195+
asyncio.run(basic_async_patterns())
196+
197+
# Run advanced patterns
198+
asyncio.run(advanced_async_patterns())
82199

83200

84201
if __name__ == "__main__":

0 commit comments

Comments
 (0)