Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions cpp/perspective/src/cpp/arrow_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,33 +469,58 @@ copy_array(
}
} break;
case arrow::DictionaryType::type_id: {
auto dictionary_type =
static_cast<const arrow::DictionaryType*>(src->type().get());

auto value_type = dictionary_type->value_type();

// If there are duplicate values in the dictionary
// at different indices, i.e. [0 => a, 1 => b, 2 =>
// a], tables with explicit indexes on a string
// column created from a dictionary array may have
// duplicate primary keys.
auto scol = std::static_pointer_cast<arrow::DictionaryArray>(src);
std::shared_ptr<arrow::StringArray> dict =
std::static_pointer_cast<arrow::StringArray>(scol->dictionary()
if (value_type->id() == arrow::large_utf8()->id()) {
auto dict = std::static_pointer_cast<arrow::LargeStringArray>(
scol->dictionary()
);
const int32_t* offsets = dict->raw_value_offsets();
const uint8_t* values = dict->value_data()->data();
const std::uint64_t dsize = dict->length();

t_vocab* vocab = dest->_get_vocab();
std::string elem;
const uint8_t* values = dict->value_data()->data();
const std::uint64_t dsize = dict->length();
t_vocab* vocab = dest->_get_vocab();
std::string elem;
// vocab len + null bytes
vocab->reserve(dict->value_data()->size() + dsize, dsize);
for (std::uint64_t i = 0; i < dsize; ++i) {
std::int64_t bidx = dict->value_offset(i);
std::size_t es = dict->value_length(i);
elem.assign(
reinterpret_cast<const char*>(values) + bidx, es
);

vocab->reserve(
dict->value_data()->size() + dsize, // vocab len + null bytes
dsize
);
vocab->get_interned(elem);
}
} else {
auto dict = std::static_pointer_cast<arrow::StringArray>(
scol->dictionary()
);

for (std::uint64_t i = 0; i < dsize; ++i) {
std::int32_t bidx = offsets[i];
std::size_t es = offsets[i + 1] - bidx;
elem.assign(reinterpret_cast<const char*>(values) + bidx, es);
vocab->get_interned(elem);
const uint8_t* values = dict->value_data()->data();
const std::uint64_t dsize = dict->length();
t_vocab* vocab = dest->_get_vocab();
std::string elem;
vocab->reserve(dict->value_data()->size() + dsize, dsize);
for (std::uint64_t i = 0; i < dsize; ++i) {
std::int32_t bidx = dict->value_offset(i);
std::size_t es = dict->value_length(i);
elem.assign(
reinterpret_cast<const char*>(values) + bidx, es
);

vocab->get_interned(elem);
}
}

auto indices = scol->indices();
switch (indices->type()->id()) {
case arrow::Int8Type::type_id: {
Expand Down
26 changes: 26 additions & 0 deletions rust/perspective-python/perspective/tests/table/test_to_polars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
# ┃ Copyright (c) 2017, the Perspective Authors. ┃
# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
# ┃ This file is part of the Perspective library, distributed under the terms ┃
# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

import perspective as psp

client = psp.Server().new_local_client()
Table = client.table


class TestToPolars(object):
    """Round-trip checks for `View.to_polars()` output fed back into a table."""

    def test_to_polars(self, superstore):
        """A table rebuilt from `to_polars()` output keeps its size and data.

        `superstore` is supplied by the test harness; only its
        `to_dict(orient="records")` method is used here.
        """
        # Seed the reference table from the row-oriented records.
        source_table = Table(superstore.to_dict(orient="records"))
        exported = source_table.view().to_polars()

        # Load the exported frame straight back into a second table.
        reloaded_table = Table(exported)
        assert reloaded_table.size() == source_table.size()

        # Exporting the reloaded table must reproduce the first export.
        re_exported = reloaded_table.view().to_polars()
        assert exported.equals(re_exported)
1 change: 0 additions & 1 deletion rust/perspective-python/src/client/polars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ pub fn arrow_to_polars(py: Python<'_>, arrow: &[u8]) -> PyResult<Py<PyAny>> {
Ok(polars
.getattr("read_ipc_stream")?
.call1((bytes,))?
.call0()?
.as_unbound()
.clone())
}
Expand Down
5 changes: 4 additions & 1 deletion rust/perspective-python/src/server/server_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ impl PySyncServer {
loop_callback: Option<Py<PyAny>>,
) -> PyResult<crate::client::client_sync::Client> {
let client = crate::client::client_sync::Client(PyClient::new_from_client(
self.server.new_local_client().clone(),
self.server
.new_local_client()
.take()
.map_err(PyValueError::new_err)?,
));

if let Some(loop_cb) = loop_callback {
Expand Down
24 changes: 15 additions & 9 deletions rust/perspective-server/src/local_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,26 @@ impl LocalClientState {
}

/// A [`Client`] specialized for connecting to an in-process [`Server`].
pub struct LocalClient(LocalClientState);
pub struct LocalClient(Option<LocalClientState>);

impl Deref for LocalClient {
type Target = Client;

fn deref(&self) -> &Self::Target {
self.0.get_client()
self.0.as_ref().unwrap().get_client()
}
}

impl Drop for LocalClient {
fn drop(&mut self) {
if let Some(session) = self.0.session.get() {
if session.try_read().unwrap().is_some() {
tracing::error!("`Client` dropped without `Client::close`");
if let Some(state) = &self.0 {
if let Some(session) = state.session.get() {
if session.try_read().unwrap().is_some() {
tracing::error!("`Client` dropped without `Client::close`");
}
} else {
tracing::debug!("`Session` dropped before init");
}
} else {
tracing::debug!("`Session` dropped before init");
}
}
}
Expand All @@ -96,14 +98,18 @@ impl LocalClient {
session: Arc::default(),
};

LocalClient(state)
LocalClient(Some(state))
}

/// Consume this [`LocalClient`] and return a clone of its underlying
/// [`Client`].
///
/// The inner state is moved out of the wrapper before `self` is dropped,
/// leaving `None` behind.
///
/// # Errors
///
/// Returns `Err("Empty")` if the inner state is already `None`.
pub fn take(mut self) -> Result<Client, &'static str> {
    match self.0.take() {
        Some(state) => Ok(state.get_client().clone()),
        None => Err("Empty"),
    }
}

/// Close this [`LocalClient`]. Dropping a [`LocalClient`] instead of
/// calling [`LocalClient::close`] will result in a log error, as this
/// will leak!
pub async fn close(self) {
if let Some(session) = self.0.session.get() {
if let Some(session) = self.0.as_ref().unwrap().session.get() {
session.write().await.take().unwrap().close().await
} else {
tracing::debug!("`Session` dropped before init");
Expand Down
Loading