Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CONFIGURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
- [Configuring Repositories](#configuring-repositories)
- [Configuring Volatile Repositories](#configuring-volatile-repositories)
- [Configuring Repository storage locations](#configuring-repository-storage-locations)
- [Configuring cache size for rocksdb content repository](#configuring-cache-size-for-rocksdb-content-repository)
- [Configuring compression for rocksdb database](#configuring-compression-for-rocksdb-database)
- [Configuring compaction for rocksdb database](#configuring-compaction-for-rocksdb-database)
- [Configuring synchronous or asynchronous writes for RocksDB content repository](#configuring-synchronous-or-asynchronous-writes-for-rocksdb-content-repository)
Expand Down Expand Up @@ -674,6 +675,19 @@ In a Filesystem Hierarchy Standard (FHS) installation (from an RPM package), the
nifi.flowfile.repository.directory.default=/var/lib/nifi-minifi-cpp/flowfile_repository
nifi.database.content.repository.directory.default=/var/lib/nifi-minifi-cpp/content_repository

### Configuring cache size for rocksdb content repository

The RocksDB content repository uses a cache to limit memory usage. The cache size can be configured using the following property.
This should limit the memory usage but may cause minimal processing overhead.

# in minifi.properties
nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB

You can disable this cache by setting it to an empty value.

# in minifi.properties
nifi.database.content.repository.optimize.for.small.db.cache.size=


### Configuring compression for rocksdb database

Expand Down
1 change: 1 addition & 0 deletions conf/minifi.properties.in
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ nifi.content.repository.class.name=DatabaseContentRepository
## Relates to the internal workings of the rocksdb backend
# nifi.flowfile.repository.rocksdb.compaction.period=2 min
# nifi.database.content.repository.rocksdb.compaction.period=2 min
# nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB

# setting this value to "0" enables synchronous deletion
# nifi.database.content.repository.purge.period = 1 sec
Expand Down
36 changes: 33 additions & 3 deletions extensions/rocksdb-repos/DatabaseContentRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,51 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu

setCompactionPeriod(configuration);

auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
const auto cache_size = configuration->get(Configure::nifi_dbcontent_optimize_for_small_db_cache_size).or_else([] { return std::make_optional<std::string>("8 MB"); })
| utils::andThen([](const auto& cache_size_str) -> std::optional<uint64_t> {
return parsing::parseDataSize(cache_size_str) | utils::toOptional();
});

std::shared_ptr<rocksdb::Cache> cache = nullptr;
std::shared_ptr<rocksdb::WriteBufferManager> wbm = nullptr;
if (cache_size) {
cache = rocksdb::NewLRUCache(*cache_size);
wbm = std::make_shared<rocksdb::WriteBufferManager>(0, cache);
logger_->log_trace("Using {} sized cache for DatabaseContentRepository", *cache_size);
} else {
logger_->log_trace("Cache limitation disabled for DatabaseContentRepository");
}

auto set_db_opts = [encrypted_env, &cache, &wbm] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
minifi::internal::setCommonRocksDbOptions(db_opts);
if (encrypted_env) {
db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
} else {
db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
}
db_opts.optimizeForSmallDb(cache, wbm);
};
auto set_cf_opts = [&configuration] (rocksdb::ColumnFamilyOptions& cf_opts) {
auto set_cf_opts = [&configuration, &cache] (rocksdb::ColumnFamilyOptions& cf_opts) {
cf_opts.OptimizeForPointLookup(4);
cf_opts.merge_operator = std::make_shared<StringAppender>();
cf_opts.max_successive_merges = 0;
if (auto compression_type = minifi::internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) {
cf_opts.max_write_buffer_number = 2;
cf_opts.write_buffer_size = 4_MB;
if (const auto compression_type = internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) {
cf_opts.compression = *compression_type;
}
if (cache) {
rocksdb::BlockBasedTableOptions table_options;
table_options.block_cache = cache;

table_options.cache_index_and_filter_blocks = true;
table_options.cache_index_and_filter_blocks_with_high_priority = true;

table_options.pin_l0_filter_and_index_blocks_in_cache = false;
table_options.pin_top_level_index_and_filter = false;

cf_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
}
};
db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_,
minifi::internal::getRocksDbOptionsToOverride(configuration, Configure::nifi_content_repository_rocksdb_options));
Expand Down
26 changes: 13 additions & 13 deletions extensions/rocksdb-repos/database/OpenRocksDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,23 +135,23 @@ std::optional<uint64_t> OpenRocksDb::getApproximateSizes() const {
return std::nullopt;
}

minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
minifi::core::RepositoryMetricsSource::RocksDbStats stats;
std::string table_readers;
GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
void OpenRocksDb::fillU64FromProperty(uint64_t& member, std::string_view property_name) {
std::string property_value;
GetProperty(property_name, &property_value);
try {
stats.table_readers_size = std::stoull(table_readers);
member = std::stoull(property_value);
} catch (const std::exception&) {
logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!");
logger_->log_warn("Could not retrieve valid '{}' property value from rocksdb content repository!", property_name);
}
}

std::string all_memtables;
GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
try {
stats.all_memory_tables_size = std::stoull(all_memtables);
} catch (const std::exception&) {
logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!");
}

minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
minifi::core::RepositoryMetricsSource::RocksDbStats stats;
fillU64FromProperty(stats.table_readers_size, "rocksdb.estimate-table-readers-mem");
fillU64FromProperty(stats.all_memory_tables_size, "rocksdb.cur-size-all-mem-tables");
fillU64FromProperty(stats.block_cache_usage, "rocksdb.block-cache-usage");
fillU64FromProperty(stats.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage");

return stats;
}
Expand Down
2 changes: 2 additions & 0 deletions extensions/rocksdb-repos/database/OpenRocksDb.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class OpenRocksDb {
void handleResult(const rocksdb::Status& result);
void handleResult(const std::vector<rocksdb::Status>& results);

void fillU64FromProperty(uint64_t& member, std::string_view property_name);

gsl::not_null<RocksDbInstance*> db_;
gsl::not_null<std::shared_ptr<rocksdb::DB>> impl_;
gsl::not_null<std::shared_ptr<ColumnHandle>> column_;
Expand Down
12 changes: 12 additions & 0 deletions extensions/rocksdb-repos/database/RocksDbUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ class Writable {
}
}

template <typename Method, typename... Args>
decltype(auto) call(Method method, Args&&... args) {
return std::invoke(method, target_, std::forward<Args>(args)...);
}

void optimizeForSmallDb(std::shared_ptr<rocksdb::Cache> cache, std::shared_ptr<rocksdb::WriteBufferManager> wbm) {
if (!cache || !wbm) { return; }
target_.OptimizeForSmallDb(&cache);
target_.write_buffer_manager = wbm;
target_.max_open_files = 20;
}

template<typename F>
const F& get(F T::* member) {
return target_.*member;
Expand Down
32 changes: 32 additions & 0 deletions extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,35 @@ TEST_CASE("DBContentRepository can clear orphan entries") {

REQUIRE(getDbSize(dir) == 0);
}

TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size default") {
LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();

const auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
const auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
REQUIRE(content_repo->initialize(configuration));

CHECK(LogTestController::getInstance().contains("Using 8388608 sized cache for DatabaseContentRepository"));
}

TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size override") {
LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();

const auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size", "100 MB");
const auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
REQUIRE(content_repo->initialize(configuration));

CHECK(LogTestController::getInstance().contains("Using 104857600 sized cache for DatabaseContentRepository"));
}

TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size disable") {
LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();

const auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size", "");
const auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
REQUIRE(content_repo->initialize(configuration));

CHECK(LogTestController::getInstance().contains("Cache limitation disabled for DatabaseContentRepository"));
}
1 change: 1 addition & 0 deletions libminifi/src/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal
{Configuration::nifi_flowfile_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
{Configuration::nifi_provenance_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
{Configuration::nifi_rocksdb_state_storage_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
{Configuration::nifi_dbcontent_optimize_for_small_db_cache_size, gsl::make_not_null(&core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)},
{Configuration::nifi_dbcontent_repository_purge_period, gsl::make_not_null(&core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)},
{Configuration::nifi_remote_input_secure, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ std::vector<SerializedResponseNode> RepositoryMetricsSourceStore::serialize() co
if (auto rocksdb_stats = repo->getRocksDbStats()) {
parent.children.push_back({.name = "rocksDbTableReadersSize", .value = rocksdb_stats->table_readers_size});
parent.children.push_back({.name = "rocksDbAllMemoryTablesSize", .value = rocksdb_stats->all_memory_tables_size});
parent.children.push_back({.name = "rocksDbBlockCacheUsage", .value = rocksdb_stats->block_cache_usage});
parent.children.push_back({.name = "rocksDbBlockCachePinnedUsage", .value = rocksdb_stats->block_cache_pinned_usage});
}

serialized.push_back(parent);
Expand All @@ -68,6 +70,10 @@ std::vector<PublishedMetric> RepositoryMetricsSourceStore::calculateMetrics() co
{{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}});
metrics.push_back({"rocksdb_all_memory_tables_size_bytes", static_cast<double>(rocksdb_stats->all_memory_tables_size),
{{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}});
metrics.push_back({"rocksdb_block_cache_usage_bytes", static_cast<double>(rocksdb_stats->block_cache_usage),
{{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}});
metrics.push_back({"rocksdb_block_cache_pinned_usage_bytes", static_cast<double>(rocksdb_stats->block_cache_pinned_usage),
{{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}});
}
}
return metrics;
Expand Down
4 changes: 3 additions & 1 deletion libminifi/test/libtest/unit/ProvenanceTestHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ class TestRocksDbRepository : public TestThreadedRepository {
std::optional<RocksDbStats> getRocksDbStats() const override {
return RocksDbStats {
.table_readers_size = 100,
.all_memory_tables_size = 200
.all_memory_tables_size = 200,
.block_cache_usage = 85,
.block_cache_pinned_usage = 50
};
}
};
Expand Down
39 changes: 39 additions & 0 deletions libminifi/test/libtest/unit/TestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,43 @@ std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents
return {};
}

std::vector<LogMessageView> extractLogMessageViews(const std::string& log_str) {
std::vector<LogMessageView> messages;
const std::regex header_pattern(R"((\[[\d\-\s\:\.]+\]) (\s*\[[^\]]+\]) \[(.*)\])");
struct HeaderMarker {
size_t start;
std::string_view timestamp;
std::string_view logger_class;
std::string_view log_level;
size_t end;
};

std::vector<HeaderMarker> markers = ranges::subrange<std::sregex_iterator>(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern),
std::sregex_iterator()) |
ranges::views::transform([=](const std::smatch& m) {
return HeaderMarker{.start = static_cast<size_t>(m.position(0)),
.timestamp = std::string_view{log_str.data() + m.position(1), static_cast<size_t>(m.length(1))},
.logger_class = std::string_view{log_str.data() + m.position(2), static_cast<size_t>(m.length(2))},
.log_level = std::string_view{log_str.data() + m.position(3), static_cast<size_t>(m.length(3))},
.end = static_cast<size_t>(m.position(0) + m.length(0))
};
}) | ranges::to<std::vector>();

markers.push_back(HeaderMarker{.start = log_str.size(),
.timestamp = {},
.logger_class = {},
.log_level = {},
.end = log_str.size()
});

for (auto window: markers | ranges::views::sliding(2)) {
messages.push_back(LogMessageView{.timestamp = window[0].timestamp,
.logger_class = window[0].logger_class,
.log_level = window[0].log_level,
.payload = {log_str.data() + window[0].end, window[1].start - window[0].end}});
}

return messages;
}

} // namespace org::apache::nifi::minifi::test::utils
18 changes: 16 additions & 2 deletions libminifi/test/libtest/unit/TestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@
#include "asio.hpp"
#include "asio/ssl.hpp"
#include "utils/net/Ssl.h"
#include "range/v3/algorithm/any_of.hpp"
#include "core/Processor.h"
#include "core/logging/LoggerFactory.h"
#include <range/v3/all.hpp>
#include "./ProcessorUtils.h"

using namespace std::literals::chrono_literals;
Expand Down Expand Up @@ -127,6 +126,12 @@ bool verifyLogLineVariantPresenceInPollTime(const std::chrono::duration<Rep, Per
return verifyEventHappenedInPollTime(wait_duration, check);
}

template<class Rep, class Period>
bool verifyLogMatchesRegexInPollTime(const std::chrono::duration<Rep, Period>& wait_duration, const std::string& regex) {
auto check = [&regex] { return LogTestController::getInstance().matchesRegex(regex); };
return verifyEventHappenedInPollTime(wait_duration, check);
}

namespace internal {
struct JsonContext {
const JsonContext *parent{nullptr};
Expand Down Expand Up @@ -234,6 +239,15 @@ inline bool runningAsUnixRoot() {
#endif
}

struct LogMessageView {
std::string_view timestamp;
std::string_view logger_class;
std::string_view log_level;
std::string_view payload;
};

std::vector<LogMessageView> extractLogMessageViews(const std::string& log_str);

} // namespace org::apache::nifi::minifi::test::utils

namespace Catch {
Expand Down
Loading
Loading