Skip to content

Commit a450820

Browse files
authored
Merge pull request #2943 from finos/fix-uvicorn-bug
Fix segfault in Python webservers
2 parents d062958 + 83cec30 commit a450820

File tree

5 files changed

+209
-25
lines changed

5 files changed

+209
-25
lines changed

rust/perspective-python/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ pyo3 = { version = "0.23.4", features = [
7171
"experimental-async",
7272
"extension-module",
7373
"serde",
74-
"py-clone",
7574
] }
7675
pythonize = "0.23.0"
7776
tracing = { version = ">=0.1.36" }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2+
# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3+
# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4+
# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5+
# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6+
# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7+
# ┃ Copyright (c) 2017, the Perspective Authors. ┃
8+
# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9+
# ┃ This file is part of the Perspective library, distributed under the terms ┃
10+
# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11+
# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12+
13+
import asyncio
14+
import threading
15+
import websocket
16+
import os
17+
import os.path
18+
19+
import tornado.websocket
20+
import tornado.web
21+
import tornado.ioloop
22+
23+
import perspective
24+
import perspective.handlers.tornado
25+
26+
PORT = 8082
27+
28+
29+
def test_big_multi_thing():
30+
here = os.path.abspath(os.path.dirname(__file__))
31+
file_path = os.path.join(
32+
here,
33+
"..",
34+
"..",
35+
"..",
36+
"..",
37+
"..",
38+
"node_modules",
39+
"superstore-arrow",
40+
"superstore.lz4.arrow",
41+
)
42+
43+
async def init_table(client):
44+
global SERVER_DATA
45+
global SERVER_TABLE
46+
with open(file_path, mode="rb") as file:
47+
SERVER_DATA = file.read()
48+
SERVER_TABLE = client.table(SERVER_DATA, name="superstore")
49+
50+
global ws
51+
ws = websocket.WebSocketApp(
52+
"ws://localhost:{}/websocket".format(PORT),
53+
on_open=on_open,
54+
on_message=on_message,
55+
# on_error=on_error,
56+
# on_close=on_close,
57+
)
58+
59+
global ws_thread
60+
ws_thread = threading.Thread(target=ws.run_forever)
61+
ws_thread.start()
62+
63+
def server_thread():
64+
def make_app(perspective_server):
65+
return tornado.web.Application(
66+
[
67+
(
68+
r"/websocket",
69+
perspective.handlers.tornado.PerspectiveTornadoHandler,
70+
{"perspective_server": perspective_server},
71+
),
72+
]
73+
)
74+
75+
perspective_server = perspective.Server()
76+
app = make_app(perspective_server)
77+
global server
78+
server = app.listen(PORT)
79+
80+
global server_loop
81+
server_loop = tornado.ioloop.IOLoop.current()
82+
client = perspective_server.new_local_client(
83+
loop_callback=server_loop.add_callback
84+
)
85+
server_loop.call_later(0, init_table, client)
86+
server_loop.start()
87+
88+
server_thread = threading.Thread(target=server_thread)
89+
server_thread.start()
90+
91+
client_loop = asyncio.new_event_loop()
92+
client_loop.set_debug(True)
93+
client_thread = threading.Thread(target=client_loop.run_forever)
94+
client_thread.start()
95+
96+
async def send_request(msg):
97+
global ws
98+
ws.send(msg, websocket.ABNF.OPCODE_BINARY)
99+
100+
def on_message(ws, message):
101+
async def poke_client():
102+
await client.handle_response(message)
103+
104+
asyncio.run_coroutine_threadsafe(poke_client(), client_loop)
105+
106+
# def on_error(ws, error):
107+
# print(f"Error!: {error}")
108+
109+
# def on_close(ws, close_status_code, close_msg):
110+
# print("Connection closed")
111+
112+
def on_open(ws):
113+
global client
114+
client = perspective.AsyncClient(send_request)
115+
asyncio.run_coroutine_threadsafe(test(client), client_loop)
116+
117+
global count
118+
count = 0
119+
120+
def update(x):
121+
global count
122+
count += 1
123+
124+
async def test(client):
125+
table = await client.open_table("superstore")
126+
view = await table.view()
127+
await view.on_update(update)
128+
SERVER_TABLE.update(SERVER_DATA)
129+
assert await table.size() == 19988
130+
assert count == 1
131+
await server.close_all_connections()
132+
client_loop.stop()
133+
134+
client_thread.join()
135+
client_loop.close()
136+
ws.close()
137+
ws_thread.join()
138+
server_loop.add_callback(server_loop.stop)
139+
server_thread.join()
140+
server_loop.close()

