My simple implementation of asynchronous in Python
- Create venv
python -m venv .venv - Activate venv
. .venv/bin/activate - Install
myasync
pip install myasync@git+https://github.com/KrySeyt/myasync.git- Install
myasyncwith example
pip install myasync@git+https://github.com/KrySeyt/myasync.git- Run example server
python -m example_server &- Run example client
python -m example_client Await- socket with expected I/O type. Yield it fromCoroutine- Await
CoroutineandTaskwithyield from - If
Coroutineare not ready and it should be called later justyield Nonefrom coro.sleepandLockworks this way - If you wanna have async interface, but in current implementation nothing to
yieldoryield from
(no sockets, no Coroutines and no Tasks), justyield Noneoryield from myasync.sleep(0). Example:
import myasync
def io_operation(data: str) -> myasync.Coroutine[None]:
# Mock implementation
yield None
print(f"Data: {data}")import time
import socket
from myasync import Coroutine, Await, IOType, gather, run
def send_request() -> Coroutine[str]:
"""
Example function that requires await socket
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 8000))
sock.send(b"GET / HTTP/1.1\r\nHost:localhost\r\n\r\n")
yield Await(sock, IOType.INPUT)
sock.recv(100)
sock.close()
print("Task done")
return "Done"
def main() -> Coroutine[None]:
start = time.time()
yield from gather(
send_request(),
send_request(),
send_request(),
send_request(),
)
end = time.time()
print(end - start)
if __name__ == "__main__":
run(main())