diff --git a/production/db/core/CMakeLists.txt b/production/db/core/CMakeLists.txt index 49136dd5f2f2..245eabe02e56 100644 --- a/production/db/core/CMakeLists.txt +++ b/production/db/core/CMakeLists.txt @@ -125,9 +125,10 @@ else() endif() add_library(gaia_db_persistence STATIC - src/log_file.cpp src/async_disk_writer.cpp - src/async_write_batch.cpp) + src/async_write_batch.cpp + src/log_file.cpp + src/log_io.cpp) # Add GAIA_DB_SERVER preprocessor definition for conditional includes. target_compile_definitions(gaia_db_persistence PUBLIC GAIA_DB_SERVER=1) configure_gaia_target(gaia_db_persistence) diff --git a/production/db/core/src/async_disk_writer.cpp b/production/db/core/src/async_disk_writer.cpp index 974194bdd484..59982725c150 100644 --- a/production/db/core/src/async_disk_writer.cpp +++ b/production/db/core/src/async_disk_writer.cpp @@ -37,12 +37,12 @@ async_disk_writer_t::async_disk_writer_t(int validate_flush_efd, int signal_chec ASSERT_PRECONDITION(validate_flush_efd >= 0, "Invalid validate flush eventfd"); ASSERT_PRECONDITION(signal_checkpoint_efd >= 0, "Invalid signal checkpoint eventfd"); - m_validate_flush_efd = validate_flush_efd; - m_signal_checkpoint_efd = signal_checkpoint_efd; + m_validate_flush_eventfd = validate_flush_efd; + m_signal_checkpoint_eventfd = signal_checkpoint_efd; // Used to block new writes to disk when a batch is already getting flushed. 
- s_flush_efd = eventfd(1, 0); - if (s_flush_efd == -1) + s_flush_eventfd = eventfd(1, 0); + if (s_flush_eventfd == -1) { const char* reason = ::explain_eventfd(1, 0); throw_system_error(reason); @@ -62,7 +62,7 @@ async_disk_writer_t::~async_disk_writer_t() void async_disk_writer_t::teardown() { - close_fd(s_flush_efd); + close_fd(s_flush_eventfd); } void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t user_data) @@ -76,12 +76,7 @@ void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t use throw_system_error(ss.str(), err); } -void async_disk_writer_t::map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_fd) -{ - m_ts_to_session_decision_fd_map.insert(std::pair(commit_ts, session_decision_fd)); -} - -void async_disk_writer_t::add_decisions_to_batch(decision_list_t& decisions) +void async_disk_writer_t::add_decisions_to_batch(const decision_list_t& decisions) { for (const auto& decision : decisions) { @@ -106,7 +101,7 @@ void async_disk_writer_t::perform_post_completion_maintenance() // Signal to checkpointing thread the upper bound of files that it can process. if (max_file_seq_to_close > 0) { - signal_eventfd(m_signal_checkpoint_efd, max_file_seq_to_close); + signal_eventfd(m_signal_checkpoint_eventfd, max_file_seq_to_close); } const decision_list_t& decisions = m_in_flight_batch->get_decision_batch_entries(); @@ -116,10 +111,10 @@ void async_disk_writer_t::perform_post_completion_maintenance() transactions::txn_metadata_t::set_txn_durable(decision.commit_ts); // Unblock session thread. 
- auto itr = m_ts_to_session_decision_fd_map.find(decision.commit_ts); - ASSERT_INVARIANT(itr != m_ts_to_session_decision_fd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts"); + auto itr = m_ts_to_session_decision_eventfd_map.find(decision.commit_ts); + ASSERT_INVARIANT(itr != m_ts_to_session_decision_eventfd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts"); signal_eventfd_single_thread(itr->second); - m_ts_to_session_decision_fd_map.erase(itr); + m_ts_to_session_decision_eventfd_map.erase(itr); } m_in_flight_batch->clear_decision_batch(); @@ -130,7 +125,7 @@ void async_disk_writer_t::submit_and_swap_in_progress_batch(int file_fd, bool sh eventfd_t event_counter; // Block on any pending disk flushes. - eventfd_read(s_flush_efd, &event_counter); + eventfd_read(s_flush_eventfd, &event_counter); // Perform any maintenance on the in_flight batch. perform_post_completion_maintenance(); @@ -147,7 +142,7 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_ swap_batches(); // Nothing to submit; reset the flush efd that got burnt in submit_and_swap_in_progress_batch() function. - signal_eventfd_single_thread(s_flush_efd); + signal_eventfd_single_thread(s_flush_eventfd); // Reset metadata buffer. m_metadata_buffer.clear(); @@ -158,8 +153,8 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_ m_in_progress_batch->add_fdatasync_op_to_batch(file_fd, get_enum_value(uring_op_t::fdatasync), IOSQE_IO_LINK); // Signal eventfd's as part of batch. 
- m_in_progress_batch->add_pwritev_op_to_batch(static_cast(&c_default_iov), 1, s_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK); - m_in_progress_batch->add_pwritev_op_to_batch(static_cast(&c_default_iov), 1, m_validate_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN); + m_in_progress_batch->add_pwritev_op_to_batch(static_cast(&c_default_iov), 1, s_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK); + m_in_progress_batch->add_pwritev_op_to_batch(static_cast(&c_default_iov), 1, m_validate_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN); swap_batches(); auto flushed_batch_size = m_in_flight_batch->get_unsubmitted_entries_count(); @@ -187,7 +182,7 @@ void async_disk_writer_t::perform_file_close_operations(int file_fd, file_sequen m_in_progress_batch->append_file_to_batch(file_fd, log_seq); } -unsigned char* async_disk_writer_t::copy_into_metadata_buffer(void* source, size_t size, int file_fd) +unsigned char* async_disk_writer_t::copy_into_metadata_buffer(const void* source, size_t size, int file_fd) { auto current_ptr = m_metadata_buffer.get_current_ptr(); ASSERT_PRECONDITION(current_ptr, "Invalid metadata buffer pointer."); diff --git a/production/db/core/src/chunk_manager.cpp b/production/db/core/src/chunk_manager.cpp index 6b717ee69836..6570e42c1632 100644 --- a/production/db/core/src/chunk_manager.cpp +++ b/production/db/core/src/chunk_manager.cpp @@ -93,21 +93,7 @@ gaia_offset_t chunk_manager_t::allocate( get_state() == chunk_state_t::in_use, "Objects can only be allocated from a chunk in the IN_USE state!"); - ASSERT_PRECONDITION(allocation_size_in_bytes > 0, "Requested allocation size cannot be 0!"); - - // Check before converting to slot units to avoid overflow. 
- ASSERT_PRECONDITION( - allocation_size_in_bytes <= (c_max_allocation_size_in_slots * c_slot_size_in_bytes), - "Requested allocation size exceeds maximum allocation size of 64KB!"); - - // Calculate allocation size in slot units. -#ifdef DEBUG - // Round up allocation to a page so we can mprotect() it. - size_t allocation_size_in_pages = (allocation_size_in_bytes + c_page_size_in_bytes - 1) / c_page_size_in_bytes; - size_t allocation_size_in_slots = allocation_size_in_pages * (c_page_size_in_bytes / c_slot_size_in_bytes); -#else - size_t allocation_size_in_slots = (allocation_size_in_bytes + c_slot_size_in_bytes - 1) / c_slot_size_in_bytes; -#endif + size_t allocation_size_in_slots = calculate_allocation_size_in_slots(allocation_size_in_bytes); // Ensure that the new allocation doesn't overflow the chunk. if (m_metadata->min_unallocated_slot_offset() + allocation_size_in_slots > c_last_slot_offset) diff --git a/production/db/core/src/log_file.cpp b/production/db/core/src/log_file.cpp index 711f1cb07531..1be55ee7b271 100644 --- a/production/db/core/src/log_file.cpp +++ b/production/db/core/src/log_file.cpp @@ -25,14 +25,9 @@ namespace persistence // TODO (Mihir): Use io_uring for fsync, close & fallocate operations in this file. // open() operation will remain synchronous, since we need the file fd to perform other async // operations on the file. -log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_seq, size_t size) +log_file_t::log_file_t(const std::string& dir_name, int dir_fd, file_sequence_t file_seq, size_t file_size) + : m_dir_name(dir_name), m_dir_fd(dir_fd), m_file_seq(file_seq), m_file_size(file_size) { - m_dir_fd = dir_fd; - m_dir_name = dir; - m_file_seq = file_seq; - m_file_size = size; - m_current_offset = 0; - // open and fallocate depending on size. 
std::stringstream file_name; file_name << m_dir_name << "/" << m_file_seq; @@ -66,12 +61,12 @@ log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_ } } -size_t log_file_t::get_current_offset() +file_offset_t log_file_t::get_current_offset() const { return m_current_offset; } -int log_file_t::get_file_fd() +int log_file_t::get_file_fd() const { return m_file_fd; } @@ -81,7 +76,12 @@ void log_file_t::allocate(size_t size) m_current_offset += size; } -size_t log_file_t::get_remaining_bytes_count(size_t record_size) +file_sequence_t log_file_t::get_file_sequence() const +{ + return m_file_seq; +} + +size_t log_file_t::get_bytes_remaining_after_append(size_t record_size) const { ASSERT_INVARIANT(m_file_size > 0, "Preallocated file size should be greater than 0."); if (m_file_size < (m_current_offset + record_size)) diff --git a/production/db/core/src/log_io.cpp b/production/db/core/src/log_io.cpp new file mode 100644 index 000000000000..ebeb39974b0b --- /dev/null +++ b/production/db/core/src/log_io.cpp @@ -0,0 +1,363 @@ +///////////////////////////////////////////// +// Copyright (c) Gaia Platform LLC +// All rights reserved. 
+///////////////////////////////////////////// + +#include "log_io.hpp" + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include "liburing.h" + +#include "gaia_internal/common/assert.hpp" +#include "gaia_internal/common/scope_guard.hpp" +#include "gaia_internal/db/db_types.hpp" +#include + +#include "crc32c.h" +#include "db_helpers.hpp" +#include "db_internal_types.hpp" +#include "db_object_helpers.hpp" +#include "log_file.hpp" +#include "mapped_data.hpp" +#include "memory_helpers.hpp" +#include "memory_types.hpp" +#include "persistence_types.hpp" +#include "txn_metadata.hpp" + +using namespace gaia::common; +using namespace gaia::db::memory_manager; +using namespace gaia::db; + +namespace gaia +{ +namespace db +{ +namespace persistence +{ + +// Creates the WAL directory under data_dir_path (if it does not exist yet) and opens it. +log_handler_t::log_handler_t(const std::string& data_dir_path) +{ + ASSERT_PRECONDITION(!data_dir_path.empty(), "Gaia persistent data directory path shouldn't be empty."); + + std::filesystem::path wal_dir_parent = data_dir_path; + std::filesystem::path wal_dir_child = c_gaia_wal_dir_name; + s_wal_dir_path = wal_dir_parent / wal_dir_child; + + if (-1 == ::mkdir(s_wal_dir_path.c_str(), c_gaia_wal_dir_permissions) && errno != EEXIST) + { + throw_system_error("Unable to create persistent log directory"); + } + + // Keep the descriptor in s_dir_fd: open_for_writes() asserts that it is valid, + // it is passed to every new log file, and the destructor closes it. + s_dir_fd = ::open(s_wal_dir_path.c_str(), O_DIRECTORY); + if (s_dir_fd == -1) + { + throw_system_error("Unable to open persistent log directory."); + } +} + +void log_handler_t::open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd) +{ + ASSERT_PRECONDITION(validate_flushed_batch_eventfd != -1, "Eventfd to signal post-flush maintenance operations invalid!"); + ASSERT_PRECONDITION(signal_checkpoint_eventfd != -1, "Eventfd to signal checkpointing of log file is invalid!"); + ASSERT_INVARIANT(s_dir_fd != -1, "Unable to open data directory for persistent log writes."); + + // Create new log file every time the log_writer gets initialized. 
+ m_async_disk_writer = std::make_unique(validate_flushed_batch_eventfd, signal_checkpoint_eventfd); + + m_async_disk_writer->open(); +} + +log_handler_t::~log_handler_t() +{ + close_fd(s_dir_fd); +} + +// Currently using the rocksdb impl. +// Todo(Mihir) - Research other crc libs. +uint32_t calculate_crc32(uint32_t init_crc, const void* data, size_t n) +{ + // This implementation uses the CRC32 instruction from the SSE4 (SSE4.2) instruction set if it is available. + // Otherwise, it defaults to a 4 table based lookup implementation. + // Here is an old benchmark that compares various crc implementations including the two used by rocks. + // https://www.strchr.com/crc32_popcnt + return rocksdb::crc32c::Extend(init_crc, static_cast(data), n); +} + +file_offset_t log_handler_t::allocate_log_space(size_t payload_size) +{ + // For simplicity, we don't break up transaction records across log files. Txn updates + // which don't fit in the current file are written to the next one. + // If a transaction is larger than the log file size, then the entire txn is written to the next log file. + // Another simplification is that an async_write_batch contains only writes belonging to a single log file. + if (!m_current_file) + { + auto file_size = (payload_size > c_max_log_file_size_in_bytes) ? payload_size : c_max_log_file_size_in_bytes; + m_current_file.reset(); + m_current_file = std::make_unique(s_wal_dir_path, s_dir_fd, file_sequence_t(s_file_num), file_size); + } + else if (m_current_file->get_bytes_remaining_after_append(payload_size) <= 0) + { + m_async_disk_writer->perform_file_close_operations( + m_current_file->get_file_fd(), m_current_file->get_file_sequence()); + + // One batch writes to a single log file at a time. + m_async_disk_writer->submit_and_swap_in_progress_batch(m_current_file->get_file_fd()); + + m_current_file.reset(); + + // Open new file. + s_file_num++; + auto file_size = (payload_size > c_max_log_file_size_in_bytes) ? 
payload_size : c_max_log_file_size_in_bytes; + m_current_file = std::make_unique(s_wal_dir_path, s_dir_fd, file_sequence_t(s_file_num), file_size); + } + + auto current_offset = m_current_file->get_current_offset(); + m_current_file->allocate(payload_size); + + // Return starting offset of the allocation. + return current_offset; +} + +void log_handler_t::create_decision_record(const decision_list_t& txn_decisions) +{ + ASSERT_PRECONDITION(!txn_decisions.empty(), "Decision record cannot have empty payload."); + + // Track decisions per batch. + m_async_disk_writer->add_decisions_to_batch(txn_decisions); + + // Create decision record and enqueue a pwrite() request for the same. + std::vector writes_to_submit; + size_t txn_decision_size = txn_decisions.size() * (sizeof(decision_entry_t)); + size_t total_log_space_needed = txn_decision_size + sizeof(record_header_t); + + // Remember where the allocation starts: once allocate_log_space() returns, + // the file's current offset already points past the space reserved for this record. + auto begin_offset = allocate_log_space(total_log_space_needed); + + // Create log record header. + record_header_t header{}; + header.crc = c_crc_initial_value; + header.payload_size = total_log_space_needed; + header.decision_count = txn_decisions.size(); + header.txn_commit_ts = c_invalid_gaia_txn_id; + header.record_type = record_type_t::decision; + + // Compute crc. + crc32_t txn_crc = 0; + txn_crc = calculate_crc32(txn_crc, &header, sizeof(record_header_t)); + txn_crc = calculate_crc32(txn_crc, txn_decisions.data(), txn_decision_size); + + ASSERT_INVARIANT(txn_crc != 0, "CRC cannot be zero."); + header.crc = txn_crc; + + // Copy information which needs to survive for the entire batch lifetime into the metadata buffer. 
+ auto header_ptr = m_async_disk_writer->copy_into_metadata_buffer( + &header, sizeof(record_header_t), m_current_file->get_file_fd()); + auto txn_decisions_ptr = m_async_disk_writer->copy_into_metadata_buffer( + txn_decisions.data(), txn_decision_size, m_current_file->get_file_fd()); + + writes_to_submit.push_back({header_ptr, sizeof(record_header_t)}); + writes_to_submit.push_back({txn_decisions_ptr, txn_decision_size}); + + // Write at the start of the allocation, the same way create_txn_record() does. + m_async_disk_writer->enqueue_pwritev_requests( + writes_to_submit, m_current_file->get_file_fd(), + begin_offset, uring_op_t::pwritev_decision); +} + +void log_handler_t::process_txn_log_and_write(log_offset_t log_offset, gaia_txn_id_t commit_ts) +{ + // Map in memory txn_log. + auto txn_log = get_txn_log_from_offset(log_offset); + + std::vector deleted_ids; + + // Extract the original sequence of chunks used for this txn from txn log + // offsets and sequences. (This is necessary because the txn log has been + // sorted in-place by locator at this point, and chunks may be allocated out + // of chunk offset order.) + + // Find the lowest- and highest-allocated offset for each chunk, to + // determine the ranges of shared memory that need to be copied into the + // WAL. As long as the checkpointer applies versions in the same order that + // they appear in the WAL, the final state of the checkpoint will be + // consistent with the committed state of this txn. + + // Use a sorted vector and binary search for simplicity and cache efficiency + // (we only sort the vector when we add a new chunk). Linear search would + // nearly always have acceptable performance (most txns will use only one + // chunk), but would have poor worst-case performance (a txn may use up to + // 2^10 chunks). + struct chunk_data_t + { + chunk_offset_t chunk_id; + uint16_t min_sequence; + gaia_offset_t min_offset; + gaia_offset_t max_offset; + }; + std::vector chunk_data; + + // Obtain deleted IDs and min/max offsets per chunk. 
+ for (size_t i = 0; i < txn_log->record_count; ++i) + { + auto lr = txn_log->log_records[i]; + + // Extract deleted ID and go to next record. + if (lr.operation() == gaia_operation_t::remove) + { + deleted_ids.push_back(offset_to_ptr(lr.old_offset)->id); + continue; + } + + // Extract chunk ID from current offset. + auto offset = lr.new_offset; + auto sequence = lr.sequence; + auto chunk_id = chunk_from_offset(offset); + + // Because this vector is sorted by chunk ID, we can use binary + // search to find the index of this chunk. + chunk_data_t target{}; + target.chunk_id = chunk_id; + auto it = std::lower_bound( + chunk_data.begin(), chunk_data.end(), target, + [](const chunk_data_t& lhs, const chunk_data_t& rhs) { return lhs.chunk_id < rhs.chunk_id; }); + + // If chunk isn't present, add an entry for it in sorted order. + // insert() invalidates 'it' (and 'it' may be end()), so rebind it to the + // iterator that insert() returns for the new element. + if (it == chunk_data.end() || it->chunk_id != chunk_id) + { + it = chunk_data.insert(it, {chunk_id, lr.sequence, offset, offset}); + } + + // Update entry for this chunk. + auto& entry = *it; + ASSERT_INVARIANT(entry.chunk_id == chunk_id, "Current chunk entry must store current chunk ID!"); + entry.min_offset = std::min(entry.min_offset, offset); + entry.max_offset = std::max(entry.max_offset, offset); + entry.min_sequence = std::min(entry.min_sequence, sequence); + } + + // Sort chunk data by sequence rather than chunk ID, to ensure that the WAL + // is applied in txn order. + std::sort( + chunk_data.begin(), chunk_data.end(), + [](const chunk_data_t& lhs, const chunk_data_t& rhs) { return lhs.min_sequence < rhs.min_sequence; }); + + // Construct an iovec for the data in each chunk to be copied to the WAL. 
+ std::vector chunk_data_iovecs; + + for (const auto& entry : chunk_data) + { + ASSERT_INVARIANT(entry.min_offset <= entry.max_offset, "Lowest offset in chunk cannot exceed highest offset in chunk!"); + auto first_obj_ptr = offset_to_ptr(entry.min_offset); + auto last_obj_ptr = offset_to_ptr(entry.max_offset); + auto last_payload_size = last_obj_ptr->payload_size + c_db_object_header_size; + size_t last_allocation_size = calculate_allocation_size_in_slots(last_payload_size) * c_slot_size_in_bytes; + auto last_allocation_end_ptr = reinterpret_cast(last_obj_ptr) + last_allocation_size; + auto chunk_allocation_size = static_cast(last_allocation_end_ptr - reinterpret_cast(first_obj_ptr)); + chunk_data_iovecs.push_back({first_obj_ptr, chunk_allocation_size}); + } + + if (!deleted_ids.empty() || !chunk_data_iovecs.empty()) + { + create_txn_record(commit_ts, record_type_t::txn, chunk_data_iovecs, deleted_ids); + } +} + +void log_handler_t::perform_flushed_batch_maintenance() +{ + m_async_disk_writer->perform_post_completion_maintenance(); +} + +void log_handler_t::submit_writes(bool should_wait_for_completion) +{ + m_async_disk_writer->submit_and_swap_in_progress_batch(m_current_file->get_file_fd(), should_wait_for_completion); +} + +void log_handler_t::create_txn_record( + gaia_txn_id_t commit_ts, + record_type_t type, + const std::vector& data_iovecs, + const std::vector& deleted_ids) +{ + ASSERT_PRECONDITION( + !deleted_ids.empty() || !data_iovecs.empty(), + "Txn record cannot have empty payload."); + + std::vector writes_to_submit; + + // Sum all iovec sizes to get initial payload size. + // The accumulator takes the type of the initial value, so seed it with a size_t + // (not an int literal) to keep the byte total from being truncated to int. + size_t payload_size = std::accumulate( + data_iovecs.begin(), data_iovecs.end(), size_t{0}, + [](size_t sum, const iovec& iov) { return sum + iov.iov_len; }); + + // Reserve iovec to store header for the log record. + struct iovec header_entry = {nullptr, 0}; + writes_to_submit.push_back(header_entry); + + // Append data iovecs to submitted writes. 
+ writes_to_submit.insert( + writes_to_submit.end(), data_iovecs.begin(), data_iovecs.end()); + + // Augment payload size with the size of deleted ids. + auto deleted_size = deleted_ids.size() * sizeof(gaia_id_t); + payload_size += deleted_size; + + // Calculate total log space needed. + auto total_log_space_needed = payload_size + sizeof(record_header_t); + + // Allocate log space. + auto begin_offset = allocate_log_space(total_log_space_needed); + + // Create header (brace-initialized, matching create_decision_record()). + record_header_t header{}; + header.crc = c_crc_initial_value; + header.payload_size = total_log_space_needed; + header.deleted_object_count = deleted_ids.size(); + header.txn_commit_ts = commit_ts; + header.record_type = type; + + // Calculate CRC. + auto txn_crc = calculate_crc32(0, &header, sizeof(record_header_t)); + + // Start from 1 to skip CRC calculation for the first entry. + for (size_t i = 1; i < writes_to_submit.size(); i++) + { + txn_crc = calculate_crc32(txn_crc, writes_to_submit.at(i).iov_base, writes_to_submit.at(i).iov_len); + } + + // Augment CRC calculation with set of deleted object IDs. + txn_crc = calculate_crc32(txn_crc, deleted_ids.data(), deleted_size); + + // Update CRC in header before sending it to the async_disk_writer. + ASSERT_INVARIANT(txn_crc != 0, "Computed CRC cannot be zero."); + header.crc = txn_crc; + + // Copy the header into the metadata buffer as it needs to survive the lifetime of the async_write_batch it is a part of. + auto header_ptr = m_async_disk_writer->copy_into_metadata_buffer(&header, sizeof(record_header_t), m_current_file->get_file_fd()); + + // Update the first iovec entry with the header information. + writes_to_submit.at(0).iov_base = header_ptr; + writes_to_submit.at(0).iov_len = sizeof(record_header_t); + + // Allocate space for deleted writes in the metadata buffer. 
+ if (!deleted_ids.empty()) + { + auto deleted_id_ptr = m_async_disk_writer->copy_into_metadata_buffer(deleted_ids.data(), deleted_size, m_current_file->get_file_fd()); + writes_to_submit.push_back({deleted_id_ptr, deleted_size}); + } + + // Finally send I/O requests to the async_disk_writer. + m_async_disk_writer->enqueue_pwritev_requests(writes_to_submit, m_current_file->get_file_fd(), begin_offset, uring_op_t::pwritev_txn); +} + +} // namespace persistence +} // namespace db +} // namespace gaia diff --git a/production/db/inc/core/async_disk_writer.hpp b/production/db/inc/core/async_disk_writer.hpp index 94a81d074af8..71afff3e0b62 100644 --- a/production/db/inc/core/async_disk_writer.hpp +++ b/production/db/inc/core/async_disk_writer.hpp @@ -37,7 +37,7 @@ namespace persistence class async_disk_writer_t { public: - async_disk_writer_t(int validate_flushed_batch_efd, int signal_checkpoint_efd); + async_disk_writer_t(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd); ~async_disk_writer_t(); @@ -98,21 +98,14 @@ class async_disk_writer_t /** * Copy any temporary writes (which don't exist in gaia shared memory) into the metadata buffer. */ - unsigned char* copy_into_metadata_buffer(void* source, size_t size, int file_fd); + unsigned char* copy_into_metadata_buffer(const void* source, size_t size, int file_fd); /** * Perform maintenance actions on in_flight batch after all of its IO entries have been processed. */ void perform_post_completion_maintenance(); - void add_decisions_to_batch(decision_list_t& decisions); - - /** - * For each commit ts, keep track of the eventfd which the session thread blocks on. Once the txn - * has been made durable, this eventfd is written to so that the session thread can make progress and - * return commit decision to the client. 
- */ - void map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_efd); + void add_decisions_to_batch(const decision_list_t& decisions); private: // Reserve slots in the in_progress batch to be able to append additional operations to it (before it gets submitted to the kernel) @@ -120,21 +113,21 @@ class async_disk_writer_t static constexpr size_t c_single_submission_entry_count = 1; static constexpr size_t c_async_batch_size = 32; static constexpr size_t c_max_iovec_array_size_bytes = IOV_MAX * sizeof(iovec); - static inline eventfd_t c_default_flush_efd_value = 1; - static inline iovec c_default_iov = {static_cast(&c_default_flush_efd_value), sizeof(eventfd_t)}; + static inline eventfd_t c_default_flush_eventfd_value = 1; + static inline iovec c_default_iov = {static_cast(&c_default_flush_eventfd_value), sizeof(eventfd_t)}; // eventfd to signal that a batch flush has completed. // Used to block new writes to disk when a batch is already getting flushed. - static inline int s_flush_efd = -1; + static inline int s_flush_eventfd = -1; // eventfd to signal that the IO results belonging to a batch are ready to be validated. - int m_validate_flush_efd = -1; + int m_validate_flush_eventfd = -1; // eventfd to signal that a file is ready to be checkpointed. - int m_signal_checkpoint_efd = -1; + int m_signal_checkpoint_eventfd = -1; // Keep track of session threads to unblock. - std::unordered_map m_ts_to_session_decision_fd_map; + std::unordered_map m_ts_to_session_decision_eventfd_map; // Writes are batched and we maintain two buffers so that writes to a buffer // can still proceed when the other buffer is getting flushed to disk. diff --git a/production/db/inc/core/log_file.hpp b/production/db/inc/core/log_file.hpp index dc818103c5dc..02fa4d5f8b1d 100644 --- a/production/db/inc/core/log_file.hpp +++ b/production/db/inc/core/log_file.hpp @@ -34,29 +34,34 @@ class log_file_t /** * Obtain offset to write the next log record at. 
*/ - size_t get_current_offset(); + file_offset_t get_current_offset() const; /** * Get remaining space in persistent log file. */ - size_t get_remaining_bytes_count(size_t record_size); + size_t get_bytes_remaining_after_append(size_t record_size) const; /** * Allocate space in persistent log file. */ void allocate(size_t size); - int get_file_fd(); + int get_file_fd() const; + + /** + * Obtain sequence number for the file. + */ + file_sequence_t get_file_sequence() const; private: - size_t m_file_size; - file_sequence_t m_file_seq; - size_t m_current_offset; - std::string m_dir_name; - int m_dir_fd; - int m_file_fd; - std::string m_file_name; - inline static constexpr int c_file_permissions = 0666; + std::string m_dir_name{}; + int m_dir_fd{-1}; + std::string m_file_name{}; + int m_file_fd{-1}; + file_sequence_t m_file_seq{0}; + size_t m_file_size{0}; + size_t m_current_offset{0}; + inline static constexpr int c_file_permissions{0666}; }; } // namespace persistence diff --git a/production/db/inc/core/log_io.hpp b/production/db/inc/core/log_io.hpp new file mode 100644 index 000000000000..757d04b97f37 --- /dev/null +++ b/production/db/inc/core/log_io.hpp @@ -0,0 +1,108 @@ +///////////////////////////////////////////// +// Copyright (c) Gaia Platform LLC +// All rights reserved. +///////////////////////////////////////////// + +#pragma once + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include "gaia/common.hpp" + +#include "gaia_internal/db/db_object.hpp" + +#include "async_disk_writer.hpp" +#include "db_internal_types.hpp" +#include "log_file.hpp" +#include "memory_manager.hpp" + +namespace gaia +{ +namespace db +{ +namespace persistence +{ + +/* + * Fill the record_header.crc field with CRC_INITIAL_VALUE when + * computing the checksum: crc32c is vulnerable to 0-prefixing, + * so we make sure the initial bytes are non-zero. 
+ * + * https://stackoverflow.com/questions/2321676/data-length-vs-crc-length + * "From the wikipedia article: "maximal total blocklength is equal to 2^r − 1". That's in bits. + * You don't need to do much research to see that 2^9 - 1 is 511 bits. Using CRC-8, + * multiple messages longer than 64 bytes will have the same CRC checksum value." + * So CRC-16 would have max message size 2^17-1 bits or about 2^14 bytes = 16KB, + * and CRC-32 would have max message size 2^33-1 bits or about 2^30 bytes = 1GB + */ +static constexpr crc32_t c_crc_initial_value = ((uint32_t)-1); + +class log_handler_t +{ +public: + explicit log_handler_t(const std::string& data_dir_path); + ~log_handler_t(); + void open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd); + + /** + * Allocate space in the log file and return starting offset of allocation. + */ + file_offset_t allocate_log_space(size_t payload_size); + + /** + * Create a log record which stores txn information. + */ + void create_txn_record( + gaia_txn_id_t commit_ts, + record_type_t type, + const std::vector& data_iovecs, + const std::vector& deleted_ids); + + /** + * Process the in memory txn_log and submit the processed writes (generated log records) to the async_disk_writer. + */ + void process_txn_log_and_write(log_offset_t log_offset, gaia_txn_id_t commit_ts); + + /** + * Create a log record which stores decisions for one or more txns. + */ + void create_decision_record(const decision_list_t& txn_decisions); + + /** + * Submit async_disk_writer's internal I/O request queue to the kernel for processing. + */ + void submit_writes(bool should_wait_for_completion); + + /** + * Validate the result of I/O calls submitted to the kernel for processing. 
+ */ + void perform_flushed_batch_maintenance(); + +private: + static constexpr char c_gaia_wal_dir_name[] = "wal_dir"; + static constexpr int c_gaia_wal_dir_permissions = S_IRWXU | (S_IRGRP | S_IROTH | S_IXGRP | S_IXOTH); + static inline std::filesystem::path s_wal_dir_path{}; + static inline int s_dir_fd{-1}; + + // Log file sequence starts from 1. + static inline std::atomic s_file_num{1}; + + // Keep track of the current log file. + std::unique_ptr m_current_file; + + std::unique_ptr m_async_disk_writer; +}; + +} // namespace persistence +} // namespace db +} // namespace gaia diff --git a/production/db/inc/core/memory_helpers.hpp b/production/db/inc/core/memory_helpers.hpp index ca1c2884858c..5370be6cc4a4 100644 --- a/production/db/inc/core/memory_helpers.hpp +++ b/production/db/inc/core/memory_helpers.hpp @@ -25,6 +25,8 @@ inline gaia_offset_t offset_from_chunk_and_slot( inline void* page_address_from_offset(gaia_offset_t offset); +inline size_t calculate_allocation_size_in_slots(size_t allocation_size_in_bytes); + // Converts a slot offset to its bitmap index. inline size_t slot_to_bit_index(slot_offset_t slot_offset); diff --git a/production/db/inc/core/memory_helpers.inc b/production/db/inc/core/memory_helpers.inc index 568bb808816b..6cd9e502a9d6 100644 --- a/production/db/inc/core/memory_helpers.inc +++ b/production/db/inc/core/memory_helpers.inc @@ -29,6 +29,27 @@ slot_offset_t slot_from_offset(gaia_offset_t offset) return static_cast(offset & mask); } +size_t calculate_allocation_size_in_slots(size_t allocation_size_in_bytes) +{ + ASSERT_PRECONDITION(allocation_size_in_bytes > 0, "Requested allocation size cannot be 0!"); + + // Check before converting to slot units to avoid overflow. + ASSERT_PRECONDITION( + allocation_size_in_bytes <= (c_max_allocation_size_in_slots * c_slot_size_in_bytes), + "Requested allocation size exceeds maximum allocation size of 64KB!"); + + // Calculate allocation size in slot units. 
+#ifdef DEBUG + // Round up allocation to a page so we can mprotect() it. + size_t allocation_size_in_pages = (allocation_size_in_bytes + c_page_size_in_bytes - 1) / c_page_size_in_bytes; + size_t allocation_size_in_slots = allocation_size_in_pages * (c_page_size_in_bytes / c_slot_size_in_bytes); +#else + size_t allocation_size_in_slots = (allocation_size_in_bytes + c_slot_size_in_bytes - 1) / c_slot_size_in_bytes; +#endif + + return allocation_size_in_slots; +} + gaia_offset_t offset_from_chunk_and_slot( chunk_offset_t chunk_offset, slot_offset_t slot_offset) { diff --git a/production/db/inc/core/persistence_types.hpp b/production/db/inc/core/persistence_types.hpp index f41526deb0cd..a4cfc927bf40 100644 --- a/production/db/inc/core/persistence_types.hpp +++ b/production/db/inc/core/persistence_types.hpp @@ -23,6 +23,13 @@ namespace db namespace persistence { +enum class record_type_t : size_t +{ + not_set = 0x0, + txn = 0x1, + decision = 0x2, +}; + enum class decision_type_t : uint8_t { undecided = 0, @@ -36,6 +43,14 @@ struct decision_entry_t decision_type_t decision; }; +// Represents start and end offsets of a set of contiguous objects. +// They can span chunks. +struct contiguous_offsets_t +{ + gaia_offset_t offset1; + gaia_offset_t offset2; +}; + // Pair of log file sequence number and file fd. typedef std::vector decision_list_t; @@ -60,6 +75,72 @@ static_assert( constexpr file_sequence_t c_invalid_file_sequence_number; +static constexpr uint64_t c_max_log_file_size_in_bytes = 4 * 1024 * 1024; + +class file_offset_t : public common::int_type_t +{ +public: + // By default, we should initialize to 0. + constexpr file_offset_t() + : common::int_type_t() + { + } + + constexpr file_offset_t(size_t value) + : common::int_type_t(value) + { + } +}; + +static_assert( + sizeof(file_offset_t) == sizeof(file_offset_t::value_type), + "file_offset_t has a different size than its underlying integer type!"); + +// The record size is constrained by the size of the log file. 
+// We'd never need more than 32 bits for the record size. +class record_size_t : public common::int_type_t +{ +public: + // By default, we should initialize to an invalid value. + constexpr record_size_t() + : common::int_type_t() + { + } + + constexpr record_size_t(uint32_t value) + : common::int_type_t(value) + { + } +}; + +static_assert( + sizeof(record_size_t) == sizeof(record_size_t::value_type), + "record_size_t has a different size than its underlying integer type!"); + +constexpr record_size_t c_invalid_record_size; + +// https://stackoverflow.com/questions/2321676/data-length-vs-crc-length +// "From the wikipedia article: "maximal total blocklength is equal to 2**r − 1". That's in bits. +// So CRC-32 would have max message size 2^33-1 bits or about 2^30 bytes = 1GB +class crc32_t : public common::int_type_t +{ +public: + // By default, we should initialize to 0. + constexpr crc32_t() + : common::int_type_t() + { + } + + constexpr crc32_t(uint32_t value) + : common::int_type_t(value) + { + } +}; + +static_assert( + sizeof(crc32_t) == sizeof(crc32_t::value_type), + "crc32_t has a different size than its underlying integer type!"); + // This assertion ensures that the default type initialization // matches the value of the invalid constant. static_assert( @@ -72,6 +153,27 @@ struct log_file_info_t int file_fd; }; +struct record_header_t +{ + crc32_t crc; + record_size_t payload_size; + record_type_t record_type; + gaia_txn_id_t txn_commit_ts; + + // Stores a count value depending on the record type. + // For a txn record, this represents the count of deleted objects. + // For a decision record, this represents the number of decisions in the record's payload. 
+ union + { + uint32_t deleted_object_count; + uint32_t decision_count; + }; +}; + +static_assert( + sizeof(record_header_t) % 8 == 0, + "record_header_t should be a multiple of 8 bytes!"); + // The primary motivation of this buffer is to keep a hold of any additional information we want to write to the log // apart from the shared memory objects. // Custom information includes