Skip to content

Commit f285e3c

Browse files
committed
WIP Arrow demo
1 parent 3e35bbd commit f285e3c

File tree

18 files changed

+1519
-118
lines changed

18 files changed

+1519
-118
lines changed

cpp/arcticdb/arrow/arrow_utils.cpp

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,5 @@ std::shared_ptr<std::vector<sparrow::record_batch>> segment_to_arrow_data(Segmen
7171

7272
return output;
7373
}
74-
ArrowReadResult create_arrow_read_result(
75-
const VersionedItem& version,
76-
FrameAndDescriptor&& fd) {
77-
auto result = std::move(fd);
78-
auto arrow_frame = ArrowOutputFrame{segment_to_arrow_data(result.frame_), names_from_segment(result.frame_)};
79-
80-
const auto& desc_proto = result.desc_.proto();
81-
return {version, std::move(arrow_frame), desc_proto.user_meta()};
82-
}
8374

8475
} // namespace arcticdb

cpp/arcticdb/arrow/arrow_utils.hpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,6 @@ struct ArrowReadResult {
4040
arcticdb::proto::descriptors::UserDefinedMetadata user_meta_;
4141
};
4242

43-
ArrowReadResult create_arrow_read_result(
44-
const VersionedItem& version,
45-
FrameAndDescriptor&& fd);
46-
47-
4843
std::shared_ptr<std::vector<sparrow::record_batch>> segment_to_arrow_data(SegmentInMemory& segment);
4944

5045
std::vector<std::string> names_from_segment(const SegmentInMemory& segment);

cpp/arcticdb/entity/read_result.hpp

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,36 @@
1414
#include <arcticdb/entity/frame_and_descriptor.hpp>
1515
#include <arcticdb/pipeline/python_output_frame.hpp>
1616
#include <arcticdb/util/memory_tracing.hpp>
17+
#include <arcticdb/arrow/arrow_utils.hpp>
1718

1819
#include <vector>
1920

2021
namespace arcticdb {
2122

23+
// TODO: Rename the PythonOutputFrame to PandasOutputFrame
24+
using OutputFrame = std::variant<pipelines::PythonOutputFrame, ArrowOutputFrame>;
25+
2226
struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
2327
ReadResult(
2428
const VersionedItem& versioned_item,
25-
pipelines::PythonOutputFrame&& frame_data,
29+
OutputFrame&& frame_data,
30+
OutputFormat output_format,
2631
const arcticdb::proto::descriptors::NormalizationMetadata& norm_meta,
2732
const arcticdb::proto::descriptors::UserDefinedMetadata& user_meta,
2833
const arcticdb::proto::descriptors::UserDefinedMetadata& multi_key_meta,
2934
std::vector<entity::AtomKey>&& multi_keys) :
3035
item(versioned_item),
3136
frame_data(std::move(frame_data)),
37+
output_format(output_format),
3238
norm_meta(norm_meta),
3339
user_meta(user_meta),
3440
multi_key_meta(multi_key_meta),
3541
multi_keys(std::move(multi_keys)) {
3642

3743
}
3844
VersionedItem item;
39-
pipelines::PythonOutputFrame frame_data;
45+
OutputFrame frame_data;
46+
OutputFormat output_format;
4047
arcticdb::proto::descriptors::NormalizationMetadata norm_meta;
4148
arcticdb::proto::descriptors::UserDefinedMetadata user_meta;
4249
arcticdb::proto::descriptors::UserDefinedMetadata multi_key_meta;
@@ -72,11 +79,18 @@ inline ReadResult create_python_read_result(
7279
}
7380
}
7481

75-
auto python_frame = pipelines::PythonOutputFrame{result.frame_, output_format};
82+
// Is that the principal way to init a variant?
83+
auto python_frame = [&]() -> OutputFrame {
84+
if (output_format == OutputFormat::ARROW) {
85+
return ArrowOutputFrame{segment_to_arrow_data(result.frame_), names_from_segment(result.frame_)};
86+
} else {
87+
return pipelines::PythonOutputFrame{result.frame_, output_format};
88+
}
89+
}();
7690
util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__);
7791

7892
const auto& desc_proto = result.desc_.proto();
79-
return {version, std::move(python_frame), desc_proto.normalization(),
93+
return {version, std::move(python_frame), output_format, desc_proto.normalization(),
8094
desc_proto.user_meta(), desc_proto.multi_key_meta(), std::move(result.keys_)};
8195
}
8296

