1
1
import pandas as pd
2
+ import pyarrow as pa
2
3
import pytz
3
- from datetime import datetime , timedelta
4
+ from datetime import datetime , timedelta , date , timezone
4
5
from pandas .compat import set_function_name
5
6
from typing import Optional
6
7
@@ -40,7 +41,8 @@ def _apply_updates(
40
41
if throttle > timedelta (0 ):
41
42
csp .schedule_alarm (alarm , throttle , True )
42
43
s_has_time_col = time_col and time_col not in data .keys ()
43
- s_datetime_cols = set ([c for c , t in table .schema ().items () if t == datetime ])
44
+ s_datetime_cols = set ([c for c , t in table .schema ().items () if t == "datetime" ])
45
+ s_date_cols = set ([c for c , t in table .schema ().items () if t == "date" ])
44
46
45
47
with csp .stop ():
46
48
try :
@@ -81,14 +83,20 @@ def _apply_updates(
81
83
row [index_col ] = idx
82
84
if s_has_time_col :
83
85
if localize :
84
- row [time_col ] = pytz .utc .localize (csp .now ())
86
+ row [time_col ] = int ( pytz .utc .localize (csp .now ()). timestamp () * 1000 )
85
87
else :
86
- row [time_col ] = csp .now ()
88
+ row [time_col ] = int ( pytz . utc . localize ( csp .now ()). timestamp () * 1000 )
87
89
else :
88
90
row = new_rows [idx ]
89
91
90
- if localize and col in s_datetime_cols and value .tzinfo is None :
91
- row [col ] = pytz .utc .localize (value )
92
+ if col in s_date_cols :
93
+ row [col ] = value .replace (tzinfo = timezone .utc ).timestamp ()
94
+
95
+ elif localize and col in s_datetime_cols :
96
+ if value .tzinfo is None :
97
+ row [col ] = int (pytz .utc .localize (value ).timestamp () * 1000 )
98
+ else :
99
+ row [col ] = int (pytz .utc .localize (value ).timestamp () * 1000 )
92
100
else :
93
101
row [col ] = value
94
102
@@ -160,28 +168,41 @@ def __init__(
160
168
self ._limit = limit
161
169
self ._localize = localize
162
170
171
+ # TODO: we do not want 1 server per table, make a Client an optional
172
+ self ._psp_server = perspective .Server ()
173
+ self ._psp_client = self ._psp_server .new_local_client ()
174
+
163
175
self ._basket = _frame_to_basket (data )
164
176
self ._static_frame = data .csp .static_frame ()
165
- self ._static_table = perspective . Table (self ._static_frame )
177
+ self ._static_table = self . _psp_client . table (self ._static_frame )
166
178
static_schema = self ._static_table .schema ()
167
179
# Since the index will be accounted for separately, remove the index from the static table schema,
168
180
# and re-enter it under index_col
169
181
raw_index_name = self ._static_frame .index .name or "index"
170
182
index_type = static_schema .pop (raw_index_name )
171
183
schema = {index_col : index_type }
184
+ perspective_type_map = {
185
+ str : "string" ,
186
+ float : "float" ,
187
+ int : "integer" ,
188
+ date : "date" ,
189
+ datetime : "datetime" ,
190
+ bool : "boolean" ,
191
+ }
192
+
172
193
if time_col :
173
- schema [time_col ] = datetime
194
+ schema [time_col ] = " datetime"
174
195
for col , series in data .items ():
175
196
if is_csp_type (series ):
176
- schema [col ] = series .dtype .subtype
197
+ schema [col ] = perspective_type_map [ series .dtype .subtype ]
177
198
else :
178
199
schema [col ] = static_schema [col ]
179
200
180
201
if self ._keep_history :
181
- self ._table = perspective . Table (schema , index = None , limit = limit )
202
+ self ._table = self . _psp_client . table (schema , index = None , limit = limit )
182
203
self ._static_records = self ._static_frame .to_dict (orient = "index" )
183
204
else :
184
- self ._table = perspective . Table (schema , index = self ._index_col )
205
+ self ._table = self . _psp_client . table (schema , index = self ._index_col )
185
206
self ._static_frame .index = self ._static_frame .index .rename (self ._index_col )
186
207
self ._table .update (self ._static_frame )
187
208
self ._static_records = None # No need to update dynamically
@@ -222,7 +243,7 @@ def run_historical(self, starttime, endtime):
222
243
index = self ._index_col
223
244
if self ._limit :
224
245
df = df .sort_values (self ._time_col ).tail (self ._limit ).reset_index (drop = True )
225
- return perspective . Table ( df . to_dict ( "series" ) , index = index )
246
+ return self . _psp_client . table ( df , index = index )
226
247
227
248
def run (self , starttime = None , endtime = timedelta (seconds = 60 ), realtime = True , clear = False ):
228
249
"""Run a graph that sends data to the table on the current thread.
@@ -280,7 +301,7 @@ def get_widget(self, **override_kwargs):
280
301
"sort" : [[self ._time_col , "desc" ]],
281
302
}
282
303
else :
283
- kwargs = {"columns" : list (self ._table .schema ())}
304
+ kwargs = {"columns" : list (self ._table .columns ())}
284
305
kwargs .update (override_kwargs )
285
306
return perspective .PerspectiveWidget (self ._table , ** kwargs )
286
307
@@ -294,14 +315,33 @@ def _method(self, **options):
294
315
295
316
@classmethod
def _add_view_methods(cls):
    """Attach `perspective.View` export helpers to the class as instance methods.

    Each generated method creates a view over the underlying table and
    delegates to the corresponding ``perspective.View`` exporter.

    NOTE: ``to_df`` / ``to_dict`` / ``to_numpy`` are intentionally NOT
    generated here anymore — the new perspective client API exports via
    Arrow IPC, and ``to_df`` is implemented directly on the class instead.
    """
    cls.to_json = cls._create_view_method(perspective.View.to_json)
    cls.to_csv = cls._create_view_method(perspective.View.to_csv)
    cls.to_columns = cls._create_view_method(perspective.View.to_columns)
    cls.to_arrow = cls._create_view_method(perspective.View.to_arrow)
326
def to_df(self, **kwargs):
    """Return the table contents as a pandas DataFrame.

    The data is exported from Perspective as an Arrow IPC stream
    (via ``self.to_arrow()``), decoded with pyarrow, and then normalized
    so the result matches what the legacy ``perspective.View.to_df``
    produced:

    * datetime columns: pyarrow emits millisecond precision; convert to
      pandas' native ``datetime64[ns]``.
    * categorical columns/index: pyarrow does not force alphabetical
      category order, so categories are re-sorted here. This could be
      enforced in Perspective itself, at a performance/API-complexity cost.

    Any ``**kwargs`` are forwarded to ``pyarrow.Table.to_pandas``.
    """
    ipc_bytes = self.to_arrow()
    table = pa.ipc.open_stream(ipc_bytes).read_all()
    # `pyarrow.Table.to_pandas` already returns a DataFrame; the previous
    # `pd.DataFrame(...)` wrapper forced a redundant copy.
    df = table.to_pandas(**kwargs)

    for column in df:
        if df[column].dtype == "datetime64[ms]":
            df[column] = df[column].astype("datetime64[ns]")
        elif df[column].dtype == "category":
            df[column] = df[column].cat.reorder_categories(df[column].cat.categories.sort_values())

    # NOTE(review): only a categorical index is normalized here; a
    # datetime64[ms] index would pass through unconverted — confirm whether
    # the index can ever be a datetime column.
    if df.index.dtype == "category":
        df.index = df.index.cat.reorder_categories(df.index.cat.categories.sort_values())

    return df
305
345
306
346
# Attach the Perspective view export helpers (to_json, to_csv, to_columns,
# to_arrow) to the class at import time.
CspPerspectiveTable._add_view_methods()
307
347
0 commit comments