Skip to content

Commit c76c8c9

Browse files
committed
fixes #2
1 parent 17683ef commit c76c8c9

6 files changed

Lines changed: 306 additions & 173 deletions

File tree

README.md

Lines changed: 123 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -1,223 +1,181 @@
11
# microio
22

3-
`microio` is a tiny asyncio-first runtime helper library for services that own
4-
event loops, sockets, background threads, and request/reply waiters.
5-
6-
It is inspired by AnyIO's practical concurrency ideas, especially the problems
7-
called out in [Why you should be using AnyIO APIs instead of asyncio APIs][anyio-why]:
8-
9-
- **task readiness**: a child service should be able to report "ready" or "failed"
10-
before its parent continues;
11-
- **cancel scopes**: stopping is a durable state with a reason, not a one-shot flag
12-
that individual operations may miss;
13-
- **memory object streams**: producers and consumers should be split into explicit
14-
sender/receiver endpoints with clear close semantics;
15-
- **thread bridges**: code outside an event-loop thread needs a safe way to submit
16-
work into that loop and observe failures.
17-
18-
`microio` is not a compatibility layer over asyncio, Trio, and Curio. It is also
19-
not a reimplementation of AnyIO. It intentionally stays smaller:
20-
21-
- asyncio only;
22-
- stdlib only;
23-
- no generic networking/file APIs;
24-
- cooperative level cancellation only where code uses `microio` scopes and checkpoints;
25-
- no pytest plugin or framework-level dependency injection.
26-
27-
The goal is to make the common "small service runtime" patterns reliable and
28-
testable without pulling a full concurrency abstraction into projects that already
29-
use asyncio directly.
30-
31-
## What It Provides
32-
33-
### `TaskGroup` / `CancelScope`
34-
35-
`create_task_group()` wraps `asyncio.TaskGroup`. It keeps the stdlib failure
36-
rules, and adds the missing cancellation/readiness pieces:
37-
38-
- `tg.start_soon(fn, *args)` starts a child task;
39-
- `await tg.start(fn, *args)` starts a child and waits until it calls
40-
`task_status.started(value)`;
41-
- `tg.cancel_scope.cancel()` or `tg.cancel()` cancels owned tasks and treats
42-
that as normal shutdown;
43-
- `checkpoint()`, `checkpoint_if_cancelled()`, and `sleep()` provide cooperative
44-
level cancellation for code that uses `microio` primitives;
45-
- `move_on_after(seconds)` suppresses deadline cancellation;
46-
- `fail_after(seconds)` turns deadline cancellation into `TimeoutError`.
47-
48-
The group-cancel path borrows the small `asyncio_cancel_scope` trick: when a
49-
child task or another thread asks a group to stop, `microio` injects a private
50-
task exception into the underlying `asyncio.TaskGroup` and suppresses just that
51-
private exception on exit.
52-
53-
This is still asyncio cancellation. Raw `await something()` follows asyncio's
54-
edge-cancellation rules. Once code returns to a `microio` checkpoint, cancelled
55-
scopes keep raising `CancelledError`, even if earlier cancellation was caught.
56-
57-
Shielding is not exposed. A partial shield around raw `Task.cancel()` would look
58-
stronger than it is.
3+
Small, dependency-free tools for programs that mix **threads and asyncio** — where data, cancellation, and failure have to cross the thread/event-loop boundary without races, hangs, or silent loss.
594