cpp/arcticdb/python/adapt_read_dataframe.hpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,7 @@ inline auto adapt_read_df = [](ReadResult && ret) -> py::tuple{
1717
auto pynorm = python_util::pb_to_python(ret.norm_meta);
1818
auto pyuser_meta = python_util::pb_to_python(ret.user_meta);
1919
auto multi_key_meta = python_util::pb_to_python(ret.multi_key_meta);
20-
return py::make_tuple(ret.item, std::move(ret.frame_data), pynorm, pyuser_meta, multi_key_meta, ret.multi_keys);
21-
};
22-
23-
inline auto adapt_arrow_df = [](ArrowReadResult && ret) -> py::tuple{
24-
auto pyuser_meta = python_util::pb_to_python(ret.user_meta_);
25-
return py::make_tuple(ret.versioned_item_, std::move(ret.frame_), pyuser_meta);
20+
return py::make_tuple(ret.item, std::move(ret.frame_data), ret.output_format, pynorm, pyuser_meta, multi_key_meta, ret.multi_keys);
2621
};
2722

2823
}

cpp/arcticdb/python/python_utils.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ inline py::list adapt_read_dfs(std::vector<std::variant<ReadResult, DataError>>&
251251
auto pynorm = python_util::pb_to_python(read_result.norm_meta);
252252
auto pyuser_meta = python_util::pb_to_python(read_result.user_meta);
253253
auto multi_key_meta = python_util::pb_to_python(read_result.multi_key_meta);
254-
lst.append(py::make_tuple(read_result.item, std::move(read_result.frame_data), pynorm, pyuser_meta, multi_key_meta,
254+
lst.append(py::make_tuple(read_result.item, std::move(read_result.frame_data), read_result.output_format, pynorm, pyuser_meta, multi_key_meta,
255255
read_result.multi_keys));
256256
},
257257
[&lst] (DataError& data_error) {

cpp/arcticdb/version/python_bindings.cpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -692,16 +692,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
692692
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, const std::shared_ptr<ReadQuery>& read_query, const ReadOptions& read_options) {
693693
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
694694
return adapt_read_df(v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data));
695-
},
695+
},
696696
py::call_guard<SingleThreadMutexHolder>(),
697697
"Read the specified version of the dataframe from the store")
698-
.def("read_dataframe_version_arrow",
699-
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, const std::shared_ptr<ReadQuery>& read_query, const ReadOptions& read_options) {
700-
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
701-
return adapt_arrow_df(v.read_dataframe_version_arrow(sid, version_query, read_query, read_options, handler_data));
702-
},
703-
py::call_guard<SingleThreadMutexHolder>(),
704-
"Read the specified version of the dataframe from the store")
705698
.def("read_index",
706699
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query){
707700
return adapt_read_df(v.read_index(sid, version_query));
@@ -757,6 +750,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
757750
vit,
758751
PythonOutputFrame{
759752
SegmentInMemory{tsd.as_stream_descriptor()}, read_options.output_format()},
753+
read_options.output_format(),
760754
tsd_proto.normalization(),
761755
tsd_proto.user_meta(),
762756
tsd_proto.multi_key_meta(),
@@ -827,6 +821,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
827821
const auto& tsd_proto = tsd.proto();
828822
ReadResult res{vit, PythonOutputFrame{
829823
SegmentInMemory{tsd.as_stream_descriptor()}, read_options.output_format()},
824+
read_options.output_format(),
830825
tsd_proto.normalization(),
831826
tsd_proto.user_meta(),
832827
tsd_proto.multi_key_meta(), {}};

cpp/arcticdb/version/version_store_api.cpp

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -868,18 +868,6 @@ ReadResult PythonVersionStore::read_dataframe_version(
868868
return create_python_read_result(opt_version_and_frame.versioned_item_, read_options.output_format(), std::move(opt_version_and_frame.frame_and_descriptor_));
869869
}
870870

871-
ArrowReadResult PythonVersionStore::read_dataframe_version_arrow(
872-
const StreamId &stream_id,
873-
const VersionQuery& version_query,
874-
const std::shared_ptr<ReadQuery>& read_query,
875-
const ReadOptions& read_options,
876-
std::any& handler_data) {
877-
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: read_dataframe_version_arrow");
878-
util::check(read_options.output_format() == OutputFormat::ARROW, "Expected arrow format in read_dataframe_version_arrow");
879-
auto opt_version_and_frame = read_dataframe_version_internal(stream_id, version_query, read_query, read_options, handler_data);
880-
return create_arrow_read_result(opt_version_and_frame.versioned_item_, std::move(opt_version_and_frame.frame_and_descriptor_));
881-
}
882-
883871
namespace {
884872

885873
std::vector<SnapshotVariantKey> ARCTICDB_UNUSED iterate_snapshot_tombstones (

cpp/arcticdb/version/version_store_api.hpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,6 @@ class PythonVersionStore : public LocalVersionedEngine {
167167
const ReadOptions& read_options,
168168
std::any& handler_data);
169169

170-
ArrowReadResult read_dataframe_version_arrow(
171-
const StreamId &stream_id,
172-
const VersionQuery& version_query,
173-
const std::shared_ptr<ReadQuery>& read_query,
174-
const ReadOptions& read_options,
175-
std::any& handler_data);
176-
177170
VersionedItem sort_merge(
178171
const StreamId& stream_id,
179172
const py::object& user_meta,

python/arcticdb/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from arcticdb.version_store._store import VersionedItem
99
import arcticdb.version_store.library as library
1010
from arcticdb.tools import set_config_from_env_vars
11-
from arcticdb_ext.version_store import DataError, VersionRequestType
11+
from arcticdb_ext.version_store import DataError, VersionRequestType, OutputFormat
1212
from arcticdb_ext.exceptions import ErrorCode, ErrorCategory
1313
from arcticdb.version_store.library import (
1414
WritePayload,

python/arcticdb/arctic.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import logging
99
from typing import List, Optional, Any, Union
1010

11-
from arcticdb.options import DEFAULT_ENCODING_VERSION, LibraryOptions, EnterpriseLibraryOptions
11+
from arcticdb.options import DEFAULT_ENCODING_VERSION, LibraryOptions, EnterpriseLibraryOptions, RuntimeOptions
1212
from arcticdb_ext.storage import LibraryManager
1313
from arcticdb.exceptions import LibraryNotFound, MismatchingLibraryOptions
1414
from arcticdb.version_store.library import ArcticInvalidApiUsageException, Library
@@ -90,7 +90,7 @@ def __init__(self, uri: str, encoding_version: EncodingVersion = DEFAULT_ENCODIN
9090
self._library_manager = LibraryManager(self._library_adapter.config_library)
9191
self._uri = uri
9292

93-
def __getitem__(self, name: str) -> Library:
93+
def _get_library(self, name:str, runtime_options:Optional[RuntimeOptions]):
9494
lib_mgr_name = self._library_adapter.get_name_for_library_manager(name)
9595
if not self._library_manager.has_library(lib_mgr_name):
9696
raise LibraryNotFound(name)
@@ -100,20 +100,24 @@ def __getitem__(self, name: str) -> Library:
100100
self._library_manager.get_library(lib_mgr_name, storage_override, native_storage_config=self._library_adapter.native_config()),
101101
repr(self._library_adapter),
102102
lib_cfg=self._library_manager.get_library_config(lib_mgr_name, storage_override),
103-
native_cfg=self._library_adapter.native_config()
103+
native_cfg=self._library_adapter.native_config(),
104+
runtime_options=runtime_options
104105
)
105106
if self._accessed_libs is not None:
106107
self._accessed_libs.append(lib)
107108
return Library(repr(self), lib)
108109

110+
def __getitem__(self, name: str) -> Library:
111+
self._get_library(name, None)
112+
109113
def __repr__(self):
110114
return "Arctic(config=%r)" % self._library_adapter
111115

112116
def __contains__(self, name: str):
113117
return self.has_library(name)
114118

115119
def get_library(
116-
self, name: str, create_if_missing: Optional[bool] = False, library_options: Optional[LibraryOptions] = None
120+
self, name: str, create_if_missing: Optional[bool] = False, library_options: Optional[LibraryOptions] = None, runtime_options: Optional[RuntimeOptions] = None
117121
) -> Library:
118122
"""
119123
Returns the library named ``name``.
@@ -136,6 +140,9 @@ def get_library(
136140
match these.
137141
Unused if create_if_missing is False.
138142
143+
runtime_options: Optional[RuntimeOptions], default = None
144+
TODO
145+
139146
Examples
140147
--------
141148
>>> arctic = adb.Arctic('s3://MY_ENDPOINT:MY_BUCKET')
@@ -152,7 +159,7 @@ def get_library(
152159
"In get_library, library_options must be falsey if create_if_missing is falsey"
153160
)
154161
try:
155-
lib = self[name]
162+
lib = self._get_library(name, runtime_options)
156163
if create_if_missing and library_options:
157164
if library_options.encoding_version is None:
158165
library_options.encoding_version = self._encoding_version

python/arcticdb/options.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from arcticdb.encoding_version import EncodingVersion
1313
from arcticdb_ext.storage import ModifiableLibraryOption, ModifiableEnterpriseLibraryOption
14+
from arcticdb_ext.version_store import OutputFormat
1415

1516

1617
DEFAULT_ENCODING_VERSION = EncodingVersion.V1
@@ -146,6 +147,15 @@ def __repr__(self):
146147
)
147148

148149

150+
class RuntimeOptions:
151+
"""
152+
TODO: Docs
153+
"""
154+
155+
def __init__(self, output_format=OutputFormat.PANDAS):
156+
self.output_format=output_format
157+
158+
149159
class EnterpriseLibraryOptions:
150160
"""
151161
Configuration options for ArcticDB libraries, that should only be used when you are using the ArcticDB enterprise

0 commit comments

Comments
 (0)