rust/perspective-python/perspective/tests/multi_threaded/test_multi_threaded.py

+53-8
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,69 @@
1818

1919

2020
class TestServer(object):
21-
def test_concurrent_updates(self):
21+
def test_sync_updates_with_loop_callback_are_sync(self):
2222
def feed(table):
2323
y = 1000
2424
while y > 0:
2525
y -= 1
26-
table.update([{"a": random.randint(0, 10), "index": 1}])
26+
table.update([{"a": random.randint(0, 10), "index": y}])
2727

2828
perspective_server = Server()
29-
loop = asyncio.get_event_loop()
30-
client = perspective_server.new_local_client(
31-
loop_callback=loop.call_soon_threadsafe
32-
)
29+
loop = asyncio.new_event_loop()
30+
thread = threading.Thread(target=loop.run_forever)
31+
thread.start()
3332

33+
client = perspective_server.new_local_client()
3434
table = client.table(
3535
{"a": "integer", "index": "integer"}, index="index", name="test"
3636
)
3737

38-
thread = threading.Thread(target=feed, args=(table,), daemon=True)
39-
thread.start()
38+
view = table.view()
39+
global count
40+
count = 0
41+
42+
def update(x):
43+
global count
44+
count += 1
45+
46+
view.on_update(update)
4047
feed(table)
48+
assert table.size() == 1000
49+
assert count == 1000
50+
loop.call_soon_threadsafe(loop.stop)
51+
thread.join()
52+
loop.close()
53+
54+
def test_concurrent_updates(self):
55+
async def feed(table, loop):
56+
y = 1000
57+
while y > 0:
58+
y -= 1
59+
table.update([{"a": random.randint(0, 10), "index": y}])
60+
await asyncio.sleep(0.001)
61+
62+
perspective_server = Server()
63+
loop = asyncio.new_event_loop()
64+
thread = threading.Thread(target=loop.run_forever)
65+
thread.start()
66+
67+
client = perspective_server.new_local_client()
68+
table = client.table(
69+
{"a": "integer", "index": "integer"}, index="index", name="test"
70+
)
71+
72+
view = table.view()
73+
global count
74+
count = 0
75+
76+
def update(x):
77+
global count
78+
count += 1
79+
80+
view.on_update(update)
81+
asyncio.run_coroutine_threadsafe(feed(table, loop), loop).result()
82+
assert table.size() == 1000
83+
assert count == 1000
84+
loop.call_soon_threadsafe(loop.stop)
4185
thread.join()
86+
loop.close()

rust/perspective-python/src/client/client_async.rs

+14-14
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ use std::sync::Arc;
1717
use async_lock::RwLock;
1818
use futures::FutureExt;
1919
use perspective_client::{
20-
assert_table_api, assert_view_api, Client, OnUpdateMode, OnUpdateOptions, Table, TableData,
21-
TableInitOptions, TableReadFormat, UpdateData, UpdateOptions, View, ViewOnUpdateResp,
22-
ViewWindow,
20+
Client, OnUpdateMode, OnUpdateOptions, Table, TableData, TableInitOptions, TableReadFormat,
21+
UpdateData, UpdateOptions, View, ViewOnUpdateResp, ViewWindow, assert_table_api,
22+
assert_view_api,
2323
};
2424
use pyo3::exceptions::PyValueError;
2525
use pyo3::prelude::*;
@@ -37,16 +37,16 @@ use crate::py_err::{PyPerspectiveError, ResultTClientErrorExt};
3737
#[derive(Clone)]
3838
pub struct AsyncClient {
3939
pub(crate) client: Client,
40-
loop_cb: Arc<RwLock<Option<Py<PyAny>>>>,
41-
close_cb: Option<Py<PyAny>>,
40+
loop_cb: Arc<RwLock<Arc<Option<Py<PyAny>>>>>,
41+
close_cb: Arc<Option<Py<PyAny>>>,
4242
}
4343

