-
Notifications
You must be signed in to change notification settings - Fork 6
Introduce log writer thread #1146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
e4d426b
64325c4
e9407d9
b72466a
0508152
cbdad1d
5fe5efa
f0d039c
baa1012
82306c9
d76c1f8
9a709e6
033e3dd
dc0fdb8
9e68c3b
58fafe3
9d4ffc5
a7de946
34a9454
59914ad
c80c6cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -132,6 +132,11 @@ inline void allocate_object( | |
| gaia_offset_t object_offset = chunk_manager->allocate(size + c_db_object_header_size); | ||
| if (object_offset == c_invalid_gaia_offset) | ||
| { | ||
| if (gaia::db::get_mapped_log()->data()->chunk_count == c_max_chunks_per_txn) | ||
| { | ||
| throw memory_allocation_error_internal("Maximum number of chunks for this transaction has been reached."); | ||
|
||
| } | ||
|
|
||
| if (chunk_manager->initialized()) | ||
| { | ||
| // The current chunk is out of memory, so retire it and allocate a new chunk. | ||
|
|
@@ -159,6 +164,9 @@ inline void allocate_object( | |
| // on the server in case we crash. | ||
| gaia::db::get_mapped_log()->data()->current_chunk = new_chunk_offset; | ||
|
|
||
| auto& chunk = gaia::db::get_mapped_log()->data()->chunks[gaia::db::get_mapped_log()->data()->chunk_count++]; | ||
|
||
| chunk = static_cast<size_t>(new_chunk_offset); | ||
|
|
||
| // Allocate from new chunk. | ||
| object_offset = chunk_manager->allocate(size + c_db_object_header_size); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| #include "gaia_internal/common/generator_iterator.hpp" | ||
|
|
||
| #include "db_internal_types.hpp" | ||
| #include "log_io.hpp" | ||
| #include "mapped_data.hpp" | ||
| #include "memory_manager.hpp" | ||
| #include "messages_generated.h" | ||
|
|
@@ -114,6 +115,12 @@ class server_t | |
| private: | ||
| static inline server_config_t s_server_conf{}; | ||
|
|
||
| // TODO: Delete this once recovery/checkpointing implementation is in. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of adding all these pieces to db_server, can you instead create a separate abstraction for handling log writing and just have the server keep track of an instance of it? That should separate the main server code from the log writing component.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like this is tracked in https://gaiaplatform.atlassian.net/browse/GAIAPLAT-1818. |
||
| static inline bool c_use_gaia_log_implementation = false; | ||
|
|
||
| // TODO: Make configurable. | ||
| static constexpr int64_t c_txn_group_timeout_us = 100; | ||
|
||
|
|
||
| // This is arbitrary but seems like a reasonable starting point (pending benchmarks). | ||
| static constexpr size_t c_stream_batch_size{1ULL << 10}; | ||
|
|
||
|
|
@@ -128,6 +135,15 @@ class server_t | |
| static inline int s_server_shutdown_eventfd = -1; | ||
| static inline int s_listening_socket = -1; | ||
|
|
||
| // Signals the log writer thread to persist txn updates. | ||
| static inline int s_signal_log_write_eventfd = -1; | ||
|
|
||
| // Signals the log writer thread to persist txn decisions. | ||
| static inline int s_signal_decision_eventfd = -1; | ||
|
|
||
| // Signals the checkpointing thread to merge log file updates into the LSM store. | ||
| static inline int s_signal_checkpoint_log_eventfd = -1; | ||
|
|
||
| // These thread objects are owned by the client dispatch thread. | ||
| static inline std::vector<std::thread> s_session_threads{}; | ||
|
|
||
|
|
@@ -137,6 +153,7 @@ class server_t | |
| static inline mapped_data_t<id_index_t> s_shared_id_index{}; | ||
| static inline index::indexes_t s_global_indexes{}; | ||
| static inline std::unique_ptr<persistent_store_manager> s_persistent_store{}; | ||
| static inline std::unique_ptr<persistence::log_handler_t> s_log_handler{}; | ||
|
|
||
| // These fields have transaction lifetime. | ||
| thread_local static inline gaia_txn_id_t s_txn_id = c_invalid_gaia_txn_id; | ||
|
|
@@ -155,6 +172,11 @@ class server_t | |
| thread_local static inline bool s_session_shutdown = false; | ||
| thread_local static inline int s_session_shutdown_eventfd = -1; | ||
|
|
||
| thread_local static inline int s_session_decision_eventfd = -1; | ||
|
|
||
| // Signal to persistence thread that a batch is ready to be validated. | ||
|
||
| static inline int s_validate_persistence_batch_eventfd = -1; | ||
|
|
||
| // These thread objects are owned by the session thread that created them. | ||
| thread_local static inline std::vector<std::thread> s_session_owned_threads{}; | ||
|
|
||
|
|
@@ -242,6 +264,12 @@ class server_t | |
| // The current thread's index in `s_safe_ts_per_thread_entries`. | ||
| thread_local static inline size_t s_safe_ts_index{c_invalid_safe_ts_index}; | ||
|
|
||
| // Keep track of the last txn that has been submitted to the async_disk_writer. | ||
| static inline std::atomic<gaia_txn_id_t> s_last_queued_commit_ts_upper_bound = c_invalid_gaia_txn_id; | ||
|
|
||
| // Keep a track of undecided txns submitted to the async_disk_writer. | ||
| static inline std::set<gaia_txn_id_t> s_seen_and_undecided_txn_set{}; | ||
|
||
|
|
||
| private: | ||
| // Returns the current value of the given watermark. | ||
| static inline gaia_txn_id_t get_watermark(watermark_type_t watermark_type) | ||
|
|
@@ -409,6 +437,14 @@ class server_t | |
|
|
||
| static void client_dispatch_handler(const std::string& socket_name); | ||
|
|
||
| static void log_writer_handler(); | ||
|
|
||
| static void write_to_persistent_log(int64_t txn_group_timeout_us, bool sync_writes = false); | ||
|
||
|
|
||
| static void recover_persistent_log(); | ||
|
|
||
| static void flush_all_pending_writes(); | ||
|
||
|
|
||
| static void session_handler(int session_socket); | ||
|
|
||
| static std::pair<int, int> get_stream_socket_pair(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| ///////////////////////////////////////////// | ||
| // Copyright (c) Gaia Platform LLC | ||
| // All rights reserved. | ||
| ///////////////////////////////////////////// | ||
|
|
||
| #pragma once | ||
|
|
||
| #include <fcntl.h> | ||
| #include <unistd.h> | ||
|
|
||
| #include <cstddef> | ||
|
|
||
| #include <atomic> | ||
| #include <functional> | ||
| #include <memory> | ||
| #include <string> | ||
| #include <unordered_map> | ||
|
|
||
| #include "gaia/common.hpp" | ||
|
|
||
| #include "gaia_internal/db/db_object.hpp" | ||
|
|
||
| #include "async_disk_writer.hpp" | ||
| #include "db_internal_types.hpp" | ||
| #include "log_file.hpp" | ||
| #include "memory_manager.hpp" | ||
|
|
||
| namespace gaia | ||
| { | ||
| namespace db | ||
| { | ||
| namespace persistence | ||
| { | ||
|
|
||
| /* | ||
| * Fill the record_header.crc field with CRC_INITIAL_VALUE when | ||
| * computing the checksum: crc32c is vulnerable to 0-prefixing, | ||
| * so we make sure the initial bytes are non-zero. | ||
| */ | ||
| static constexpr crc32_t c_crc_initial_value = ((uint32_t)-1); | ||
|
|
||
| class log_handler_t | ||
| { | ||
| public: | ||
| explicit log_handler_t(const std::string& directory_path); | ||
| ~log_handler_t(); | ||
| void open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd); | ||
|
|
||
| /** | ||
| * Allocate space in the log file and return starting offset of allocation. | ||
| */ | ||
| file_offset_t allocate_log_space(size_t payload_size); | ||
|
|
||
| /** | ||
| * Create a log record which stores txn information. | ||
| */ | ||
| void create_txn_record( | ||
| gaia_txn_id_t commit_ts, | ||
| record_type_t type, | ||
| std::vector<gaia_offset_t>& object_offsets, | ||
| std::vector<gaia::common::gaia_id_t>& deleted_ids); | ||
|
|
||
| /** | ||
| * Process the in memory txn_log and submit the processed writes (generated log records) to the async_disk_writer. | ||
| */ | ||
| void process_txn_log_and_write(int txn_log_fd, gaia_txn_id_t commit_ts); | ||
|
|
||
| /** | ||
| * Create a log record which stores decisions for one or more txns. | ||
| */ | ||
| void create_decision_record(const decision_list_t& txn_decisions); | ||
|
|
||
| /** | ||
| * Submit async_disk_writer's internal I/O request queue to the kernel for processing. | ||
| */ | ||
| void submit_writes(bool sync); | ||
|
|
||
| /** | ||
| * Validate the result of I/O calls submitted to the kernel for processing. | ||
| */ | ||
| void validate_flushed_batch(); | ||
|
|
||
| /** | ||
| * Track the session_decision_eventfd for each commit_ts; txn_commit() will only return once | ||
| * session_decision_eventfd is written to by the log_writer thread - signifying that the txn decision | ||
| * has been persisted. | ||
| */ | ||
| void register_commit_ts_for_session_notification(gaia_txn_id_t commit_ts, int session_decision_eventfd); | ||
|
|
||
| private: | ||
| // TODO: Make log file size configurable. | ||
| static constexpr uint64_t c_file_size = 4 * 1024 * 1024; | ||
| static constexpr std::string_view c_gaia_wal_dir_name = "/wal_dir"; | ||
| static constexpr int c_gaia_wal_dir_permissions = 0755; | ||
| static inline std::string s_wal_dir_path{}; | ||
| static inline int s_dir_fd = -1; | ||
|
|
||
| // Log file sequence starts from 1. | ||
| static inline std::atomic<file_sequence_t::value_type> s_file_num = 1; | ||
|
|
||
| // Keep track of the current log file. | ||
| std::unique_ptr<log_file_t> m_current_file; | ||
|
|
||
| std::unique_ptr<async_disk_writer_t> m_async_disk_writer; | ||
| }; | ||
|
|
||
| } // namespace persistence | ||
| } // namespace db | ||
| } // namespace gaia |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear to me that there's any reason for the chunk_count to be persisted in the txn log itself, rather than in client-side session thread TLS or shared session state (which we need anyway for crash recovery). Do we need the chunk count in the txn log as anything but a consistency check when we extract the set of used chunks from redo offsets during a scan on the server?
In general I'd prefer to avoid storing redundant information like this in persistent structures, unless there's a compelling reason to do so for performance or simplicity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Youre correct. i can infer the set of chunks from the txn log and move this to client tls.