Skip to content

Commit 0870a4f

Browse files
committed
More work
1 parent e6198f6 commit 0870a4f

32 files changed

+1616
-89
lines changed

cpp/arcticdb/CMakeLists.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ set(arcticdb_srcs
511511
version/symbol_list.cpp
512512
version/version_map_batch_methods.cpp
513513
storage/s3/ec2_utils.cpp
514-
util/buffer_holder.cpp storage/async_storage.hpp)
514+
util/buffer_holder.cpp storage/async_storage.hpp util/allocation_tracing.hpp util/allocation_tracing.cpp storage/s3/http_client.cpp storage/s3/http_client.hpp storage/s3/http_client_factory.hpp storage/s3/http_client_factory.cpp storage/s3/http_request.hpp storage/s3/http_request.cpp storage/s3/http_response.hpp storage/s3/http_response.cpp)
515515

516516
add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})
517517

@@ -662,6 +662,7 @@ set (arcticdb_core_libraries
662662
${Zstd_LIBRARY}
663663
${ARCTICDB_MONGO_LIBS}
664664
spdlog::spdlog
665+
cpptrace::cpptrace
665666
Folly::folly # Transitively includes: double-conversion, gflags, glog, libevent, libssl, libcrypto, libiberty, libsodium
666667
Azure::azure-identity
667668
Azure::azure-storage-blobs

cpp/arcticdb/async/async_store.hpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ folly::Future<std::vector<VariantKey>> batch_read_compressed(
332332
std::vector<folly::Future<pipelines::SegmentAndSlice>> batch_read_uncompressed(
333333
std::vector<pipelines::RangesAndKey>&& ranges_and_keys,
334334
std::shared_ptr<std::unordered_set<std::string>> columns_to_decode) override {
335+
log::version().debug("Reading {} keys", ranges_and_keys.size());
335336
return folly::window(
336337
std::move(ranges_and_keys),
337338
[this, columns_to_decode](auto&& ranges_and_key) {
@@ -341,7 +342,7 @@ std::vector<folly::Future<pipelines::SegmentAndSlice>> batch_read_uncompressed(
341342
library_,
342343
storage::ReadKeyOpts{},
343344
DecodeSliceTask{std::forward<decltype(ranges_and_key)>(ranges_and_key), columns_to_decode});
344-
}, async::TaskScheduler::instance()->io_thread_count() * 2);
345+
}, async::TaskScheduler::instance()->io_thread_count() * 4);
345346
}
346347

347348
std::vector<folly::Future<bool>> batch_key_exists(

cpp/arcticdb/async/tasks.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ struct DecodeSliceTask : BaseTask {
429429

430430
pipelines::SegmentAndSlice operator()(storage::KeySegmentPair&& key_segment_pair) {
431431
ARCTICDB_SAMPLE(DecodeSliceTask, 0)
432+
//log::version().info("Decode into slice {}", key_segment_pair.variant_key());
432433
return decode_into_slice(std::move(key_segment_pair));
433434
}
434435

cpp/arcticdb/entity/performance_tracing.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
#include <memory>
1515

16-
#define ARCTICDB_LOG_PERFORMANCE
16+
//#define ARCTICDB_LOG_PERFORMANCE
1717

1818
#define ARCTICDB_RUNTIME_SAMPLE(name, flags) \
1919
static bool _scoped_timer_active_ = ConfigsMap::instance()->get_int("Logging.timings", 0) == 1 || ConfigsMap::instance()->get_int("Logging.ALL", 0) == 1; \

cpp/arcticdb/log/log.hpp

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
#include <spdlog/spdlog.h>
1313

14-
#define DEBUG_BUILD
1514
#ifdef DEBUG_BUILD
1615
#define ARCTICDB_DEBUG(logger, ...) logger.debug(__VA_ARGS__)
1716
#define ARCTICDB_TRACE(logger, ...) logger.trace(__VA_ARGS__)

cpp/arcticdb/processing/clause.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ std::vector<EntityId> FilterClause::process(std::vector<EntityId>&& entity_ids)
106106
}
107107
auto proc = gather_entities<std::shared_ptr<SegmentInMemory>, std::shared_ptr<RowRange>, std::shared_ptr<ColRange>>(*component_manager_, std::move(entity_ids));
108108
proc.set_expression_context(expression_context_);
109+
//log::version().info("Doing filter {} for entity ids {}", expression_context_->root_node_name_, entity_ids);
109110
auto variant_data = proc.get(expression_context_->root_node_name_);
110111
std::vector<EntityId> output;
111112
util::variant_match(variant_data,
@@ -468,6 +469,7 @@ void ResampleClause<closed_boundary>::set_processing_config(const ProcessingConf
468469
template<ResampleBoundary closed_boundary>
469470
std::vector<std::vector<size_t>> ResampleClause<closed_boundary>::structure_for_processing(
470471
std::vector<RangesAndKey>& ranges_and_keys) {
472+
//log::version().info("ResampleClause: structure for processing 1");
471473
if (ranges_and_keys.empty()) {
472474
return {};
473475
}
@@ -503,6 +505,7 @@ std::vector<std::vector<EntityId>> ResampleClause<closed_boundary>::structure_fo
503505
if (entity_ids.empty()) {
504506
return {};
505507
}
508+
//log::version().info("ResampleClause: structure for processing 2");
506509
auto [segments, row_ranges, col_ranges] = component_manager_->get_entities<std::shared_ptr<SegmentInMemory>, std::shared_ptr<RowRange>, std::shared_ptr<ColRange>>(entity_ids, false);
507510
std::vector<RangesAndEntity> ranges_and_entities;
508511
ranges_and_entities.reserve(entity_ids.size());
@@ -559,6 +562,7 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit
559562
return {};
560563
}
561564
auto proc = gather_entities<std::shared_ptr<SegmentInMemory>, std::shared_ptr<RowRange>, std::shared_ptr<ColRange>, EntityFetchCount>(*component_manager_, std::move(entity_ids));
565+
//log::version().info("ResampleClause: processing entities {}", entity_ids);
562566
auto row_slices = split_by_row_slice(std::move(proc));
563567
// If the entity fetch counts for the entities in the first row slice are 2, the first bucket overlapping this row
564568
// slice is being computed by the call to process dealing with the row slices above these. Otherwise, this call

cpp/arcticdb/processing/component_manager.hpp

+15-1
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,18 @@ class ComponentManager {
143143
std::shared_mutex mtx_;
144144
};
145145

146-
} // namespace arcticdb
146+
} // namespace arcticdb
147+
148+
namespace fmt {
149+
template<>
150+
struct formatter<arcticdb::EntityId> {
151+
template<typename ParseContext>
152+
constexpr auto parse(ParseContext& ctx) { return ctx.begin(); }
153+
154+
template<typename FormatContext>
155+
auto format(const arcticdb::EntityId& id, FormatContext& ctx) const {
156+
return fmt::format_to(ctx.out(), "{}", static_cast<uint64_t>(id));
157+
}
158+
};
159+
160+
} //namespace fmt

cpp/arcticdb/storage/azure/azure_storage.cpp

+10-6
Original file line numberDiff line numberDiff line change
@@ -199,13 +199,17 @@ KeySegmentPair do_read_impl(
199199
static_cast<int>(e.StatusCode),
200200
e.ReasonPhrase);
201201
}
202-
throw KeyNotFoundException(variant_key,
203-
fmt::format("Failed to read azure segment with key '{}' {} {}: {}",
204-
variant_key,
205-
blob_name,
206-
static_cast<int>(e.StatusCode),
207-
e.ReasonPhrase));
202+
throw KeyNotFoundException(
203+
variant_key,
204+
fmt::format("Failed to read azure segment with key '{}' {} {}: {}",
205+
variant_key,
206+
blob_name,
207+
static_cast<int>(e.StatusCode),
208+
e.ReasonPhrase));
209+
} catch(const std::exception& ex) {
210+
throw KeyNotFoundException(variant_key);
208211
}
212+
return KeySegmentPair{};
209213
}
210214