4444
impl AsyncClient {
4545
pub fn new_from_client(client: Client) -> Self {
4646
AsyncClient {
4747
client,
4848
loop_cb: Arc::default(),
49-
close_cb: None,
49+
close_cb: Arc::default(),
5050
}
5151
}
5252
}
@@ -80,7 +80,7 @@ impl AsyncClient {
8080
AsyncClient {
8181
client,
8282
loop_cb: Arc::default(),
83-
close_cb: handle_close,
83+
close_cb: handle_close.into(),
8484
}
8585
}
8686

@@ -160,12 +160,12 @@ impl AsyncClient {
160160
}
161161

162162
pub async fn set_loop_callback(&self, loop_cb: Py<PyAny>) -> PyResult<()> {
163-
*self.loop_cb.write().await = Some(loop_cb);
163+
*self.loop_cb.write().await = Some(loop_cb).into();
164164
Ok(())
165165
}
166166

167167
pub fn terminate(&self, py: Python<'_>) -> PyResult<()> {
168-
if let Some(cb) = &self.close_cb {
168+
if let Some(cb) = &*self.close_cb {
169169
cb.call0(py)?;
170170
}
171171

@@ -225,12 +225,12 @@ impl AsyncTable {
225225
}
226226

227227
pub async fn on_delete(&self, callback_py: Py<PyAny>) -> PyResult<u32> {
228-
let loop_cb = self.client.loop_cb.read().await.clone();
228+
let loop_cb = (*self.client.loop_cb.read().await).clone();
229229
let callback = {
230230
let callback_py = Python::with_gil(|py| Py::clone_ref(&callback_py, py));
231231
Box::new(move || {
232232
Python::with_gil(|py| {
233-
if let Some(loop_cb) = &loop_cb {
233+
if let Some(loop_cb) = &*loop_cb {
234234
loop_cb.call1(py, (&callback_py,))?;
235235
} else {
236236
callback_py.call0(py)?;
@@ -402,7 +402,7 @@ impl AsyncView {
402402
Box::new(move || {
403403
let loop_cb = loop_cb.clone();
404404
Python::with_gil(|py| {
405-
if let Some(loop_cb) = &loop_cb {
405+
if let Some(loop_cb) = &*loop_cb {
406406
loop_cb.call1(py, (&*callback_py,))?;
407407
} else {
408408
callback_py.call0(py)?;
@@ -424,8 +424,8 @@ impl AsyncView {
424424

425425
#[pyo3(signature=(callback, mode=None))]
426426
pub async fn on_update(&self, callback: Py<PyAny>, mode: Option<String>) -> PyResult<u32> {
427-
let locked_val = self.client.loop_cb.read().await;
428-
let loop_cb = Python::with_gil(|py| locked_val.as_ref().map(|v| Py::clone_ref(v, py)));
427+
let locked_val = self.client.loop_cb.read().await.clone();
428+
let loop_cb = Python::with_gil(|py| (*locked_val).as_ref().map(|v| Py::clone_ref(v, py)));
429429
let callback = move |x: ViewOnUpdateResp| {
430430
let loop_cb = Python::with_gil(|py| loop_cb.as_ref().map(|v| Py::clone_ref(v, py)));
431431
let callback = Python::with_gil(|py| Py::clone_ref(&callback, py));

rust/perspective-python/src/server/server_sync.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub struct PySyncServer {
3535
}
3636

3737
#[derive(Clone)]
38-
struct PyConnection(Py<PyAny>);
38+
struct PyConnection(Arc<Py<PyAny>>);
3939

4040
impl SessionHandler for PyConnection {
4141
async fn send_response<'a>(
@@ -57,7 +57,7 @@ impl PySyncServer {
5757
pub fn new_session(&self, _py: Python, response_cb: Py<PyAny>) -> PySyncSession {
5858
let session = self
5959
.server
60-
.new_session(PyConnection(response_cb))
60+
.new_session(PyConnection(response_cb.into()))
6161
.block_on();
6262

6363
let session = Arc::new(RwLock::new(Some(session)));

0 commit comments

Comments
 (0)