Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion production/db/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ endif()
add_library(gaia_db_persistence STATIC
src/log_file.cpp
src/async_disk_writer.cpp
src/async_write_batch.cpp)
src/async_write_batch.cpp
src/log_io.cpp)
configure_gaia_target(gaia_db_persistence)
target_include_directories(gaia_db_persistence PRIVATE
"${GAIA_DB_CORE_PUBLIC_INCLUDES}"
Expand Down
26 changes: 8 additions & 18 deletions production/db/core/inc/async_disk_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace persistence
class async_disk_writer_t
{
public:
async_disk_writer_t(int validate_flushed_batch_efd, int signal_checkpoint_efd);
async_disk_writer_t(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd);

~async_disk_writer_t();

Expand Down Expand Up @@ -98,43 +98,33 @@ class async_disk_writer_t
/**
* Copy any temporary writes (which don't exist in gaia shared memory) into the metadata buffer.
*/
unsigned char* copy_into_metadata_buffer(void* source, size_t size, int file_fd);
unsigned char* copy_into_metadata_buffer(const void* source, size_t size, int file_fd);

/**
* Perform maintenance actions on in_flight batch after all of its IO entries have been processed.
*/
void perform_post_completion_maintenance();

void add_decisions_to_batch(decision_list_t& decisions);

/**
* For each commit ts, keep track of the eventfd which the session thread blocks on. Once the txn
* has been made durable, this eventfd is written to so that the session thread can make progress and
* return commit decision to the client.
*/
void map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_efd);
void add_decisions_to_batch(const decision_list_t& decisions);

private:
// Reserve slots in the in_progress batch to be able to append additional operations to it (before it gets submitted to the kernel)
static constexpr size_t c_submit_batch_sqe_count = 3;
static constexpr size_t c_single_submission_entry_count = 1;
static constexpr size_t c_async_batch_size = 32;
static constexpr size_t c_max_iovec_array_size_bytes = IOV_MAX * sizeof(iovec);
static inline eventfd_t c_default_flush_efd_value = 1;
static inline iovec c_default_iov = {static_cast<void*>(&c_default_flush_efd_value), sizeof(eventfd_t)};
static inline eventfd_t c_default_flush_eventfd_value = 1;
static inline iovec c_default_iov = {static_cast<void*>(&c_default_flush_eventfd_value), sizeof(eventfd_t)};

// eventfd to signal that a batch flush has completed.
// Used to block new writes to disk when a batch is already getting flushed.
static inline int s_flush_efd = -1;
static inline int s_flush_eventfd = -1;

// eventfd to signal that the IO results belonging to a batch are ready to be validated.
int m_validate_flush_efd = -1;
int m_log_flush_eventfd = -1;

// eventfd to signal that a file is ready to be checkpointed.
int m_signal_checkpoint_efd = -1;

// Keep track of session threads to unblock.
std::unordered_map<gaia_txn_id_t, int> m_ts_to_session_decision_fd_map;
int m_signal_checkpoint_eventfd = -1;

// Writes are batched and we maintain two buffers so that writes to a buffer
// can still proceed when the other buffer is getting flushed to disk.
Expand Down
2 changes: 2 additions & 0 deletions production/db/core/inc/db_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class client_t
thread_local static inline mapped_data_t<locators_t> s_private_locators;
thread_local static inline gaia::db::index::indexes_t s_local_indexes{};

thread_local static inline size_t s_txn_memory_size_bytes = 0;

// These fields have session lifetime.
thread_local static inline config::session_options_t s_session_options;

Expand Down
22 changes: 20 additions & 2 deletions production/db/core/inc/db_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,24 @@ inline void allocate_object(
{
memory_manager::memory_manager_t* memory_manager = gaia::db::get_memory_manager();
memory_manager::chunk_manager_t* chunk_manager = gaia::db::get_chunk_manager();

size_t size_to_allocate = size + c_db_object_header_size;

if (get_current_txn_memory_size_bytes() != nullptr)
{
if (*get_current_txn_memory_size_bytes() + size_to_allocate <= c_max_txn_memory_size_bytes)
{
*get_current_txn_memory_size_bytes() += size_to_allocate;
}
else
{
throw transaction_memory_limit_exceeded();
}
}

// The allocation can fail either because there is no current chunk, or
// because the current chunk is full.
gaia_offset_t object_offset = chunk_manager->allocate(size + c_db_object_header_size);
gaia_offset_t object_offset = chunk_manager->allocate(size_to_allocate);
if (!object_offset.is_valid())
{
if (chunk_manager->initialized())
Expand All @@ -155,8 +169,12 @@ inline void allocate_object(
// Initialize the new chunk.
chunk_manager->initialize(new_chunk_offset);

// // Before we allocate, persist current chunk ID in txn log, for access
// // on the server in case we crash.
// gaia::db::get_mapped_log()->data()->current_chunk = new_chunk_offset;

// Allocate from new chunk.
object_offset = chunk_manager->allocate(size + c_db_object_header_size);
object_offset = chunk_manager->allocate(size_to_allocate);
}

ASSERT_POSTCONDITION(
Expand Down
14 changes: 12 additions & 2 deletions production/db/core/inc/db_internal_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,16 @@ constexpr size_t c_max_locators = (1ULL << 32) - 1;
// similarly optimized by substituting locators for gaia_ids.
constexpr size_t c_hash_buckets = 1ULL << 20;

// This is arbitrary, but we need to keep txn logs to a reasonable size.
constexpr size_t c_max_log_records = 1ULL << 20;
// // Track maximum number of new chunks (apart from the one that the txn is already using)
// // that can be allocated per transaction.
// // This sets an upper bound on txn size: 32MB < txn_size < 36MB
// constexpr size_t c_max_chunks_per_txn = 8;

// This sets an upper bound on the size of the transaction.
constexpr size_t c_max_txn_memory_size_bytes = 64 * 1024 * 1024;

// 8 chunks can hold up to 8 * (2^16 - 2^8) = 522240 64B objects, so cap the number of log records per txn at that count.
constexpr size_t c_max_log_records = 522240;

// This is an array of offsets in the data segment corresponding to object
// versions, where each array index is referred to as a "locator."
Expand All @@ -121,6 +129,7 @@ struct txn_log_t
{
gaia_txn_id_t begin_ts;
size_t record_count;
// size_t chunk_count;

struct log_record_t
{
Expand Down Expand Up @@ -166,6 +175,7 @@ struct txn_log_t
};

log_record_t log_records[c_max_log_records];
// gaia_offset_t chunks[c_max_chunks_per_txn];

friend std::ostream& operator<<(std::ostream& os, const txn_log_t& l)
{
Expand Down
36 changes: 35 additions & 1 deletion production/db/core/inc/db_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "gaia_internal/common/generator_iterator.hpp"

#include "db_internal_types.hpp"
#include "log_io.hpp"
#include "mapped_data.hpp"
#include "memory_manager.hpp"
#include "messages_generated.h"
Expand Down Expand Up @@ -107,6 +108,11 @@ class server_t
private:
static inline server_config_t s_server_conf{};

// TODO: Delete this once recovery/checkpointing implementation is in.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding all these pieces to db_server, can you instead create a separate abstraction for handling log writing and just have the server keep track of an instance of it? That should separate the main server code from the log writing component.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static inline bool c_use_gaia_log_implementation = false;

static constexpr uint64_t c_txn_group_timeout_us = 100;

// This is arbitrary but seems like a reasonable starting point (pending benchmarks).
static constexpr size_t c_stream_batch_size{1ULL << 10};

Expand All @@ -121,6 +127,19 @@ class server_t
static inline int s_server_shutdown_eventfd = -1;
static inline int s_listening_socket = -1;

// Signals the log writer thread to persist txn updates.
static inline int s_signal_log_write_eventfd = -1;

// Signals the log writer thread to persist txn decisions.
static inline int s_signal_decision_eventfd = -1;

// Signals the checkpointing thread to merge log file updates into the LSM store.
static inline int s_signal_checkpoint_log_eventfd = -1;

// Signals the persistence thread to check the return values of a batch of async I/O
// operations after the batch has been flushed.
static inline int s_do_write_batch_maintenance_eventfd = -1;

// These thread objects are owned by the client dispatch thread.
static inline std::vector<std::thread> s_session_threads{};

Expand All @@ -130,6 +149,7 @@ class server_t
static inline mapped_data_t<id_index_t> s_shared_id_index{};
static inline index::indexes_t s_global_indexes{};
static inline std::unique_ptr<persistent_store_manager> s_persistent_store{};
static inline std::unique_ptr<persistence::log_handler_t> s_log_handler{};

// These fields have transaction lifetime.
thread_local static inline gaia_txn_id_t s_txn_id = c_invalid_gaia_txn_id;
Expand All @@ -148,7 +168,6 @@ class server_t
thread_local static inline messages::session_state_t s_session_state = messages::session_state_t::DISCONNECTED;
thread_local static inline bool s_session_shutdown = false;
thread_local static inline int s_session_shutdown_eventfd = -1;

thread_local static inline gaia::db::memory_manager::memory_manager_t s_memory_manager{};
thread_local static inline gaia::db::memory_manager::chunk_manager_t s_chunk_manager{};

Expand Down Expand Up @@ -239,6 +258,12 @@ class server_t
// The current thread's index in `s_safe_ts_per_thread_entries`.
thread_local static inline size_t s_safe_ts_index{c_invalid_safe_ts_index};

// Keep track of the last txn that has been submitted to the async_disk_writer.
static inline std::atomic<gaia_txn_id_t> s_last_queued_commit_ts_upper_bound = c_invalid_gaia_txn_id;

// Keep track of undecided txns submitted to the async_disk_writer.
static inline std::set<gaia_txn_id_t> s_txn_decision_not_queued_set{};

private:
// Returns the current value of the given watermark.
static inline gaia_txn_id_t get_watermark(watermark_type_t watermark_type)
Expand Down Expand Up @@ -406,6 +431,15 @@ class server_t

static void client_dispatch_handler(const std::string& socket_name);

static void log_writer_handler();

static void persist_pending_writes(bool should_wait_for_completion = false);

static void recover_persistent_log();

// Method should only be called on server shutdown.
static void flush_pending_writes_on_server_shutdown();

static void session_handler(int session_socket);

static std::pair<int, int> get_stream_socket_pair();
Expand Down
2 changes: 2 additions & 0 deletions production/db/core/inc/db_shared_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,7 @@ gaia::db::memory_manager::chunk_manager_t* get_chunk_manager();
// Gets the mapped transaction log for the current session thread.
gaia::db::mapped_log_t* get_mapped_log();

// Gets a pointer to the running count of bytes allocated by the current txn.
// Callers (e.g. allocate_object) compare the result against nullptr before
// dereferencing it and adding to the count, so the return type must be a
// pointer rather than a value.
// NOTE(review): nullptr presumably means "no per-txn memory limit is tracked
// in this process" -- confirm against the client/server definitions.
size_t* get_current_txn_memory_size_bytes();

} // namespace db
} // namespace gaia
13 changes: 9 additions & 4 deletions production/db/core/inc/log_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,29 @@ class log_file_t
/**
* Obtain offset to write the next log record at.
*/
size_t get_current_offset();
const file_offset_t get_current_offset();

/**
* Get remaining space in persistent log file.
*/
size_t get_remaining_bytes_count(size_t record_size);
const size_t get_bytes_remaining_after_append(size_t record_size);

/**
* Allocate space in persistent log file.
*/
void allocate(size_t size);

int get_file_fd();
const int get_file_fd();

/**
* Obtain sequence number for the file.
*/
const file_sequence_t get_file_sequence();

private:
size_t m_file_size;
file_sequence_t m_file_seq;
size_t m_current_offset;
file_offset_t m_current_offset;
std::string m_dir_name;
int m_dir_fd;
int m_file_fd;
Expand Down
108 changes: 108 additions & 0 deletions production/db/core/inc/log_io.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/////////////////////////////////////////////
// Copyright (c) Gaia Platform LLC
// All rights reserved.
/////////////////////////////////////////////

#pragma once

#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cstddef>
#include <cstdint>

#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "gaia/common.hpp"
#include <gaia_internal/common/topological_sort.hpp>

#include "gaia_internal/db/db_object.hpp"

#include "async_disk_writer.hpp"
#include "db_internal_types.hpp"
#include "log_file.hpp"
#include "memory_manager.hpp"

namespace gaia
{
namespace db
{
namespace persistence
{

/*
 * Initial value stored in the CRC field of a record header while the
 * checksum is being computed.
 *
 * Fill the record_header.crc field with c_crc_initial_value when
 * computing the checksum: crc32c is vulnerable to 0-prefixing,
 * so we make sure the initial bytes are non-zero.
 *
 * https://stackoverflow.com/questions/2321676/data-length-vs-crc-length
 * "From the wikipedia article: "maximal total blocklength is equal to 2^r - 1". That's in bits.
 * You don't need to do much research to see that 2^9 - 1 is 511 bits. Using CRC-8,
 * multiple messages longer than 64 bytes will have the same CRC checksum value."
 * So CRC-16 would have max message size 2^17 - 1 bits or about 2^14 bytes = 16KB,
 * and CRC-32 would have max message size 2^33 - 1 bits or about 2^30 bytes = 1GB.
 */
// All 32 bits set (0xFFFFFFFF), spelled with a named cast instead of a C-style cast.
static constexpr crc32_t c_crc_initial_value = static_cast<uint32_t>(-1);

/**
 * Builds log records for txn updates and txn decisions and hands the
 * resulting writes to an async_disk_writer_t, tracking the log file that is
 * currently being appended to.
 */
class log_handler_t
{
public:
    // NOTE(review): directory_path is presumably the parent directory under
    // which c_gaia_wal_dir_name is created -- confirm against log_io.cpp.
    explicit log_handler_t(const std::string& directory_path);

    ~log_handler_t();

    // NOTE(review): the two eventfds carry the same names as the
    // async_disk_writer_t constructor parameters, so they are presumably
    // forwarded to the writer created here -- confirm against log_io.cpp.
    void open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd);

    /**
     * Reserve payload_size bytes in the log file.
     * Returns the offset at which the reserved region starts.
     */
    file_offset_t allocate_log_space(size_t payload_size);

    /**
     * Build a log record carrying the information of a single txn
     * (its commit timestamp, record type, object offsets and deleted ids).
     */
    void create_txn_record(
        gaia_txn_id_t commit_ts,
        record_type_t type,
        std::vector<contiguous_offsets_t>& object_offsets,
        std::vector<gaia::common::gaia_id_t>& deleted_ids);

    /**
     * Walk the in-memory txn_log and queue the log records generated from it
     * on the async_disk_writer.
     */
    void process_txn_log_and_write(int txn_log_fd, gaia_txn_id_t commit_ts);

    /**
     * Build a log record holding the decisions of one or more txns.
     */
    void create_decision_record(const decision_list_t& txn_decisions);

    /**
     * Hand the async_disk_writer's internal I/O request queue to the kernel
     * for processing.
     */
    void submit_writes(bool should_wait_for_completion);

    /**
     * Check the outcome of the I/O calls that were submitted to the kernel
     * for processing.
     */
    void check_flushed_batch_results_and_do_maintenance();

private:
    static constexpr char c_gaia_wal_dir_name[] = "/wal_dir";

    // rwx for the owner, r-x for group and others.
    static constexpr int c_gaia_wal_dir_permissions = S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH;

    static inline std::string s_wal_dir_path{};
    static inline int s_dir_fd = -1;

    // The first log file gets sequence number 1.
    static inline std::atomic<file_sequence_t::value_type> s_file_num{1};

    // The log file currently in use.
    std::unique_ptr<log_file_t> m_current_file;

    std::unique_ptr<async_disk_writer_t> m_async_disk_writer;
};

} // namespace persistence
} // namespace db
} // namespace gaia
Loading