211215
namespace fg = folly::gen;

cpp/arcticdb/storage/library_manager.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,13 @@ std::vector<LibraryPath> LibraryManager::get_library_paths() const {
250250
}
251251

252252
bool LibraryManager::has_library(const LibraryPath& path) const {
253+
{
254+
std::lock_guard<std::mutex> lock{open_libraries_mutex_};
255+
if (auto cached = open_libraries_.get(path); cached) {
256+
return true;
257+
}
258+
}
259+
253260
return store_->key_exists_sync(RefKey{StreamId(path.to_delim_path()), entity::KeyType::LIBRARY_CONFIG});
254261
}
255262

cpp/arcticdb/storage/library_manager.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ namespace arcticdb::storage {
7070

7171
std::shared_ptr<Store> store_;
7272
LRUCache<LibraryPath, std::shared_ptr<Library>> open_libraries_;
73-
std::mutex open_libraries_mutex_; // for open_libraries_
73+
mutable std::mutex open_libraries_mutex_; // for open_libraries_
7474
};
7575
}
7676

cpp/arcticdb/storage/s3/detail-inl.hpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,10 @@ KeySegmentPair do_read_impl(
145145
KeyBucketizer&& bucketizer,
146146
ReadKeyOpts opts) {
147147
ARCTICDB_SAMPLE(S3StorageRead, 0)
148+
//log::version().info("Pre-bucketize {}", variant_key);
148149
auto key_type_dir = key_type_folder(root_folder, variant_key_type(variant_key));
149150
auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, variant_key), variant_key);
150-
151+
//log::version().info("Looking for key {}", variant_key);
151152
auto get_object_result = s3_client.get_object(s3_object_name, bucket_name);
152153

153154
if (get_object_result.is_success()) {
@@ -167,6 +168,7 @@ KeySegmentPair do_read_impl(
167168

168169
throw KeyNotFoundException(variant_key);
169170
}
171+
return KeySegmentPair{};
170172
}
171173

172174
/*template <typename KeyBucketizer>
@@ -193,7 +195,7 @@ folly::Future<KeySegmentPair> do_async_read_impl(
193195
ReadKeyOpts) {
194196
auto key_type_dir = key_type_folder(root_folder, variant_key_type(variant_key));
195197
auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, variant_key), variant_key);
196-
return s3_client.get_object_async(bucket_name, s3_object_name).thenValue([vk=std::move(variant_key)] (auto&& result) mutable -> KeySegmentPair {
198+
return s3_client.get_object_async(s3_object_name, bucket_name).thenValue([vk=std::move(variant_key)] (auto&& result) mutable -> KeySegmentPair {
197199
if(result.is_success())
198200
return KeySegmentPair(std::move(vk), std::move(result.get_output()));
199201
else

0 commit comments

Comments
 (0)