Skip to content

Commit 998459f

Browse files
committed
added streaming file example from discord
1 parent 5933a8c commit 998459f

File tree

5 files changed

+136
-0
lines changed

5 files changed

+136
-0
lines changed

examples/bugs/__init__.py

Whitespace-only changes.

examples/bugs/streaming_file/__init__.py

Whitespace-only changes.
+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import asyncio
2+
import logging
3+
from _codecs import utf_8_decode
4+
from typing import List
5+
6+
from reactivex import operators
7+
8+
from rsocket.extensions.helpers import composite, route
9+
from rsocket.extensions.mimetypes import WellKnownMimeTypes
10+
from rsocket.helpers import single_transport_provider
11+
from rsocket.payload import Payload
12+
from rsocket.reactivex.reactivex_client import ReactiveXClient
13+
from rsocket.rsocket_client import RSocketClient
14+
from rsocket.transports.tcp import TransportTCP
15+
16+
17+
def got_data(payload):
18+
logging.info('From server: ' + payload.data.decode('utf-8'))
19+
return utf_8_decode(payload.data)
20+
21+
22+
class Client:
23+
24+
def __init__(self, rs: RSocketClient):
25+
self._rs = rs
26+
27+
async def stream_file(self) -> List[str]:
28+
request = Payload(metadata=composite(route('audio')))
29+
return await ReactiveXClient(self._rs).request_stream(request).pipe(
30+
operators.map(got_data),
31+
operators.to_list()
32+
)
33+
34+
35+
async def main():
36+
connection = await asyncio.open_connection('localhost', 8000)
37+
async with RSocketClient(single_transport_provider(TransportTCP(*connection)),
38+
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA,
39+
fragment_size_bytes=1_000_000) as client:
40+
c = Client(client)
41+
r = await c.stream_file()
42+
print(r)
43+
44+
if __name__ == '__main__':
45+
logging.basicConfig(level=logging.DEBUG)
46+
asyncio.run(main())

examples/bugs/streaming_file/my_file

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
gsdfgsdfg
2+
sdfgsdfgsdfggsdfgsdfg
3+
sdfgsdfgsdfggsdfgsdfg
4+
sdfgsdfgsdfggsdfgsdfg
5+
sdfgsdfgsdfggsdfgsdfg
6+
sdfgsdfgsdfggsdfgsdfg
7+
sdfgsdfgsdfggsdfgsdfg
8+
sdfgsdfgsdfggsdfgsdfg
9+
sdfgsdfgsdfggsdfgsdfg
10+
sdfgsdfgsdfggsdfgsdfg
11+
sdfgsdfgsdfggsdfgsdfg
12+
sdfgsdfgsdfggsdfgsdfg
13+
sdfgsdfgsdfggsdfgsdfg
14+
sdfgsdfgsdfggsdfgsdfg
15+
sdfgsdfgsdfggsdfgsdfg
16+
sdfgsdfgsdfggsdfgsdfg
17+
sdfgsdfgsdfggsdfgsdfg
18+
sdfgsdfgsdfggsdfgsdfg
19+
sdfgsdfgsdfggsdfgsdfg
20+
sdfgsdfgsdfggsdfgsdfg
21+
sdfgsdfgsdfggsdfgsdfg
22+
sdfgsdfgsdfggsdfgsdfg
23+
sdfgsdfgsdfggsdfgsdfg
24+
sdfgsdfgsdfggsdfgsdfg
25+
sdfgsdfgsdfggsdfgsdfg
26+
sdfgsdfgsdfgvvv
+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import asyncio
2+
import base64
3+
import logging
4+
from typing import Callable
5+
6+
import aiofiles
7+
from reactivex import Subject, Observable, operators
8+
9+
from rsocket.payload import Payload
10+
from rsocket.reactivex.back_pressure_publisher import from_observable_with_backpressure, observable_from_queue
11+
from rsocket.reactivex.reactivex_handler_adapter import reactivex_handler_factory
12+
from rsocket.routing.request_router import RequestRouter
13+
from rsocket.routing.routing_request_handler import RoutingRequestHandler
14+
from rsocket.rsocket_server import RSocketServer
15+
from rsocket.transports.tcp import TransportTCP
16+
17+
18+
async def read_file(o: asyncio.Queue):
19+
async with aiofiles.open("my_file", 'rb') as f:
20+
while True:
21+
buffer = await f.read(1024)
22+
logging.info("Reading buffer")
23+
if buffer == b'':
24+
o.put_nowait(None)
25+
break
26+
await o.put(base64.b64encode(buffer))
27+
28+
29+
def handler_factory() -> RoutingRequestHandler:
30+
router = RequestRouter()
31+
32+
@router.stream('audio')
33+
async def audio() -> Callable[[Subject], Observable]:
34+
q = asyncio.Queue()
35+
# await q.put(b'sf') # testing whether a simpler queuing scheme fixes the issue
36+
asyncio.create_task(read_file(q))
37+
return from_observable_with_backpressure(
38+
lambda backpressure: observable_from_queue(
39+
q, backpressure=backpressure).pipe(
40+
operators.map(lambda buffer: Payload(buffer))
41+
)
42+
)
43+
44+
return RoutingRequestHandler(router)
45+
46+
47+
async def run_server():
48+
def session(*connection):
49+
RSocketServer(TransportTCP(*connection),
50+
handler_factory=reactivex_handler_factory(handler_factory),
51+
fragment_size_bytes=1_000_000)
52+
53+
async with await asyncio.start_server(session, 'localhost', 8000) as server:
54+
await server.serve_forever()
55+
56+
57+
def main():
58+
logging.basicConfig(level=logging.DEBUG)
59+
logging.info("Starting server...")
60+
asyncio.run(run_server())
61+
62+
63+
if __name__ == '__main__':
64+
main()

0 commit comments

Comments
 (0)