60-
```python
61-
from microio import create_task_group, sleep
5+
## The problem
626

7+
Real programs are rarely purely async. A typical shape: one thread owns an event loop doing the interesting work, while other threads — a socket reader, a control channel, the main thread, sometimes a *signal handler* — need to:
638

64-
async def worker():
65-
while True: await sleep(1)
9+
- **feed it work** (without touching the loop from the wrong thread),
10+
- **cancel work it's doing** (without killing the loop, and without the cancellation leaking somewhere unrelated),
11+
- **block waiting for an answer** from it (and get woken with an error, not hang forever, if it dies),
12+
- **know it started up properly**, and **know it actually shut down**.
6613

14+
The stdlib gives you the raw ingredients — `call_soon_threadsafe`, `run_coroutine_threadsafe`, `Thread`, `Queue` — and leaves all of the above as an exercise. That exercise is where deadlocks, dropped messages, zombie threads, and "it stopped responding but the process looks fine" bugs live.
6715

68-
async with create_task_group() as tg:
69-
tg.start_soon(worker)
70-
await sleep(0.1)
71-
tg.cancel()
72-
```
16+
[Trio](https://trio.readthedocs.io/) and [AnyIO](https://anyio.readthedocs.io/) solve concurrency beautifully, but *inside* one async world: they assume the code in control is itself async. When the thing doing the cancelling is another thread — or a SIGINT handler that must not take any locks — you're back on your own.
7317

74-
### `CloseScope`
18+
microio is that missing layer: ~800 lines, stdlib only, asyncio only, Python 3.11+.
7519

76-
`CloseScope` is a small, thread-safe stop/failure state object. It records whether
77-
a service is closing, why it is closing, and whether there is an exception that
78-
should be propagated to waiters.
20+
## What's in the box
7921

80-
This is separate from `CancelScope`. `CloseScope` is for thread-safe service
81-
lifecycle state. It does not cancel asyncio tasks for you.
22+
**Move data across the boundary**
8223

83-
### `ServiceThread` / `ServiceGroup`
24+
- `create_channel()` — a sender usable from any thread (even before the loop exists), an async receiver with `async for`, and explicit close/fail semantics that *wake* the receiver rather than strand it.
25+
- `Mailbox` / `ActorCore` — the channel wrapped into the common actor shape: thread-safe `submit()`, one-at-a-time async handling.
8426

85-
`ServiceThread` is a supervised `threading.Thread`:
27+
**Move control across the boundary**
8628

87-
- child code calls `started()` after resources are ready;
88-
- parents call `wait_started()` and get either readiness or the startup exception;
89-
- `stop()` marks the thread's `CloseScope`;
90-
- `join_or_log()` checks timeout results instead of ignoring them.
29+
- `CancelScope` — trio-style cancellation scopes for asyncio, cancellable **from any thread**. A scope that cancels its own region cleanly catches the cancellation at its exit; an issued-but-undelivered cancellation is retracted, never leaked into unrelated code.
30+
- `ScopeGroup` — a live registry of cancellable regions: enter with `scope()`, cancel them *all* from anywhere with `cancel()`. The `latch` option also cancels regions entered just after the cancel — closing the classic check-then-act race. Lock-free reads, so it's safe to call from a signal handler.
31+
- `CloseScope`thread-safe "we are stopping, here's why" state, closable exactly once.
32+
- `WorkTracker` — a WaitGroup: in-flight work counter with a `busy` Event any thread can check or wait on.
9133

92-
Use it for socket threads, protocol readers, and other owned background services.
34+
**Wait across the boundary**
9335

94-
`ServiceGroup` owns the repeated lifecycle boilerplate for a small set of service
95-
threads:
36+
- `RequestRegistry` — request/reply bookkeeping between threads: register, block with timeout, resolve from the reader thread, and — the part hand-rolled versions always miss — `fail_all()` so that when the connection dies, every blocked waiter gets the exception instead of hanging forever.
9637

97-
```python
98-
services = ServiceGroup(iopub, stdin, heartbeat).start().wait_started()
99-
...
100-
services.stop_join(timeout=1)
101-
```
38+
**Own your threads properly**
10239

103-
### `LoopServiceThread`
40+
- `ServiceThread` — a supervised thread: it reports `started()` or its parent's `wait_started()` raises the real startup exception; `stop()` is durable state, not a flag a loop might miss; `join_or_log()` never silently ignores a join timeout.
41+
- `LoopServiceThread` — a `ServiceThread` that owns an `asyncio.Runner`: `submit(coro)` and `call_sync(fn)` from any thread, structured shutdown of its child tasks.
42+
- `ServiceGroup` — start/wait/stop/join a set of services without boilerplate.
10443

105-
`LoopServiceThread` owns an `asyncio.Runner` inside a thread and exposes:
44+
**Structured async (the in-loop part)**
10645

107-
- `call_soon()` for thread-safe callbacks;
108-
- `call_sync()` for thread-safe callbacks with a return value;
109-
- `submit()` for coroutine submission from other threads;
110-
- `task_group` for async work owned by the service;
111-
- the same ready/failed/stop/join behavior as `ServiceThread`.
46+
- `TaskGroup` (wrapping `asyncio.TaskGroup`) with `start_soon`, `await tg.start(...)`/`task_status.started()` readiness, and group cancellation that works from other threads; `move_on_after`, `fail_after`, `checkpoint`, `sleep`.
11247

113-
This is the small subset of AnyIO's thread-bridge idea that asyncio services often
114-
need: create one loop in one thread, keep ownership clear, submit coroutine work
115-
safely, and synchronously run small functions on the loop thread when needed.
116-
`stop()` cancels the service task group, so owned child tasks shut down with the
117-
service.
48+
## Examples
11849

119-
### `ObjectChannel`
50+
### A thread feeding an event loop
12051

121-
`create_channel()` returns `(send, receive)` endpoints. A sender can be used from
122-
other threads before or after the receiver has bound to an event loop. The receiver
123-
is async and supports `async for`.
52+
```python
53+
import asyncio, threading
54+
from microio import create_channel
12455

125-
This is inspired by AnyIO memory object streams, but adjusted for service threads:
56+
send, recv = create_channel()
12657

127-
- the default buffer is unbounded because cross-thread producers often cannot
128-
await backpressure;
129-
- close is explicit and wakes async receivers;
130-
- receivers raise `EndOfStream` on direct receive after close;
131-
- `fail(exc)` is explicit and wakes async receivers with the exception;
132-
- late sends raise `ClosedResourceError` unless `late_send="drop"` is selected;
133-
- the implementation is intentionally single-receiver and simple.
58+
def producer(): # any thread, no loop required
59+
for i in range(5): send.send_nowait(i)
60+
send.close() # wakes the receiver; the async-for ends
13461

135-
### `Mailbox` / `ActorCore`
62+
async def main():
63+
threading.Thread(target=producer).start()
64+
async for item in recv: print(item)
13665

137-
`Mailbox` wraps an `ObjectChannel` for the common actor shape: thread-safe
138-
`submit()`, async receive, `close()`, `fail()`, and `drain_nowait()`.
66+
asyncio.run(main())
67+
```
13968

140-
`ActorCore` is the tiny serialized consumer loop:
69+
### A background thread that owns a loop — with checked startup and shutdown
14170

14271
```python
143-
actor = ActorCore(handle)
144-
actor.submit(item)
145-
await actor.run()
72+
from microio import LoopServiceThread, sleep
73+
74+
class Service(LoopServiceThread):
75+
async def run_async(self):
76+
self.db = await connect() # resources live on the loop thread
77+
self.started() # parent's wait_started() returns now
78+
while not self.scope.closed: await sleep(0.1)
79+
80+
svc = Service(name="db-service")
81+
svc.start()
82+
svc.wait_started(timeout=5) # raises the real traceback if connect() failed
83+
fut = svc.submit(svc.db.query("...")) # run a coroutine on the service loop, from any thread
84+
rows = fut.result(timeout=5)
85+
svc.stop()
86+
svc.join_or_log(timeout=2) # a join timeout is logged, never swallowed
14687
```
14788

148-
It is deliberately not tied to a thread. A service thread, a main-thread runner,
149-
or a test can all run the same actor core.
89+
Half of debugging multithreaded programs is finding the thread that died quietly at startup, or never exited at shutdown. `ServiceThread` makes both loud.
15090

151-
### `RequestRegistry`
91+
### Cancelling async work from another thread (or a signal handler)
15292

153-
`RequestRegistry` tracks request IDs and waiters:
93+
```python
94+
from microio import ScopeGroup, sleep
95+
96+
scopes = ScopeGroup()
15497

155-
- register a request;
156-
- resolve it from another thread through a `ReplyHandle`;
157-
- wait with timeout;
158-
- wrap the common register-send-wait pattern with `request(key, send)`;
159-
- fail one or all pending requests on service crash/close.
98+
async def job():
99+
with scopes.scope() as scope: # registers a cancellable region
100+
await do_work()
101+
if scope.cancelled_caught: print("interrupted; cleaning up")
160102

161-
This is useful for debug adapters, stdin routers, RPC clients, and any protocol
162-
where a reader thread must wake request waiters reliably.
103+
# meanwhile, from ANY other thread — or a SIGINT handler (no locks taken):
104+
scopes.cancel("user interrupt", latch=True) # latch also catches a job that is *just* starting
105+
```
163106

164-
## Example
107+
The cancellation lands inside the `with` block and is caught at its exit — the task survives, follow-up code (sending an error reply, releasing resources) still runs, and nothing leaks to other tasks.
108+
109+
### Serialized message handling, with an escape hatch
165110

166111
```python
167-
import asyncio
168-
from microio import LoopServiceThread, create_channel
112+
from microio import ActorCore
113+
114+
async def handle(msg): await process(msg) # one at a time, in arrival order
169115

116+
actor = ActorCore(handle)
117+
actor.submit(msg) # thread-safe, from anywhere
118+
await actor.run() # in the loop that owns the actor
119+
```
170120

171-
class Worker(LoopServiceThread):
172-
def __init__(self):
173-
super().__init__(name="worker")
174-
self.send, self.receive = create_channel()
121+
When a handler needs to let the queue keep moving while it waits on something slow, `concurrent=True` hands each handler a release baton:
175122

176-
async def run_async(self):
177-
self.receive.bind(asyncio.get_running_loop())
178-
self.started()
179-
async for item in self.receive:
180-
if item == "stop":
181-
self.stop()
182-
break
183-
print(item)
184-
185-
186-
worker = Worker()
187-
worker.start()
188-
worker.wait_started()
189-
worker.send.send_nowait("hello")
190-
worker.send.send_nowait("stop")
191-
worker.join_or_log(timeout=1)
123+
```python
124+
async def handle(msg, release):
125+
prepare(msg) # this part stays strictly ordered
126+
release() # from here on, the next message may start...
127+
await slow_io(msg) # ...it actually runs whenever this one suspends
128+
129+
actor = ActorCore(handle, concurrent=True)
192130
```
193131

194-
## Design Rules
132+
Handlers that never call `release()` behave exactly like the serialized actor — ordering is opt-out per message, not a global mode.
195133

196-
- Prefer explicit state over hidden magic.
197-
- Make startup failure visible to the parent.
198-
- Never ignore a join timeout.
199-
- Waking pending waiters on close/crash is part of the service contract.
200-
- Keep asyncio ownership clear: a socket or loop belongs to one service thread.
134+
### Request/reply that can't hang
201135

202-
## Development
136+
```python
137+
from microio import RequestRegistry
203138

204-
`microio` requires Python 3.11+.
139+
reg = RequestRegistry()
205140

206-
```bash
207-
pip install -e .[dev]
208-
pytest -q
141+
# requesting thread: register, send, block for the answer
142+
reply = reg.request(msg_id, send=lambda h: sock.send(payload), timeout=10)
143+
144+
# reader thread, when the response arrives:
145+
reg.resolve(msg_id, response)
146+
147+
# reader thread, when the connection dies:
148+
reg.fail_all(ConnectionError("reader died")) # every blocked requester raises instead of hanging
209149
```
210150

211-
## Examples
151+
### Everything together
212152

213-
Run the counter service example:
153+
[`examples/counter_server.py`](examples/counter_server.py) is a complete ~90-line in-process server combining `LoopServiceThread`, channels, `RequestRegistry`, and `CloseScope`:
214154

215155
```bash
216156
python examples/counter_server.py
217157
```
218158

219-
It shows `LoopServiceThread`, `ObjectChannel`, `RequestRegistry`, and
220-
`CloseScope` working together in one small service.
159+
## Design rules
160+
161+
- **Failures are loud.** Startup errors reach the parent with their traceback; join timeouts are logged; dead readers wake their waiters with the real exception.
162+
- **Closing is a durable state with a reason**, not a one-shot flag an operation might miss. Everything closes exactly once.
163+
- **Control may come from anywhere.** Cancellation, close, and stop are safe from other threads, and the read paths are lock-free so they're safe from signal handlers.
164+
- **Ownership is explicit.** A loop, socket, or receiver belongs to one thread; everyone else talks to it through these primitives.
165+
166+
## What microio is not
167+
168+
- Not an AnyIO replacement: asyncio only, no networking or file APIs, no shielding, single-receiver channels. If your whole program is async, use AnyIO — it's excellent, and microio's scope/readiness design borrows directly from [its ideas][anyio-why].
169+
- Cancellation is still asyncio cancellation: raw `await`s follow asyncio's edge-triggered rules; microio's `checkpoint()`/`sleep()` add level-triggered behavior where you opt in.
170+
171+
microio was extracted from a Jupyter kernel, where all of these problems show up at once: a protocol thread feeding an execution loop, Ctrl-C arriving as a signal that must cancel a coroutine on another thread, and clients that disconnect while something is blocked waiting on them. The primitives are general; that's just the crucible they were forged in.
172+
173+
## Development
174+
175+
```bash
176+
pip install -e .[dev]
177+
pytest -q
178+
```
221179

222180
Version lives in `microio/__init__.py` as `__version__`.
223181

microio/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from ._actor import ActorCore, Mailbox
55
from ._channel import ChannelStats, ObjectReceiveChannel, ObjectSendChannel, create_channel
66
from ._registry import ReplyHandle, RequestRegistry
7-
from ._scope import BrokenResourceError, ClosedResourceError, CloseScope, EndOfStream
8-
from ._task import (CancelScope, TaskGroup, TaskStatus, checkpoint, checkpoint_if_cancelled, create_task_group, current_cancel_scope, fail_after, move_on_after,
9-
sleep)
7+
from ._scope import BrokenResourceError, ClosedResourceError, CloseScope, EndOfStream, WorkTracker
8+
from ._task import (CancelScope, ScopeGroup, TaskGroup, TaskStatus, checkpoint, checkpoint_if_cancelled, create_task_group, current_cancel_scope, fail_after,
9+
move_on_after, sleep)
1010
from ._thread import LoopServiceThread, ServiceGroup, ServiceThread
1111

0 commit comments

Comments
 (0)