Skip to content

Commit 28f9841

Browse files
committed
Upgrade CSP to Perspective 3.0.3
1 parent 1ce6f25 commit 28f9841

File tree

5 files changed

+146
-63
lines changed

5 files changed

+146
-63
lines changed

csp/adapters/perspective.py

+14-15
Original file line number | Diff line number | Diff line change
@@ -13,21 +13,20 @@
1313
except ImportError:
1414
raise ImportError("perspective adapter requires tornado package")
1515

16-
1716
try:
18-
from perspective import PerspectiveManager, Table as Table_, View as View_, __version__, set_threadpool_size
17+
from perspective import Server, Table as Table_, View as View_, __version__, set_threadpool_size
1918

2019
MAJOR, MINOR, PATCH = map(int, __version__.split("."))
21-
if (MAJOR, MINOR, PATCH) < (0, 6, 2):
22-
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
20+
if (MAJOR, MINOR, PATCH) < (3, 0, 3):
21+
raise ImportError("perspective adapter requires version 3.0.3 or greater of the perspective-python package")
2322
except ImportError:
24-
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
23+
raise ImportError("perspective adapter requires version 3.0.3 or greater of the perspective-python package")
2524

2625

2726
# Run perspective update in a separate tornado loop
28-
def perspective_thread(manager):
27+
def perspective_thread(client):
2928
loop = tornado.ioloop.IOLoop()
30-
manager.set_loop_callback(loop.add_callback)
29+
client.set_loop_callback(loop.add_callback)
3130
loop.start()
3231

3332

@@ -61,12 +60,13 @@ def _launch_application(port: int, manager: object, stub: ts[object]):
6160
s_iothread = None
6261

6362
with csp.start():
64-
from perspective import PerspectiveTornadoHandler
63+
from perspective import PerspectiveTornadoHandler, Server
64+
server = Server()
6565

6666
s_app = tornado.web.Application(
6767
[
6868
# create a websocket endpoint that the client Javascript can access
69-
(r"/websocket", PerspectiveTornadoHandler, {"manager": manager, "check_origin": True})
69+
(r"/websocket", PerspectiveTornadoHandler, {"perspective_server": server, "check_origin": True})
7070
],
7171
websocket_ping_interval=15,
7272
)
@@ -197,20 +197,19 @@ def create_table(self, name, limit=None, index=None):
197197

198198
def _instantiate(self):
199199
set_threadpool_size(self._threadpool_size)
200+
server = Server()
201+
client = server.new_local_client()
200202

201-
manager = PerspectiveManager()
202-
203-
thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
203+
thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
204204
thread.daemon = True
205205
thread.start()
206206

207207
for table_name, table in self._tables.items():
208208
schema = {
209209
k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
210210
}
211-
ptable = Table(schema, limit=table.limit, index=table.index)
212-
manager.host_table(table_name, ptable)
211+
ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)
213212

214213
_apply_updates(ptable, table.columns, self._throttle)
215214

216-
_launch_application(self._port, manager, csp.const("stub"))
215+
_launch_application(self._port, client, csp.const("stub"))

csp/dataframe.py

+14-6
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
from datetime import datetime, timedelta
1+
from datetime import datetime, timedelta, date
22
from typing import Dict, Optional
33

44
import csp.baselib
@@ -198,7 +198,7 @@ def to_pandas_ts(self, trigger, window, tindex=None, wait_all_valid=True):
198198

199199
return make_pandas(trigger, self._data, window, tindex, wait_all_valid)
200200

201-
def to_perspective(self, starttime: datetime, endtime: datetime = None, realtime: bool = False):
201+
def to_perspective(self, client, starttime: datetime, endtime: datetime = None, realtime: bool = False):
202202
import csp
203203

204204
try:
@@ -240,7 +240,7 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
240240

241241
if csp.ticked(data):
242242
s_buffer.append(dict(data.tickeditems()))
243-
s_buffer[-1][timecol] = csp.now()
243+
s_buffer[-1][timecol] = int(csp.now().timestamp() * 1000)
244244

