Skip to content

Commit 6b2ae30

Browse files
authored
Merge branch 'master' into memray_head_tail_tests
2 parents 76d78de + 97a70dd commit 6b2ae30

File tree

12 files changed

+95
-66
lines changed

12 files changed

+95
-66
lines changed

cpp/arcticdb/codec/lz4.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ struct Lz4BlockEncoder {
5050
util::check_arg(compressed_bytes > 0 || (compressed_bytes == 0 && block_utils.bytes_ == 0),
5151
"expected compressed bytes >= 0, actual {}",
5252
compressed_bytes);
53-
ARCTICDB_TRACE(log::storage(), "Block of size {} compressed to {} bytes", block_utils.bytes_, compressed_bytes);
53+
ARCTICDB_TRACE(log::storage(), "Block of size {} compressed to {} bytes: {}", block_utils.bytes_, compressed_bytes, dump_bytes(out, compressed_bytes, 10U));
5454
hasher(in, block_utils.count_);
5555
pos += ssize_t(compressed_bytes);
5656
copy_codec(*out_codec.mutable_lz4(), opts);

cpp/arcticdb/codec/segment.cpp

+30-29
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ arcticdb::proto::encoding::SegmentHeader generate_v1_header(const SegmentHeader&
3030
segment_header.set_compacted(header.compacted());
3131
segment_header.set_encoding_version(static_cast<uint16_t>(header.encoding_version()));
3232

33-
ARCTICDB_TRACE(log::codec(), "Encoded segment header {}", segment_header.DebugString());
33+
ARCTICDB_TRACE(log::codec(), "Encoded segment header bytes {}: {}", segment_header.ByteSizeLong(), segment_header.DebugString());
3434
return segment_header;
3535
}
3636

@@ -153,7 +153,8 @@ DeserializedSegmentData decode_header_and_fields(const uint8_t*& src, bool copy_
153153
auto segment_header = deserialize_segment_header_from_proto(proto_wrapper->proto());
154154
util::check(segment_header.encoding_version() == EncodingVersion::V1, "Expected v1 header to contain legacy encoding version");
155155
auto fields = std::make_shared<FieldCollection>(field_collection_from_proto(proto_wrapper->proto().stream_descriptor().fields()));
156-
src += FIXED_HEADER_SIZE + fixed_hdr->header_bytes;
156+
const auto total_header_size = FIXED_HEADER_SIZE + fixed_hdr->header_bytes;
157+
src += total_header_size;
157158
auto stream_id = stream_id_from_proto(proto_wrapper->proto().stream_descriptor());
158159
return {std::move(segment_header), std::move(fields), std::move(data), std::move(proto_wrapper), stream_id};
159160
} else {
@@ -239,12 +240,12 @@ Segment Segment::from_buffer(const std::shared_ptr<Buffer>& buffer) {
239240
return{std::move(seg_hdr), buffer, std::move(desc_data), std::move(fields), stream_id, readable_size};
240241
}
241242

242-
size_t Segment::write_proto_header(uint8_t* dst) {
243+
size_t Segment::write_proto_header(uint8_t* dst, size_t header_size) {
243244
const auto& header = generate_header_proto();
244245
const auto hdr_size = proto_size();
245246
FixedHeader hdr = {MAGIC_NUMBER, HEADER_VERSION_V1, std::uint32_t(hdr_size)};
246247
write_fixed_header(dst, hdr);
247-
248+
util::check(header_size == proto_size(), "Header size mismatch: {} != {}", header_size, proto_size());
248249
google::protobuf::io::ArrayOutputStream aos(dst + FIXED_HEADER_SIZE, static_cast<int>(hdr_size));
249250
header.SerializeToZeroCopyStream(&aos);
250251
return hdr_size;
@@ -269,12 +270,18 @@ std::pair<uint8_t*, size_t> Segment::serialize_header_v2(size_t expected_bytes)
269270
return std::make_pair(buffer->preamble(), calculate_size());
270271
}
271272

272-
std::pair<uint8_t*, size_t> Segment::serialize_v1_header_in_place(size_t total_hdr_size) {
273+
std::pair<uint8_t*, size_t> Segment::serialize_v1_header_in_place(size_t hdr_size) {
274+
const auto total_hdr_size = hdr_size + FIXED_HEADER_SIZE;
273275
const auto &buffer = buffer_.get_owning_buffer();
274276
auto base_ptr = buffer->preamble() + (buffer->preamble_bytes() - total_hdr_size);
277+
util::check(buffer->data() != nullptr, "Unexpected null base pointer in v1 header serialization");
275278
util::check(base_ptr + total_hdr_size == buffer->data(), "Expected base ptr to align with data ptr, {} != {}",fmt::ptr(base_ptr + total_hdr_size),fmt::ptr(buffer->data()));
276-
write_proto_header(base_ptr);
277-
ARCTICDB_TRACE(log::storage(), "Header fits in internal buffer {:x} with {} bytes space: {}", intptr_t (base_ptr), buffer->preamble_bytes() - total_hdr_size,dump_bytes(buffer->data(), buffer->bytes(), 100u));
279+
auto red_zone = *buffer->data();
280+
auto header_bytes_written = write_proto_header(base_ptr, hdr_size);
281+
ARCTICDB_TRACE(log::storage(), "Header fits in internal buffer {:x} with {} bytes space: {}", intptr_t (base_ptr), buffer->preamble_bytes() - total_hdr_size,dump_bytes(buffer->data(), buffer->bytes(), 10u));
282+
auto check_red_zone = *buffer->data();
283+
util::check(red_zone == check_red_zone, "Data overwrite occurred {} != {}", check_red_zone, red_zone);
284+
util::check(header_bytes_written == hdr_size, "Wrote unexpected number of header bytes {} != {}", header_bytes_written, total_hdr_size);
278285
return std::make_pair(base_ptr, calculate_size());
279286
}
280287

@@ -284,46 +291,38 @@ std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_v1_head
284291
auto offset = FIXED_HEADER_SIZE + hdr_size;
285292

286293
auto total_size = offset + bytes_to_copy;
287-
288-
// Verify we have enough space for everything
289294
tmp->ensure(total_size);
290-
util::check(tmp->available() >= total_size,
291-
"Buffer available space {} is less than required size {}",
292-
tmp->available(),
293-
total_size);
294295

295-
// This is both a sanity check and a way to populate the segment with the correct size
296+
util::check(tmp->available() >= total_size, "Buffer available space {} is less than required size {}",tmp->available(), total_size);
297+
296298
auto calculated_size = calculate_size();
297299
util::check(total_size == calculated_size, "Expected total size {} to be equal to calculated size {}", total_size, calculated_size);
298300

299301
auto* dst = tmp->preamble();
300302
util::check(dst != nullptr, "Expected dst to be non-null");
303+
auto header_bytes_written = write_proto_header(dst, hdr_size);
301304

302-
auto written_hdr_size = write_proto_header(dst);
303-
util::check(written_hdr_size == hdr_size, "Expected written header size {} to be equal to expected header size {}", written_hdr_size, hdr_size);
305+
// This is a bit redundant since the size is also checked in write_proto_header, but the consequences of getting
306+
// it wrong are pretty bad (corrupt data) so will leave it in for future-proofing
307+
util::check(header_bytes_written == hdr_size, "Expected written header size {} to be equal to expected header size {}", header_bytes_written, hdr_size);
304308

305-
auto *final_dst = dst + offset;
306-
307-
auto *src = buffer().data();
308-
if (src != nullptr) {
309-
std::memcpy(final_dst,
310-
src,
311-
bytes_to_copy);
309+
if(buffer().data() != nullptr) {
310+
std::memcpy(dst + offset, buffer().data(), buffer().bytes());
312311
} else {
313312
util::check(bytes_to_copy == 0, "Expected bytes_to_copy to be 0 when src is nullptr");
314313
ARCTICDB_DEBUG(log::codec(), "src is nullptr, skipping memcpy");
315314
}
316315

317-
return std::make_tuple(dst, total_size, std::move(tmp));
316+
return std::make_tuple(tmp->preamble(), calculate_size(), std::move(tmp));
318317
}
319318

320319
std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_header_v1() {
321-
auto proto_header = generate_v1_header(header_, desc_);
322-
const auto hdr_size = proto_header.ByteSizeLong();
320+
auto proto_header = generate_header_proto();
321+
const auto hdr_size = proto_size();
323322
auto total_hdr_size = hdr_size + FIXED_HEADER_SIZE;
324323

325324
if (buffer_.is_owning_buffer() && buffer_.preamble_bytes() >= total_hdr_size) {
326-
auto [dst, size] = serialize_v1_header_in_place(total_hdr_size);
325+
auto [dst, size] = serialize_v1_header_in_place(hdr_size);
327326
return std::make_tuple(dst, size, std::unique_ptr<Buffer>());
328327
} else {
329328
return serialize_v1_header_to_buffer(hdr_size);
@@ -352,8 +351,10 @@ std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_header(
352351
}
353352

354353
const arcticdb::proto::encoding::SegmentHeader& Segment::generate_header_proto() {
355-
if(!proto_)
354+
if(!proto_) {
356355
proto_ = std::make_unique<arcticdb::proto::encoding::SegmentHeader>(generate_v1_header(header_, desc_));
356+
proto_size_ = proto_->ByteSizeLong();
357+
}
357358

358359
return *proto_;
359360
}
@@ -364,7 +365,7 @@ void Segment::write_to(std::uint8_t* dst) {
364365

365366
size_t header_size;
366367
if(header_.encoding_version() == EncodingVersion::V1)
367-
header_size = write_proto_header(dst);
368+
header_size = write_proto_header(dst, proto_size());
368369
else
369370
header_size = write_binary_header(dst);
370371

cpp/arcticdb/codec/segment.hpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class Segment {
143143

144144
std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> serialize_header();
145145

146-
size_t write_proto_header(uint8_t* dst);
146+
size_t write_proto_header(uint8_t* dst, size_t total_header_size);
147147

148148
[[nodiscard]] std::size_t size() const {
149149
util::check(size_.has_value(), "Segment size has not been set");
@@ -160,8 +160,7 @@ class Segment {
160160

161161
[[nodiscard]] size_t proto_size() {
162162
util::check(static_cast<bool>(proto_), "Proto has not been generated");
163-
164-
return proto_->ByteSizeLong();
163+
return proto_size_;
165164
}
166165

167166
[[nodiscard]] std::size_t segment_header_bytes_size() {
@@ -263,6 +262,7 @@ class Segment {
263262
StreamDescriptor desc_;
264263
std::any keepalive_;
265264
std::unique_ptr<arcticdb::proto::encoding::SegmentHeader> proto_;
265+
size_t proto_size_ = 0UL;
266266
std::optional<size_t> size_;
267267
};
268268

cpp/arcticdb/column_store/block.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ struct MemBlock {
6262
util::check(is_external(), "Cannot free inline allocated block");
6363
if(external_data_ != nullptr) {
6464
log::version().warn("Unexpected release of detachable block memory");
65-
free(reinterpret_cast<void *>(external_data_));
65+
delete[] external_data_;
6666
}
6767
}
6868
}

cpp/arcticdb/log/log.hpp

-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
#include <memory>
1111

1212
#include <spdlog/spdlog.h>
13-
1413
#ifdef DEBUG_BUILD
1514
#define ARCTICDB_DEBUG(logger, ...) logger.debug(__VA_ARGS__)
1615
#define ARCTICDB_TRACE(logger, ...) logger.trace(__VA_ARGS__)

cpp/arcticdb/pipeline/read_frame.cpp

+22-17
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ size_t get_index_field_count(const SegmentInMemory& frame) {
142142

143143
const uint8_t* skip_heading_fields(const SegmentHeader & hdr, const uint8_t*& data) {
144144
const auto has_magic_numbers = hdr.encoding_version() == EncodingVersion::V2;
145+
const auto start [[maybe_unused]] = data;
145146
if(has_magic_numbers)
146147
util::check_magic<MetadataMagic>(data);
147148

@@ -172,6 +173,7 @@ const uint8_t* skip_heading_fields(const SegmentHeader & hdr, const uint8_t*& da
172173
ARCTICDB_DEBUG(log::version(), "Skipping {} bytes of index descriptor", index_fields_size);
173174
data += index_fields_size;
174175
}
176+
ARCTICDB_DEBUG(log::version(), "Skip header fields skipped {} bytes", data - start);
175177
return data;
176178
}
177179

@@ -486,14 +488,16 @@ void check_data_left_for_subsequent_fields(
486488
}
487489

488490
void decode_into_frame_static(
489-
SegmentInMemory &frame,
490-
PipelineContextRow &context,
491-
const Segment& seg,
492-
const DecodePathData& shared_data,
493-
std::any& handler_data,
494-
const ReadQuery& read_query,
495-
const ReadOptions& read_options) {
491+
SegmentInMemory &frame,
492+
PipelineContextRow &context,
493+
const storage::KeySegmentPair& key_seg,
494+
const DecodePathData& shared_data,
495+
std::any& handler_data,
496+
const ReadQuery& read_query,
497+
const ReadOptions& read_options) {
496498
ARCTICDB_SAMPLE_DEFAULT(DecodeIntoFrame)
499+
ARCTICDB_DEBUG(log::version(), "Statically decoding segment with key {}", key_seg.atom_key());
500+
const auto& seg = key_seg.segment();
497501
const uint8_t *data = seg.buffer().data();
498502
const uint8_t *begin = data;
499503
const uint8_t *end = begin + seg.buffer().bytes();
@@ -623,15 +627,16 @@ void handle_type_promotion(
623627
}
624628

625629
void decode_into_frame_dynamic(
626-
SegmentInMemory& frame,
627-
PipelineContextRow& context,
628-
const Segment& seg,
629-
const DecodePathData& shared_data,
630-
std::any& handler_data,
631-
const ReadQuery& read_query,
632-
const ReadOptions& read_options
633-
) {
630+
SegmentInMemory& frame,
631+
PipelineContextRow& context,
632+
const storage::KeySegmentPair& key_seg,
633+
const DecodePathData& shared_data,
634+
std::any& handler_data,
635+
const ReadQuery& read_query,
636+
const ReadOptions& read_options) {
634637
ARCTICDB_SAMPLE_DEFAULT(DecodeIntoFrame)
638+
ARCTICDB_DEBUG(log::version(), "Dynamically decoding segment with key {}", key_seg.atom_key());
639+
const auto& seg = key_seg.segment();
635640
const uint8_t *data = seg.buffer().data();
636641
const uint8_t *begin = data;
637642
const uint8_t *end = begin + seg.buffer().bytes();
@@ -885,9 +890,9 @@ folly::Future<SegmentInMemory> fetch_data(
885890
[row=row, frame=frame, dynamic_schema=dynamic_schema, shared_data, &handler_data, read_query, read_options](auto &&ks) mutable {
886891
auto key_seg = std::forward<storage::KeySegmentPair>(ks);
887892
if(dynamic_schema) {
888-
decode_into_frame_dynamic(frame, row, key_seg.segment(), shared_data, handler_data, read_query, read_options);
893+
decode_into_frame_dynamic(frame, row, key_seg, shared_data, handler_data, read_query, read_options);
889894
} else {
890-
decode_into_frame_static(frame, row, key_seg.segment(), shared_data, handler_data, read_query, read_options);
895+
decode_into_frame_static(frame, row, key_seg, shared_data, handler_data, read_query, read_options);
891896
}
892897

893898
return key_seg.variant_key();

cpp/arcticdb/pipeline/read_frame.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ folly::Future<SegmentInMemory> fetch_data(
8585
void decode_into_frame_static(
8686
SegmentInMemory &frame,
8787
PipelineContextRow &context,
88-
const Segment& seg,
88+
const storage::KeySegmentPair& key_seg,
8989
const DecodePathData& shared_data,
9090
std::any& handler_data,
9191
const ReadQuery& read_query,
@@ -94,7 +94,7 @@ void decode_into_frame_static(
9494
void decode_into_frame_dynamic(
9595
const SegmentInMemory &frame,
9696
PipelineContextRow &context,
97-
const Segment& seg,
97+
const storage::KeySegmentPair& key_seg,
9898
const DecodePathData& shared_data,
9999
std::any& handler_data,
100100
const ReadQuery& read_query,

cpp/arcticdb/storage/s3/s3_client_impl.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ S3Result<std::monostate> S3ClientImpl::put_object(
188188
request.SetIfNoneMatch("*");
189189
}
190190
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Set s3 key {}", request.GetKey().c_str());
191-
192191
auto [dst, write_size, buffer] = segment.serialize_header();
192+
193193
auto body = std::make_shared<boost::interprocess::bufferstream>(reinterpret_cast<char *>(dst), write_size);
194194
util::check(body->good(), "Overflow of bufferstream with size {}", write_size);
195195
request.SetBody(body);

cpp/arcticdb/storage/s3/s3_client_wrapper.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ folly::Future<S3Result<std::monostate>> S3ClientTestWrapper::delete_object(
138138

139139
// Using a fixed page size since it's only being used for simple tests.
140140
// If we ever need to configure it we should move it to the s3 proto config instead.
141-
constexpr auto page_size = 10;
142141
S3Result<ListObjectsOutput> S3ClientTestWrapper::list_objects(
143142
const std::string& name_prefix,
144143
const std::string& bucket_name,

cpp/arcticdb/util/buffer.hpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ struct Buffer : public BaseBuffer<Buffer, true> {
8888
}
8989

9090
void set_preamble(size_t pos) {
91+
util::check(preamble_bytes_ == 0, "Cannot reset buffer preamble");
9192
util::check(pos <= capacity_, "Can't set preamble past the end of the buffer");
9293
preamble_bytes_ = pos;
9394
ptr_ += pos;
@@ -177,7 +178,7 @@ struct Buffer : public BaseBuffer<Buffer, true> {
177178
resize(total_size);
178179
} else {
179180
ARCTICDB_TRACE(log::version(), "Buffer {} has sufficient bytes for {}, ptr {} data {}, capacity {}",
180-
uintptr_t(this), bytes, uintptr_t(ptr_), uintptr_t(data_), capacity_, body_bytes_);
181+
uintptr_t(this), bytes, uintptr_t(ptr_), uintptr_t(data_), capacity_);
181182
}
182183

183184
body_bytes_ = bytes;
@@ -239,7 +240,7 @@ struct Buffer : public BaseBuffer<Buffer, true> {
239240
body_bytes_ = bytes;
240241
capacity_ = body_bytes_ + preamble_bytes_;
241242
ARCTICDB_TRACE(log::version(), "Buffer {} did realloc for {}, ptr {} data {}, capacity {}",
242-
uintptr_t(this), bytes, uintptr_t(ptr_), uintptr_t(data_), capacity_, body_bytes_);
243+
uintptr_t(this), bytes, uintptr_t(ptr_), uintptr_t(data_), capacity_);
243244
} else {
244245
throw std::bad_alloc();
245246
}

cpp/arcticdb/util/dump_bytes.hpp

+11-8
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,25 @@
1212

1313
// based on this: https://codereview.stackexchange.com/questions/165120/printing-hex-dumps-for-diagnostics
1414
namespace arcticdb {
15-
inline std::ostream &hex_dump(std::ostream &os, const void *buffer,
16-
std::size_t buf_size, bool show_printable_chars = true) {
17-
if (buffer == nullptr) {
15+
inline std::ostream &hex_dump(
16+
std::ostream &os, const void *buffer,
17+
std::size_t buf_size,
18+
bool show_printable_chars = true) {
19+
if (buffer == nullptr)
1820
return os;
19-
}
21+
2022
auto old_format = os.flags();
2123
auto old_fill_char = os.fill();
2224
constexpr std::size_t max_line{8};
25+
2326
// create a place to store text version of string
2427
char render_string[max_line + 1];
2528
char *rsptr{render_string};
29+
2630
// convenience cast
2731
const unsigned char *buf{reinterpret_cast<const unsigned char *>(buffer)};
28-
2932
for (std::size_t line_count = max_line; buf_size; --buf_size, ++buf) {
30-
os << std::setw(2) << std::setfill('0') << std::hex
31-
<< static_cast<unsigned>(*buf) << ' ';
33+
os << std::setw(2) << std::setfill('0') << std::hex << static_cast<unsigned>(*buf) << ' ';
3234
*rsptr++ = std::isprint(*buf) ? *buf : '.';
3335
if (--line_count == 0) {
3436
*rsptr++ = '\0'; // terminate string
@@ -40,6 +42,7 @@ inline std::ostream &hex_dump(std::ostream &os, const void *buffer,
4042
line_count = std::min(max_line, buf_size);
4143
}
4244
}
45+
4346
// emit newline if we haven't already
4447
if (rsptr != render_string) {
4548
if (show_printable_chars) {
@@ -56,7 +59,7 @@ inline std::ostream &hex_dump(std::ostream &os, const void *buffer,
5659
return os;
5760
}
5861

59-
inline std::string dump_bytes(const void* data, size_t size, size_t max_size = 20u) {
62+
[[nodiscard]] inline std::string dump_bytes(const void* data, size_t size, size_t max_size = 20u) {
6063
const auto sz = std::min(max_size, size);
6164
std::ostringstream strm;
6265
hex_dump(strm, data, sz);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import numpy as np
2+
import pandas as pd
3+
from pandas.testing import assert_series_equal
4+
5+
6+
def get_random_series() -> pd.Series:
    """Build a deterministic random test series on a (date, security_id) MultiIndex.

    Returns:
        pd.Series named "stuff", indexed by the cartesian product of
        147 daily dates (2018-08-17 .. 2019-01-10) and 100 security ids,
        filled with seeded (reproducible) standard-normal values.
    """
    dates = pd.date_range("2018-08-17", "2019-01-10", name="date")
    securities = pd.Index(np.arange(100), name="security_id")
    index = pd.MultiIndex.from_product([dates, securities])
    # Fixed seed so repeated calls (and reruns of dependent tests) produce identical data.
    np.random.seed(42)
    random_series = pd.Series(np.random.randn(len(index)), index=index, name="stuff")
    return random_series
13+
14+
15+
def test_batch_roundtrip(s3_version_store_v1):
    """Smoke-test batch_write/batch_read of 500 identical symbols against S3.

    Passes if neither batch call raises. NOTE(review): assert_series_equal is
    imported at module level but the read results are never compared back to
    the written data — presumably intentional as a write/read smoke test;
    confirm, or add per-symbol comparisons of the batch_read results.
    """
    df = get_random_series()
    symbols = [f"symbol_{i}" for i in range(500)]
    data_vector = [df for _ in symbols]
    s3_version_store_v1.batch_write(symbols, data_vector)
    s3_version_store_v1.batch_read(symbols)
21+

0 commit comments

Comments
 (0)