Skip to content

Commit a963e08

Browse files
committed
Upgrade CSP to Perspective 3.0.3
1 parent 1ce6f25 commit a963e08

File tree

5 files changed

+149
-63
lines changed

5 files changed

+149
-63
lines changed

csp/adapters/perspective.py

+15-15
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,20 @@
1313
except ImportError:
1414
raise ImportError("perspective adapter requires tornado package")
1515

16-
1716
try:
18-
from perspective import PerspectiveManager, Table as Table_, View as View_, __version__, set_threadpool_size
17+
from perspective import Server, Table as Table_, View as View_, __version__, set_threadpool_size
1918

2019
MAJOR, MINOR, PATCH = map(int, __version__.split("."))
21-
if (MAJOR, MINOR, PATCH) < (0, 6, 2):
22-
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
20+
if (MAJOR, MINOR, PATCH) < (3, 0, 3):
21+
raise ImportError("perspective adapter requires version 3.0.3 or greater of the perspective-python package")
2322
except ImportError:
24-
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
23+
raise ImportError("perspective adapter requires version 3.0.3 or greater of the perspective-python package")
2524

2625

2726
# Run perspective update in a separate tornado loop
28-
def perspective_thread(manager):
27+
def perspective_thread(client):
2928
loop = tornado.ioloop.IOLoop()
30-
manager.set_loop_callback(loop.add_callback)
29+
client.set_loop_callback(loop.add_callback)
3130
loop.start()
3231

3332

@@ -61,12 +60,14 @@ def _launch_application(port: int, manager: object, stub: ts[object]):
6160
s_iothread = None
6261

6362
with csp.start():
64-
from perspective import PerspectiveTornadoHandler
63+
from perspective import PerspectiveTornadoHandler, Server
64+
65+
server = Server()
6566

6667
s_app = tornado.web.Application(
6768
[
6869
# create a websocket endpoint that the client Javascript can access
69-
(r"/websocket", PerspectiveTornadoHandler, {"manager": manager, "check_origin": True})
70+
(r"/websocket", PerspectiveTornadoHandler, {"perspective_server": server, "check_origin": True})
7071
],
7172
websocket_ping_interval=15,
7273
)
@@ -197,20 +198,19 @@ def create_table(self, name, limit=None, index=None):
197198

198199
def _instantiate(self):
199200
set_threadpool_size(self._threadpool_size)
201+
server = Server()
202+
client = server.new_local_client()
200203

201-
manager = PerspectiveManager()
202-
203-
thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
204+
thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
204205
thread.daemon = True
205206
thread.start()
206207

207208
for table_name, table in self._tables.items():
208209
schema = {
209210
k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
210211
}
211-
ptable = Table(schema, limit=table.limit, index=table.index)
212-
manager.host_table(table_name, ptable)
212+
ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)
213213

214214
_apply_updates(ptable, table.columns, self._throttle)
215215

216-
_launch_application(self._port, manager, csp.const("stub"))
216+
_launch_application(self._port, client, csp.const("stub"))

csp/dataframe.py

+14-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datetime import datetime, timedelta
1+
from datetime import date, datetime, timedelta
22
from typing import Dict, Optional
33

44
import csp.baselib
@@ -198,7 +198,7 @@ def to_pandas_ts(self, trigger, window, tindex=None, wait_all_valid=True):
198198

199199
return make_pandas(trigger, self._data, window, tindex, wait_all_valid)
200200

201-
def to_perspective(self, starttime: datetime, endtime: datetime = None, realtime: bool = False):
201+
def to_perspective(self, client, starttime: datetime, endtime: datetime = None, realtime: bool = False):
202202
import csp
203203

204204
try:
@@ -240,7 +240,7 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
240240

241241
if csp.ticked(data):
242242
s_buffer.append(dict(data.tickeditems()))
243-
s_buffer[-1][timecol] = csp.now()
243+
s_buffer[-1][timecol] = int(csp.now().timestamp() * 1000)
244244