245245
if csp.ticked(alarm):
246246
if len(s_buffer) > 0:
@@ -250,9 +250,17 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
250250
csp.schedule_alarm(alarm, throttle, True)
251251

252252
timecol = "time"
253-
schema = {k: v.tstype.typ for k, v in self._data.items()}
254-
schema[timecol] = datetime
255-
table = perspective.Table(schema)
253+
perspective_type_map = {
254+
str: "string",
255+
float: "float",
256+
int: "integer",
257+
date: "date",
258+
datetime: "datetime",
259+
bool: "boolean",
260+
}
261+
schema = {k: perspective_type_map[v.tstype.typ] for k, v in self._data.items()}
262+
schema[timecol] = "datetime"
263+
table = client.table(schema)
256264
runner = csp.run_on_thread(
257265
apply_updates,
258266
table,

csp/impl/pandas_perspective.py

+59-16
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
import pandas as pd
2+
import pyarrow as pa
23
import pytz
3-
from datetime import datetime, timedelta
4+
from datetime import datetime, timedelta, date, timezone
45
from pandas.compat import set_function_name
56
from typing import Optional
67

@@ -40,7 +41,8 @@ def _apply_updates(
4041
if throttle > timedelta(0):
4142
csp.schedule_alarm(alarm, throttle, True)
4243
s_has_time_col = time_col and time_col not in data.keys()
43-
s_datetime_cols = set([c for c, t in table.schema().items() if t == datetime])
44+
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
45+
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])
4446

