Skip to content

Commit bd9f4a1

Browse files
committed
Fix polars support and add LargeUTF8 Arrow dictionary support
Signed-off-by: Andrew Stein <steinlink@gmail.com>
1 parent a2e5db2 commit bd9f4a1

File tree

5 files changed

+86
-27
lines changed

5 files changed

+86
-27
lines changed

cpp/perspective/src/cpp/arrow_loader.cpp

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -469,33 +469,58 @@ copy_array(
469469
}
470470
} break;
471471
case arrow::DictionaryType::type_id: {
472+
auto dictionary_type =
473+
static_cast<const arrow::DictionaryType*>(src->type().get());
474+
475+
auto value_type = dictionary_type->value_type();
476+
472477
// If there are duplicate values in the dictionary
473478
// at different indices, i.e. [0 => a, 1 => b, 2 =>
474479
// a], tables with explicit indexes on a string
475480
// column created from a dictionary array may have
476481
// duplicate primary keys.
477482
auto scol = std::static_pointer_cast<arrow::DictionaryArray>(src);
478-
std::shared_ptr<arrow::StringArray> dict =
479-
std::static_pointer_cast<arrow::StringArray>(scol->dictionary()
483+
if (value_type->id() == arrow::large_utf8()->id()) {
484+
auto dict = std::static_pointer_cast<arrow::LargeStringArray>(
485+
scol->dictionary()
480486
);
481-
const int32_t* offsets = dict->raw_value_offsets();
482-
const uint8_t* values = dict->value_data()->data();
483-
const std::uint64_t dsize = dict->length();
484487

485-
t_vocab* vocab = dest->_get_vocab();
486-
std::string elem;
488+
const uint8_t* values = dict->value_data()->data();
489+
const std::uint64_t dsize = dict->length();
490+
t_vocab* vocab = dest->_get_vocab();
491+
std::string elem;
492+
// vocab len + null bytes
493+
vocab->reserve(dict->value_data()->size() + dsize, dsize);
494+
for (std::uint64_t i = 0; i < dsize; ++i) {
495+
std::int64_t bidx = dict->value_offset(i);
496+
std::size_t es = dict->value_length(i);
497+
elem.assign(
498+
reinterpret_cast<const char*>(values) + bidx, es
499+
);
487500

488-
vocab->reserve(
489-
dict->value_data()->size() + dsize, // vocab len + null bytes
490-
dsize
491-
);
501+
vocab->get_interned(elem);
502+
}
503+
} else {
504+
auto dict = std::static_pointer_cast<arrow::StringArray>(
505+
scol->dictionary()
506+
);
492507

493-
for (std::uint64_t i = 0; i < dsize; ++i) {
494-
std::int32_t bidx = offsets[i];
495-
std::size_t es = offsets[i + 1] - bidx;
496-
elem.assign(reinterpret_cast<const char*>(values) + bidx, es);
497-
vocab->get_interned(elem);
508+
const uint8_t* values = dict->value_data()->data();
509+
const std::uint64_t dsize = dict->length();
510+
t_vocab* vocab = dest->_get_vocab();
511+
std::string elem;
512+
vocab->reserve(dict->value_data()->size() + dsize, dsize);
513+
for (std::uint64_t i = 0; i < dsize; ++i) {
514+
std::int32_t bidx = dict->value_offset(i);
515+
std::size_t es = dict->value_length(i);
516+
elem.assign(
517+
reinterpret_cast<const char*>(values) + bidx, es
518+
);
519+
520+
vocab->get_interned(elem);
521+
}
498522
}
523+
499524
auto indices = scol->indices();
500525
switch (indices->type()->id()) {
501526
case arrow::Int8Type::type_id: {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2+
# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
3+
# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
4+
# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
5+
# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
6+
# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7+
# ┃ Copyright (c) 2017, the Perspective Authors. ┃
8+
# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9+
# ┃ This file is part of the Perspective library, distributed under the terms ┃
10+
# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11+
# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12+
13+
import perspective as psp
14+
15+
client = psp.Server().new_local_client()
16+
Table = client.table
17+
18+
19+
class TestToPolars(object):
20+
def test_to_polars(self, superstore):
21+
original_tbl = Table(superstore.to_dict(orient="records"))
22+
df = original_tbl.view().to_polars()
23+
tbl = Table(df)
24+
assert tbl.size() == original_tbl.size()
25+
df2 = tbl.view().to_polars()
26+
assert df.equals(df2)

rust/perspective-python/src/client/polars.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ pub fn arrow_to_polars(py: Python<'_>, arrow: &[u8]) -> PyResult<Py<PyAny>> {
6262
Ok(polars
6363
.getattr("read_ipc_stream")?
6464
.call1((bytes,))?
65-
.call0()?
6665
.as_unbound()
6766
.clone())
6867
}

rust/perspective-python/src/server/server_sync.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ impl PySyncServer {
7070
loop_callback: Option<Py<PyAny>>,
7171
) -> PyResult<crate::client::client_sync::Client> {
7272
let client = crate::client::client_sync::Client(PyClient::new_from_client(
73-
self.server.new_local_client().clone(),
73+
self.server
74+
.new_local_client()
75+
.take()
76+
.map_err(PyValueError::new_err)?,
7477
));
7578

7679
if let Some(loop_cb) = loop_callback {

rust/perspective-server/src/local_client.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,24 +65,26 @@ impl LocalClientState {
6565
}
6666

6767
/// A [`Client`] specialized for connecting to an in-process [`Server`].
68-
pub struct LocalClient(LocalClientState);
68+
pub struct LocalClient(Option<LocalClientState>);
6969

7070
impl Deref for LocalClient {
7171
type Target = Client;
7272

7373
fn deref(&self) -> &Self::Target {
74-
self.0.get_client()
74+
self.0.as_ref().unwrap().get_client()
7575
}
7676
}
7777

7878
impl Drop for LocalClient {
7979
fn drop(&mut self) {
80-
if let Some(session) = self.0.session.get() {
81-
if session.try_read().unwrap().is_some() {
82-
tracing::error!("`Client` dropped without `Client::close`");
80+
if let Some(state) = &self.0 {
81+
if let Some(session) = state.session.get() {
82+
if session.try_read().unwrap().is_some() {
83+
tracing::error!("`Client` dropped without `Client::close`");
84+
}
85+
} else {
86+
tracing::debug!("`Session` dropped before init");
8387
}
84-
} else {
85-
tracing::debug!("`Session` dropped before init");
8688
}
8789
}
8890
}
@@ -96,14 +98,18 @@ impl LocalClient {
9698
session: Arc::default(),
9799
};
98100

99-
LocalClient(state)
101+
LocalClient(Some(state))
102+
}
103+
104+
pub fn take(mut self) -> Result<Client, &'static str> {
105+
self.0.take().map(|x| x.get_client().clone()).ok_or("Empty")
100106
}
101107

102108
/// Close this [`LocalClient`]. Dropping a [`LocalClient`] instead of
103109
/// calling [`LocalClient::close`] will result in a log error, as this
104110
/// will leak!
105111
pub async fn close(self) {
106-
if let Some(session) = self.0.session.get() {
112+
if let Some(session) = self.0.as_ref().unwrap().session.get() {
107113
session.write().await.take().unwrap().close().await
108114
} else {
109115
tracing::debug!("`Session` dropped before init");

0 commit comments

Comments
 (0)