245245
if csp.ticked(alarm):
246246
if len(s_buffer) > 0:
@@ -250,9 +250,17 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
250250
csp.schedule_alarm(alarm, throttle, True)
251251

252252
timecol = "time"
253-
schema = {k: v.tstype.typ for k, v in self._data.items()}
254-
schema[timecol] = datetime
255-
table = perspective.Table(schema)
253+
perspective_type_map = {
254+
str: "string",
255+
float: "float",
256+
int: "integer",
257+
date: "date",
258+
datetime: "datetime",
259+
bool: "boolean",
260+
}
261+
schema = {k: perspective_type_map[v.tstype.typ] for k, v in self._data.items()}
262+
schema[timecol] = "datetime"
263+
table = client.table(schema)
256264
runner = csp.run_on_thread(
257265
apply_updates,
258266
table,

csp/impl/pandas_perspective.py

+58-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pandas as pd
2+
import pyarrow as pa
23
import pytz
3-
from datetime import datetime, timedelta
4+
from datetime import date, datetime, timedelta, timezone
45
from pandas.compat import set_function_name
56
from typing import Optional
67

@@ -40,7 +41,8 @@ def _apply_updates(
4041
if throttle > timedelta(0):
4142
csp.schedule_alarm(alarm, throttle, True)
4243
s_has_time_col = time_col and time_col not in data.keys()
43-
s_datetime_cols = set([c for c, t in table.schema().items() if t == datetime])
44+
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
45+
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])
4446

