Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions components/core/src/clp_s/ffi/sfa/ClpArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <exception>
#include <memory>
#include <new>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
Expand All @@ -12,6 +13,7 @@
#include <ystdlib/error_handling/Result.hpp>

#include <clp/BufferReader.hpp>
#include <clp_s/archive_constants.hpp>
#include <clp_s/ArchiveReader.hpp>
#include <clp_s/ffi/sfa/SfaErrorCode.hpp>
#include <clp_s/InputConfig.hpp>
Expand All @@ -27,7 +29,9 @@ auto ClpArchiveReader::create(std::string_view archive_path) -> Result<ClpArchiv
auto path{get_path_object_for_raw_path(archive_path)};
reader = std::make_unique<clp_s::ArchiveReader>();
reader->open(path, NetworkAuthOption{});
return ClpArchiveReader{std::move(reader), nullptr};
auto clp_archive_reader{ClpArchiveReader{std::move(reader), nullptr}};
YSTDLIB_ERROR_HANDLING_TRYV(clp_archive_reader.precompute_archive_metadata());
return clp_archive_reader;
} catch (std::bad_alloc const&) {
SPDLOG_ERROR(
"Failed to create ClpArchiveReader for archive {}: out of memory.",
Expand Down Expand Up @@ -57,7 +61,11 @@ auto ClpArchiveReader::create(std::vector<char>&& archive_data) -> Result<ClpArc

archive_reader = std::make_unique<clp_s::ArchiveReader>();
archive_reader->open(reader, cDefaultArchiveId);
return ClpArchiveReader{std::move(archive_reader), std::move(archive_data_owner)};
auto clp_archive_reader{
ClpArchiveReader{std::move(archive_reader), std::move(archive_data_owner)}
};
YSTDLIB_ERROR_HANDLING_TRYV(clp_archive_reader.precompute_archive_metadata());
return clp_archive_reader;
} catch (std::bad_alloc const&) {
SPDLOG_ERROR("Failed to create ClpArchiveReader: out of memory.");
return SfaErrorCode{SfaErrorCodeEnum::NoMemory};
Expand All @@ -72,9 +80,7 @@ ClpArchiveReader::ClpArchiveReader(
std::shared_ptr<std::vector<char>> archive_data
)
: m_archive_reader{std::move(reader)},
m_archive_data{std::move(archive_data)} {
precompute_archive_metadata();
}
m_archive_data{std::move(archive_data)} {}

ClpArchiveReader::ClpArchiveReader(ClpArchiveReader&& rhs) noexcept {
move_from(rhs);
Expand Down Expand Up @@ -107,19 +113,46 @@ auto ClpArchiveReader::close() noexcept -> void {
m_archive_reader.reset();
}
m_event_count = 0;
m_file_names.clear();
m_file_infos.clear();
}

// Transfers every member of `rhs` into this object.
//
// The archive reader, the archive byte buffer, and the two per-file metadata vectors are
// move-assigned. The event count is a plain integer, so it is taken with `std::exchange`, which
// also resets `rhs.m_event_count` to 0 instead of leaving the old value behind.
//
// @param rhs The object to take state from.
auto ClpArchiveReader::move_from(ClpArchiveReader& rhs) noexcept -> void {
// NOTE(review): the reader is assigned before the byte buffer. Presumably this ensures any reader
// previously held by this object is released before the buffer it may read from -- confirm
// before reordering these two statements.
m_archive_reader = std::move(rhs.m_archive_reader);
m_archive_data = std::move(rhs.m_archive_data);
m_event_count = std::exchange(rhs.m_event_count, 0);
m_file_names = std::move(rhs.m_file_names);
m_file_infos = std::move(rhs.m_file_infos);
}

auto ClpArchiveReader::precompute_archive_metadata() -> void {
auto ClpArchiveReader::precompute_archive_metadata() -> Result<void> {
auto const& range_index{m_archive_reader->get_range_index()};
m_file_names.reserve(range_index.size());
m_file_infos.reserve(range_index.size());

uint64_t prev_end_idx{0};
for (auto const& range : range_index) {
m_event_count += static_cast<uint64_t>(range.end_index - range.start_index);
auto const start_idx{static_cast<uint64_t>(range.start_index)};
auto const end_idx{static_cast<uint64_t>(range.end_index)};
if (start_idx >= end_idx || start_idx != prev_end_idx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to confirm with @gibber9809. It appears that:

  1. `start_idx == end_idx` is also valid per our specification, and
  2. the archive reader already catches `start_idx > end_idx`, so this check is redundant.

Also, @gibber9809 might want to verify whether the other MalformedRangeIndex checks duplicate existing checks in clp-s's archive reader.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, `start_idx == end_idx` is valid for empty files. And yes, we already validate these ranges in the archive reader, so I think you shouldn't have to do any extra validation here -- generally we try to offer the contract that if the ArchiveReader lets you read part of an archive, that part is valid.

If you think that ArchiveReaderAdaptor should do some extra validation that it isn't currently doing feel free to add it and I can review.

return SfaErrorCode{SfaErrorCodeEnum::MalformedRangeIndex};
}
m_event_count += end_idx - start_idx;

auto const filename_it{
range.fields.find(std::string{clp_s::constants::range_index::cFilename})
};
if (range.fields.end() == filename_it || false == filename_it->is_string()) {
return SfaErrorCode{SfaErrorCodeEnum::MalformedRangeIndex};
}
Comment on lines +142 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it guaranteed that every range index entry always has a filename?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to ask Devin offline.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the range index exists for the archive, then `_filename`, `_file_split_number`, and `_archive_creator_id` are guaranteed to exist, per the docs in #961 (which hopefully we can re-prioritize merging).

Granted, the range index reading code currently doesn't validate that these fields exist -- might be worth adding extra validation there to flag corruption so that this code can be simpler.

auto const filename{filename_it->get<std::string>()};

m_file_names.push_back(filename);
m_file_infos.emplace_back(filename, start_idx, end_idx);

prev_end_idx = end_idx;
}

return ystdlib::error_handling::success();
}
} // namespace clp_s::ffi::sfa
55 changes: 53 additions & 2 deletions components/core/src/clp_s/ffi/sfa/ClpArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <vector>

Expand All @@ -14,6 +15,33 @@ class ArchiveReader;
} // namespace clp_s

namespace clp_s::ffi::sfa {
/**
 * Describes one source file inside a single-file archive: the file's name together with the span
 * of log-event indices it occupies. The number of events in the span is
 * `end_index - start_index`.
 */
class FileInfo {
public:
    // Constructor
    /**
     * @param file_name Name of the source file; copied into the object.
     * @param start_index Event index at which the file's range begins.
     * @param end_index Event index at which the file's range ends.
     */
    FileInfo(std::string_view file_name, uint64_t start_index, uint64_t end_index)
            : m_file_name(file_name),
              m_start_index(start_index),
              m_end_index(end_index) {}

    // Methods
    /**
     * @return The source file's name.
     */
    [[nodiscard]] auto get_file_name() const -> std::string const& {
        return m_file_name;
    }

    /**
     * @return The event index at which the file's range begins.
     */
    [[nodiscard]] auto get_start_index() const -> uint64_t {
        return m_start_index;
    }

    /**
     * @return The event index at which the file's range ends.
     */
    [[nodiscard]] auto get_end_index() const -> uint64_t {
        return m_end_index;
    }

    /**
     * @return The number of events in the file's range, i.e., the end index minus the start index.
     */
    [[nodiscard]] auto get_event_count() const -> uint64_t {
        return get_end_index() - get_start_index();
    }

private:
    // Members
    std::string m_file_name;
    uint64_t m_start_index{0};
    uint64_t m_end_index{0};
};

/**
* A thin wrapper around `clp_s::ArchiveReader` for single file archive FFI entrypoints.
*/
Expand All @@ -27,6 +55,8 @@ class ClpArchiveReader {
* @return A result containing the newly constructed `ClpArchiveReader` on success, or an
* error code indicating the failure:
* - `SfaErrorCodeEnum::IoFailure` if archive open/initialization fails.
* - `SfaErrorCodeEnum::NoMemory` if archive initialization fails due to OOM issues.
* - Forwards `ClpArchiveReader::precompute_archive_metadata`'s return values on failure.
*/
[[nodiscard]] static auto create(std::string_view archive_path)
-> ystdlib::error_handling::Result<ClpArchiveReader>;
Expand All @@ -39,6 +69,7 @@ class ClpArchiveReader {
* error code indicating the failure:
* - `SfaErrorCodeEnum::IoFailure` if archive open/initialization fails.
* - `SfaErrorCodeEnum::NoMemory` if allocating/copying archive bytes fails.
* - Forwards `ClpArchiveReader::precompute_archive_metadata`'s return values on failure.
*/
[[nodiscard]] static auto create(std::vector<char>&& archive_data)
-> ystdlib::error_handling::Result<ClpArchiveReader>;
Expand All @@ -58,6 +89,18 @@ class ClpArchiveReader {
*/
[[nodiscard]] auto get_event_count() const -> uint64_t { return m_event_count; }

/**
* @return Source file names in range-index order.
*/
[[nodiscard]] auto get_file_names() const -> std::vector<std::string> { return m_file_names; }

/**
* @return Source file metadata in range index order.
*/
[[nodiscard]] auto get_file_infos() const -> std::vector<FileInfo> const& {
return m_file_infos;
}

private:
// Constructors
explicit ClpArchiveReader(
Expand All @@ -79,14 +122,22 @@ class ClpArchiveReader {
auto move_from(ClpArchiveReader& rhs) noexcept -> void;

/**
* Precomputes metadata from the archive range index.
* Precomputes archive metadata from the range index.
*
* Assumes range-index entries are ordered and globally contiguous in log-event index space,
* i.e., each entry starts at the previous entry's end.
*
* @return A void result on success, or an error code indicating the failure:
* - `SfaErrorCodeEnum::MalformedRangeIndex` if range-index metadata violates the assumption.
*/
auto precompute_archive_metadata() -> void;
[[nodiscard]] auto precompute_archive_metadata() -> ystdlib::error_handling::Result<void>;

// Members
std::unique_ptr<clp_s::ArchiveReader> m_archive_reader;
std::shared_ptr<std::vector<char>> m_archive_data;
uint64_t m_event_count{0};
std::vector<std::string> m_file_names;
std::vector<FileInfo> m_file_infos;
};
} // namespace clp_s::ffi::sfa

Expand Down
2 changes: 2 additions & 0 deletions components/core/src/clp_s/ffi/sfa/SfaErrorCode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ auto SfaErrorCategory::message(SfaErrorCodeEnum error_enum) const -> std::string
switch (error_enum) {
case SfaErrorCodeEnum::IoFailure:
return "an I/O operation failed";
case SfaErrorCodeEnum::MalformedRangeIndex:
return "the archive range index is malformed";
case SfaErrorCodeEnum::NoMemory:
return "insufficient memory";
case SfaErrorCodeEnum::NotInit:
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp_s/ffi/sfa/SfaErrorCode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace clp_s::ffi::sfa {
*/
enum class SfaErrorCodeEnum : uint8_t {
// An I/O operation failed.
IoFailure,
// The archive's range index is malformed.
MalformedRangeIndex,
// Insufficient memory.
NoMemory,
// NOTE(review): presumably "not initialized"; its message text is not visible here -- confirm.
NotInit,
};
Expand Down
93 changes: 55 additions & 38 deletions components/core/tests/test-clp_s-ffi_sfa_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@
#include <vector>

#include <catch2/catch_test_macros.hpp>
#include <catch2/generators/catch_generators.hpp>
#include <ystdlib/error_handling/Result.hpp>

#include "../src/clp/ReadOnlyMemoryMappedFile.hpp"
#include "../src/clp_s/ffi/sfa/ClpArchiveReader.hpp"
#include "clp_s_test_utils.hpp"
#include "TestOutputCleaner.hpp"

namespace {
using clp::ReadOnlyMemoryMappedFile;
using clp_s::ffi::sfa::ClpArchiveReader;
using ystdlib::error_handling::Result;
using ystdlib::error_handling::success;

constexpr std::string_view cSfaReaderLogsDirectory{"test_log_files"};
constexpr std::string_view cSfaReaderArchiveOutputDirectory{"test-clp_s-ffi_sfa-reader-archive"};
constexpr std::string_view cInputNoFloats{"test_no_floats_sorted.jsonl"};
Expand All @@ -38,7 +45,6 @@ auto generate_single_file_archive(std::filesystem::path const& log_path) -> std:
std::filesystem::create_directories(root_output_dir);

auto const output_dir{root_output_dir / log_path.stem().string()};
std::filesystem::remove_all(output_dir);

auto const archive_stats = compress_archive(
log_path.string(),
Expand All @@ -64,61 +70,72 @@ auto get_num_lines(std::filesystem::path const& path) -> uint64_t {
return num_lines;
}

auto assert_archive_event_count_matches_log(
std::filesystem::path const& archive_path,
auto assert_reader_matches_expected(
ClpArchiveReader const& reader,
std::string const& expected_file_name,
uint64_t expected_event_count
) -> void {
REQUIRE(std::filesystem::exists(archive_path));

auto reader_result{clp_s::ffi::sfa::ClpArchiveReader::create(archive_path.string())};
REQUIRE(false == reader_result.has_error());
auto& reader = reader_result.value();

auto const event_count{reader.get_event_count()};
REQUIRE(event_count == expected_event_count);

auto const file_names{reader.get_file_names()};
auto const& file_infos{reader.get_file_infos()};
REQUIRE(1 == file_names.size());
REQUIRE(1 == file_infos.size());

auto const& file_name{file_names.front()};
auto const& file_info{file_infos.front()};
REQUIRE(expected_file_name == file_name);
REQUIRE(expected_file_name == file_info.get_file_name());
REQUIRE(0 == file_info.get_start_index());
REQUIRE(expected_event_count == file_info.get_end_index());
REQUIRE(expected_event_count == file_info.get_event_count());
}

auto assert_archive_event_count_matches_log_in_memory(
std::filesystem::path const& archive_path,
uint64_t expected_event_count
) -> void {
REQUIRE(std::filesystem::exists(archive_path));
auto create_reader_from_path(std::filesystem::path const& archive_path)
-> Result<ClpArchiveReader> {
return ClpArchiveReader::create(archive_path.string());
}

auto mmap_result{clp::ReadOnlyMemoryMappedFile::create(archive_path.string())};
REQUIRE(false == mmap_result.has_error());
auto& mapped_archive = mmap_result.value();
auto create_reader_from_bytes(std::filesystem::path const& archive_path)
-> Result<ClpArchiveReader> {
auto const mapped_archive{
YSTDLIB_ERROR_HANDLING_TRYX(ReadOnlyMemoryMappedFile::create(archive_path.string()))
};
auto const view{mapped_archive.get_view()};
REQUIRE(false == view.empty());
return ClpArchiveReader::create(std::vector<char>{view.begin(), view.end()});
}

auto reader_result{
clp_s::ffi::sfa::ClpArchiveReader::create(std::vector<char>{view.begin(), view.end()})
};
REQUIRE(false == reader_result.has_error());
auto& reader = reader_result.value();
auto run_single_log_file_test(
std::filesystem::path const& archive_path,
std::string const& expected_file_name,
uint64_t expected_event_count
) -> Result<void> {
auto const r_path{YSTDLIB_ERROR_HANDLING_TRYX(create_reader_from_path(archive_path))};
assert_reader_matches_expected(r_path, expected_file_name, expected_event_count);

auto const event_count{reader.get_event_count()};
REQUIRE(event_count == expected_event_count);
auto const r_bytes{YSTDLIB_ERROR_HANDLING_TRYX(create_reader_from_bytes(archive_path))};
assert_reader_matches_expected(r_bytes, expected_file_name, expected_event_count);

return success();
}
} // namespace

TEST_CASE("clp_s_ffi_sfa_reader_matches_test_no_floats_sorted", "[clp-s][ffi][sfa]") {
TEST_CASE("clp_s_ffi_sfa_reader", "[clp-s][ffi][sfa]") {
TestOutputCleaner const test_cleanup{{get_archive_output_root_dir().string()}};
auto const log_path{get_log_local_path(cInputNoFloats)};
REQUIRE(std::filesystem::exists(log_path));
auto const archive_path{generate_single_file_archive(log_path)};
auto const expected_event_count{get_num_lines(log_path)};

assert_archive_event_count_matches_log(archive_path, expected_event_count);
assert_archive_event_count_matches_log_in_memory(archive_path, expected_event_count);
}
auto const log_file_name{GENERATE(cInputNoFloats, cInputFloatTimestamp)};

TEST_CASE("clp_s_ffi_sfa_reader_matches_test_search_float_timestamp", "[clp-s][ffi][sfa]") {
TestOutputCleaner const test_cleanup{{get_archive_output_root_dir().string()}};
auto const log_path{get_log_local_path(cInputFloatTimestamp)};
auto const log_path{get_log_local_path(log_file_name)};
REQUIRE(std::filesystem::exists(log_path));

auto const archive_path{generate_single_file_archive(log_path)};
auto const expected_event_count{get_num_lines(log_path)};
REQUIRE(std::filesystem::exists(archive_path));

assert_archive_event_count_matches_log(archive_path, expected_event_count);
assert_archive_event_count_matches_log_in_memory(archive_path, expected_event_count);
auto const expected_event_count{get_num_lines(log_path)};
auto const test_result{
run_single_log_file_test(archive_path, log_path.string(), expected_event_count)
};
REQUIRE(false == test_result.has_error());
}
Loading