import pandas as pd
+ import pyarrow as pa
import pytz
- from datetime import datetime, timedelta
+ from datetime import date, datetime, timedelta, timezone
from pandas.compat import set_function_name
from typing import Optional

@@ -40,7 +41,8 @@ def _apply_updates(
        if throttle > timedelta(0):
            csp.schedule_alarm(alarm, throttle, True)
        s_has_time_col = time_col and time_col not in data.keys()
-         s_datetime_cols = set([c for c, t in table.schema().items() if t == datetime])
+         s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
+         s_date_cols = set([c for c, t in table.schema().items() if t == "date"])

    with csp.stop():
        try:
@@ -81,14 +83,22 @@ def _apply_updates(
                row[index_col] = idx
                if s_has_time_col:
                    if localize:
-                         row[time_col] = pytz.utc.localize(csp.now())
+                         row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
                    else:
-                         row[time_col] = csp.now()
+                         row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
            else:
                row = new_rows[idx]

-             if localize and col in s_datetime_cols and value.tzinfo is None:
-                 row[col] = pytz.utc.localize(value)
+             if col in s_date_cols:
+                 row[col] = int(
+                     datetime(year=value.year, month=value.month, day=value.day, tzinfo=timezone.utc).timestamp() * 1000
+                 )
+
+             elif localize and col in s_datetime_cols:
+                 if value.tzinfo is None:
+                     row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
+                 else:
+                     row[col] = int(value.timestamp() * 1000)
            else:
                row[col] = value

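For reference, the hunk above reduces both `date` and `datetime` values to epoch milliseconds, which is what the Perspective table now stores for these columns. A minimal standalone sketch of the same arithmetic (the helper names and sample values are illustrative, not part of the PR):

    from datetime import date, datetime, timezone

    import pytz

    def date_to_epoch_ms(value: date) -> int:
        # Treat the date as midnight UTC and express it in epoch milliseconds.
        return int(datetime(value.year, value.month, value.day, tzinfo=timezone.utc).timestamp() * 1000)

    def datetime_to_epoch_ms(value: datetime) -> int:
        # Naive datetimes are assumed to be UTC; aware ones convert directly.
        if value.tzinfo is None:
            value = pytz.utc.localize(value)
        return int(value.timestamp() * 1000)

    print(date_to_epoch_ms(date(2024, 1, 1)))          # 1704067200000
    print(datetime_to_epoch_ms(datetime(2024, 1, 1)))  # 1704067200000
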
@@ -160,28 +170,41 @@ def __init__(
        self._limit = limit
        self._localize = localize

+         # TODO: we do not want 1 server per table, make a Client param?
+         self._psp_server = perspective.Server()
+         self._psp_client = self._psp_server.new_local_client()
+
        self._basket = _frame_to_basket(data)
        self._static_frame = data.csp.static_frame()
-         self._static_table = perspective.Table(self._static_frame)
+         self._static_table = self._psp_client.table(self._static_frame)
        static_schema = self._static_table.schema()
        # Since the index will be accounted for separately, remove the index from the static table schema,
        # and re-enter it under index_col
        raw_index_name = self._static_frame.index.name or "index"
        index_type = static_schema.pop(raw_index_name)
        schema = {index_col: index_type}
+         perspective_type_map = {
+             str: "string",
+             float: "float",
+             int: "integer",
+             date: "date",
+             datetime: "datetime",
+             bool: "boolean",
+         }
+
        if time_col:
-             schema[time_col] = datetime
+             schema[time_col] = "datetime"
        for col, series in data.items():
            if is_csp_type(series):
-                 schema[col] = series.dtype.subtype
+                 schema[col] = perspective_type_map[series.dtype.subtype]
            else:
                schema[col] = static_schema[col]

        if self._keep_history:
-             self._table = perspective.Table(schema, index=None, limit=limit)
+             self._table = self._psp_client.table(schema, index=None, limit=limit)
            self._static_records = self._static_frame.to_dict(orient="index")
        else:
-             self._table = perspective.Table(schema, index=self._index_col)
+             self._table = self._psp_client.table(schema, index=self._index_col)
            self._static_frame.index = self._static_frame.index.rename(self._index_col)
            self._table.update(self._static_frame)
            self._static_records = None  # No need to update dynamically
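The constructor now goes through the Perspective 3.x client API: tables are created from a locally hosted `perspective.Server` rather than by instantiating `perspective.Table` directly, and schemas are declared with string type names. A rough sketch of that pattern, with placeholder column names not taken from the PR:

    import perspective

    server = perspective.Server()
    client = server.new_local_client()

    # Schemas use string type names instead of Python types.
    schema = {"my_index": "string", "my_value": "float", "my_time": "datetime"}
    table = client.table(schema, index="my_index")

    # Datetime columns accept epoch-millisecond integers, matching _apply_updates above.
    table.update([{"my_index": "a", "my_value": 1.0, "my_time": 1704067200000}])
    print(table.schema())
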
@@ -222,7 +245,7 @@ def run_historical(self, starttime, endtime):
        index = self._index_col
        if self._limit:
            df = df.sort_values(self._time_col).tail(self._limit).reset_index(drop=True)
-         return perspective.Table(df.to_dict("series"), index=index)
+         return self._psp_client.table(df, index=index)

    def run(self, starttime=None, endtime=timedelta(seconds=60), realtime=True, clear=False):
        """Run a graph that sends data to the table on the current thread.
@@ -280,7 +303,7 @@ def get_widget(self, **override_kwargs):
                "sort": [[self._time_col, "desc"]],
            }
        else:
-             kwargs = {"columns": list(self._table.schema())}
+             kwargs = {"columns": list(self._table.columns())}
        kwargs.update(override_kwargs)
        return perspective.PerspectiveWidget(self._table, **kwargs)

@@ -294,14 +317,30 @@ def _method(self, **options):

    @classmethod
    def _add_view_methods(cls):
-         cls.to_df = cls._create_view_method(perspective.View.to_df)
-         cls.to_dict = cls._create_view_method(perspective.View.to_dict)
        cls.to_json = cls._create_view_method(perspective.View.to_json)
        cls.to_csv = cls._create_view_method(perspective.View.to_csv)
-         cls.to_numpy = cls._create_view_method(perspective.View.to_numpy)
        cls.to_columns = cls._create_view_method(perspective.View.to_columns)
        cls.to_arrow = cls._create_view_method(perspective.View.to_arrow)

+     def to_df(self, **kwargs):
+         ipc_bytes = self.to_arrow()
+         table = pa.ipc.open_stream(ipc_bytes).read_all()
+         df = pd.DataFrame(table.to_pandas(**kwargs))
+
+         # DAVIS: `pyarrow` does not force alphabetical order on categories, so
+         # we correct this here to make assertions pass. We can enforce this in
+         # Perspective at a performance hit/API complexity.
+         for column in df:
+             if df[column].dtype == "datetime64[ms]":
+                 df[column] = df[column].astype("datetime64[ns]")
+             elif df[column].dtype == "category":
+                 df[column] = df[column].cat.reorder_categories(df[column].cat.categories.sort_values())
+
+         if df.index.dtype == "category":
+             df.index = df.index.cat.reorder_categories(df.index.cat.categories.sort_values())
+
+         return df
+

CspPerspectiveTable._add_view_methods()
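The new `to_df` rebuilds the DataFrame from the Arrow IPC stream returned by `View.to_arrow`. A self-contained sketch of that round trip using only `pyarrow` (the sample data is made up for illustration):

    import pandas as pd
    import pyarrow as pa

    # Serialize an Arrow table to IPC stream bytes, standing in for View.to_arrow().
    source = pa.table({"name": ["b", "a"], "value": [2.0, 1.0]})
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, source.schema) as writer:
        writer.write_table(source)
    ipc_bytes = sink.getvalue().to_pybytes()

    # Read the stream back and convert to pandas, mirroring to_df().
    table = pa.ipc.open_stream(ipc_bytes).read_all()
    df = pd.DataFrame(table.to_pandas())
    print(df)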