4547
with csp.stop():
4648
try:
@@ -81,14 +83,22 @@ def _apply_updates(
8183
row[index_col] = idx
8284
if s_has_time_col:
8385
if localize:
84-
row[time_col] = pytz.utc.localize(csp.now())
86+
row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
8587
else:
86-
row[time_col] = csp.now()
88+
row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
8789
else:
8890
row = new_rows[idx]
8991

90-
if localize and col in s_datetime_cols and value.tzinfo is None:
91-
row[col] = pytz.utc.localize(value)
92+
if col in s_date_cols:
93+
row[col] = int(
94+
datetime(year=value.year, month=value.month, day=value.day, tzinfo=timezone.utc).timestamp() * 1000
95+
)
96+
97+
elif localize and col in s_datetime_cols:
98+
if value.tzinfo is None:
99+
row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
100+
else:
101+
row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
92102
else:
93103
row[col] = value
94104

@@ -160,28 +170,41 @@ def __init__(
160170
self._limit = limit
161171
self._localize = localize
162172

173+
# TODO: we do not want 1 server per table, make a Client an optional
174+
self._psp_server = perspective.Server()
175+
self._psp_client = self._psp_server.new_local_client()
176+
163177
self._basket = _frame_to_basket(data)
164178
self._static_frame = data.csp.static_frame()
165-
self._static_table = perspective.Table(self._static_frame)
179+
self._static_table = self._psp_client.table(self._static_frame)
166180
static_schema = self._static_table.schema()
167181
# Since the index will be accounted for separately, remove the index from the static table schema,
168182
# and re-enter it under index_col
169183
raw_index_name = self._static_frame.index.name or "index"
170184
index_type = static_schema.pop(raw_index_name)
171185
schema = {index_col: index_type}
186+
perspective_type_map = {
187+
str: "string",
188+
float: "float",
189+
int: "integer",
190+
date: "date",
191+
datetime: "datetime",
192+
bool: "boolean",
193+
}
194+
172195
if time_col:
173-
schema[time_col] = datetime
196+
schema[time_col] = "datetime"
174197
for col, series in data.items():
175198
if is_csp_type(series):
176-
schema[col] = series.dtype.subtype
199+
schema[col] = perspective_type_map[series.dtype.subtype]
177200
else:
178201
schema[col] = static_schema[col]
179202

180203
if self._keep_history:
181-
self._table = perspective.Table(schema, index=None, limit=limit)
204+
self._table = self._psp_client.table(schema, index=None, limit=limit)
182205
self._static_records = self._static_frame.to_dict(orient="index")
183206
else:
184-
self._table = perspective.Table(schema, index=self._index_col)
207+
self._table = self._psp_client.table(schema, index=self._index_col)
185208
self._static_frame.index = self._static_frame.index.rename(self._index_col)
186209
self._table.update(self._static_frame)
187210
self._static_records = None # No need to update dynamically
@@ -222,7 +245,7 @@ def run_historical(self, starttime, endtime):
222245
index = self._index_col
223246
if self._limit:
224247
df = df.sort_values(self._time_col).tail(self._limit).reset_index(drop=True)
225-
return perspective.Table(df.to_dict("series"), index=index)
248+
return self._psp_client.table(df, index=index)
226249

227250
def run(self, starttime=None, endtime=timedelta(seconds=60), realtime=True, clear=False):
228251
"""Run a graph that sends data to the table on the current thread.
@@ -280,7 +303,7 @@ def get_widget(self, **override_kwargs):
280303
"sort": [[self._time_col, "desc"]],
281304
}
282305
else:
283-
kwargs = {"columns": list(self._table.schema())}
306+
kwargs = {"columns": list(self._table.columns())}
284307
kwargs.update(override_kwargs)
285308
return perspective.PerspectiveWidget(self._table, **kwargs)
286309

@@ -294,14 +317,33 @@ def _method(self, **options):
294317

295318
@classmethod
296319
def _add_view_methods(cls):
297-
cls.to_df = cls._create_view_method(perspective.View.to_df)
298-
cls.to_dict = cls._create_view_method(perspective.View.to_dict)
320+
# cls.to_df = cls._create_view_method(perspective.View.to_df)
321+
# cls.to_dict = cls._create_view_method(perspective.View.to_dict)
299322
cls.to_json = cls._create_view_method(perspective.View.to_json)
300323
cls.to_csv = cls._create_view_method(perspective.View.to_csv)
301-
cls.to_numpy = cls._create_view_method(perspective.View.to_numpy)
324+
# cls.to_numpy = cls._create_view_method(perspective.View.to_numpy)
302325
cls.to_columns = cls._create_view_method(perspective.View.to_columns)
303326
cls.to_arrow = cls._create_view_method(perspective.View.to_arrow)
304327

328+
def to_df(self, **kwargs):
329+
ipc_bytes = self.to_arrow()
330+
table = pa.ipc.open_stream(ipc_bytes).read_all()
331+
df = pd.DataFrame(table.to_pandas(**kwargs))
332+
333+
# DAVIS: `pyarrow` does not force alphabetical order on categories, so
334+
# we correct this here to make assertions pass. We can enforce this in
335+
# Perspective at a performance hit/API complexity.
336+
for column in df:
337+
if df[column].dtype == "datetime64[ms]":
338+
df[column] = df[column].astype("datetime64[ns]")
339+
elif df[column].dtype == "category":
340+
df[column] = df[column].cat.reorder_categories(df[column].cat.categories.sort_values())
341+
342+
if df.index.dtype == "category":
343+
df.index = df.index.cat.reorder_categories(df.index.cat.categories.sort_values())
344+
345+
return df
346+
305347

306348
CspPerspectiveTable._add_view_methods()
307349

csp/tests/impl/test_dateframe.py csp/tests/impl/test_dataframe.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,13 @@ def test_perspective(self):
134134
starttime = datetime(2021, 4, 26)
135135
endtime = starttime + timedelta(seconds=10)
136136

137-
_ = df.to_perspective(starttime, endtime)
137+
server = perspective.Server()
138+
client = server.new_local_client()
139+
140+
_ = df.to_perspective(client, starttime, endtime)
138141

139142
# realtime
140-
widget = df.to_perspective(datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
143+
widget = df.to_perspective(client, datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
141144
import time
142145

143146
time.sleep(1)

0 commit comments

Comments
 (0)