4547
with csp.stop():
4648
try:
@@ -81,14 +83,23 @@ def _apply_updates(
8183
row[index_col] = idx
8284
if s_has_time_col:
8385
if localize:
84-
row[time_col] = pytz.utc.localize(csp.now())
86+
row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
8587
else:
86-
row[time_col] = csp.now()
88+
row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
8789
else:
8890
row = new_rows[idx]
8991

90-
if localize and col in s_datetime_cols and value.tzinfo is None:
91-
row[col] = pytz.utc.localize(value)
92+
if col in s_date_cols:
93+
row[col] = int(datetime(year=value.year,
94+
month=value.month,
95+
day=value.day,
96+
tzinfo=timezone.utc).timestamp() * 1000)
97+
98+
elif localize and col in s_datetime_cols:
99+
if value.tzinfo is None:
100+
row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
101+
else:
102+
row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
92103
else:
93104
row[col] = value
94105

@@ -160,28 +171,41 @@ def __init__(
160171
self._limit = limit
161172
self._localize = localize
162173

174+
# TODO: we do not want 1 server per table, make a Client an optional
175+
self._psp_server = perspective.Server()
176+
self._psp_client = self._psp_server.new_local_client()
177+
163178
self._basket = _frame_to_basket(data)
164179
self._static_frame = data.csp.static_frame()
165-
self._static_table = perspective.Table(self._static_frame)
180+
self._static_table = self._psp_client.table(self._static_frame)
166181
static_schema = self._static_table.schema()
167182
# Since the index will be accounted for separately, remove the index from the static table schema,
168183
# and re-enter it under index_col
169184
raw_index_name = self._static_frame.index.name or "index"
170185
index_type = static_schema.pop(raw_index_name)
171186
schema = {index_col: index_type}
187+
perspective_type_map = {
188+
str: "string",
189+
float: "float",
190+
int: "integer",
191+
date: "date",
192+
datetime: "datetime",
193+
bool: "boolean",
194+
}
195+
172196
if time_col:
173-
schema[time_col] = datetime
197+
schema[time_col] = "datetime"
174198
for col, series in data.items():
175199
if is_csp_type(series):
176-
schema[col] = series.dtype.subtype
200+
schema[col] = perspective_type_map[series.dtype.subtype]
177201
else:
178202
schema[col] = static_schema[col]
179203

180204
if self._keep_history:
181-
self._table = perspective.Table(schema, index=None, limit=limit)
205+
self._table = self._psp_client.table(schema, index=None, limit=limit)
182206
self._static_records = self._static_frame.to_dict(orient="index")
183207
else:
184-
self._table = perspective.Table(schema, index=self._index_col)
208+
self._table = self._psp_client.table(schema, index=self._index_col)
185209
self._static_frame.index = self._static_frame.index.rename(self._index_col)
186210
self._table.update(self._static_frame)
187211
self._static_records = None # No need to update dynamically
@@ -222,7 +246,7 @@ def run_historical(self, starttime, endtime):
222246
index = self._index_col
223247
if self._limit:
224248
df = df.sort_values(self._time_col).tail(self._limit).reset_index(drop=True)
225-
return perspective.Table(df.to_dict("series"), index=index)
249+
return self._psp_client.table(df, index=index)
226250

227251
def run(self, starttime=None, endtime=timedelta(seconds=60), realtime=True, clear=False):
228252
"""Run a graph that sends data to the table on the current thread.
@@ -280,7 +304,7 @@ def get_widget(self, **override_kwargs):
280304
"sort": [[self._time_col, "desc"]],
281305
}
282306
else:
283-
kwargs = {"columns": list(self._table.schema())}
307+
kwargs = {"columns": list(self._table.columns())}
284308
kwargs.update(override_kwargs)
285309
return perspective.PerspectiveWidget(self._table, **kwargs)
286310

@@ -294,14 +318,33 @@ def _method(self, **options):
294318

295319
@classmethod
296320
def _add_view_methods(cls):
297-
cls.to_df = cls._create_view_method(perspective.View.to_df)
298-
cls.to_dict = cls._create_view_method(perspective.View.to_dict)
321+
# cls.to_df = cls._create_view_method(perspective.View.to_df)
322+
# cls.to_dict = cls._create_view_method(perspective.View.to_dict)
299323
cls.to_json = cls._create_view_method(perspective.View.to_json)
300324
cls.to_csv = cls._create_view_method(perspective.View.to_csv)
301-
cls.to_numpy = cls._create_view_method(perspective.View.to_numpy)
325+
# cls.to_numpy = cls._create_view_method(perspective.View.to_numpy)
302326
cls.to_columns = cls._create_view_method(perspective.View.to_columns)
303327
cls.to_arrow = cls._create_view_method(perspective.View.to_arrow)
304328

329+
def to_df(self, **kwargs):
330+
ipc_bytes = self.to_arrow()
331+
table = pa.ipc.open_stream(ipc_bytes).read_all()
332+
df = pd.DataFrame(table.to_pandas(**kwargs))
333+
334+
# DAVIS: `pyarrow` does not force alphabetical order on categories, so
335+
# we correct this here to make assertions pass. We can enforce this in
336+
# Perspective at a performance hit/API complexity.
337+
for column in df:
338+
if df[column].dtype == "datetime64[ms]":
339+
df[column] = df[column].astype('datetime64[ns]')
340+
elif df[column].dtype == "category":
341+
df[column] = df[column].cat.reorder_categories(df[column].cat.categories.sort_values())
342+
343+
if df.index.dtype == "category":
344+
df.index = df.index.cat.reorder_categories(df.index.cat.categories.sort_values())
345+
346+
return df
347+
305348

306349
CspPerspectiveTable._add_view_methods()
307350

csp/tests/impl/test_dateframe.py → csp/tests/impl/test_dataframe.py

+5-2
Original file line number | Diff line number | Diff line change
@@ -134,10 +134,13 @@ def test_perspective(self):
134134
starttime = datetime(2021, 4, 26)
135135
endtime = starttime + timedelta(seconds=10)
136136

137-
_ = df.to_perspective(starttime, endtime)
137+
server = perspective.Server()
138+
client = server.new_local_client()
139+
140+
_ = df.to_perspective(client, starttime, endtime)
138141

139142
# realtime
140-
widget = df.to_perspective(datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
143+
widget = df.to_perspective(client, datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
141144
import time
142145

143146
time.sleep(1)

0 commit comments

Comments
 (0)