-
Notifications
You must be signed in to change notification settings - Fork 6
Custom recovery/checkpointing impl #1232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: logwriter_thread
Are you sure you want to change the base?
Changes from 2 commits
6fd85c2
59b5586
aee42d2
98615c6
6a002d6
a0712f4
9913c26
3cb4d95
35fe56f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,6 +87,33 @@ class log_handler_t | |
| */ | ||
| void register_commit_ts_for_session_notification(gaia_txn_id_t commit_ts, int session_decision_eventfd); | ||
|
|
||
| /** | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we should move these comments to the header file, but you can wait until we move the others as well. |
||
| * Entry point to start recovery procedure from gaia log files. Checkpointing reuses the same function. | ||
| */ | ||
| void recover_from_persistent_log( | ||
| gaia_txn_id_t& last_checkpointed_commit_ts, | ||
| uint64_t& last_processed_log_seq, | ||
| uint64_t max_log_seq_to_process, | ||
| recovery_mode_t mode); | ||
|
|
||
| /** | ||
| * Destroy all log files with sequence number lesser than or equal to max_log_seq_to_delete. | ||
| */ | ||
| void destroy_persistent_log(uint64_t max_log_seq_to_delete); | ||
|
||
|
|
||
| /** | ||
| * Register persistent store create/delete APIs. Rework to call persistent store APIs directly? | ||
| */ | ||
| void register_write_to_persistent_store_fn(std::function<void(db_recovered_object_t&)> write_obj_fn); | ||
|
||
| void register_remove_from_persistent_store_fn(std::function<void(gaia::common::gaia_id_t)> remove_obj_fn); | ||
|
|
||
| /** | ||
| * Set the log sequence counter. | ||
| */ | ||
| void set_persistent_log_sequence(uint64_t log_seq); | ||
|
||
|
|
||
| size_t get_remaining_txns_to_checkpoint_count(); | ||
|
|
||
| private: | ||
| // TODO: Make log file size configurable. | ||
| static constexpr uint64_t c_file_size = 4 * 1024 * 1024; | ||
|
|
@@ -102,6 +129,32 @@ class log_handler_t | |
| std::unique_ptr<log_file_t> m_current_file; | ||
|
|
||
| std::unique_ptr<async_disk_writer_t> m_async_disk_writer; | ||
|
|
||
| // Map txn commit_ts to location of log record header during recovery. | ||
| // This index is maintained on a per log file basis. Before moving to the next file | ||
| // we assert that this index is empty as all txns have been processed. | ||
| // Note that the recovery implementation proceeds in increasing log file order. | ||
| std::map<gaia_txn_id_t, unsigned char*> txn_index; | ||
|
||
|
|
||
| // This map contains the current set of txns that are being processed (by either recovery or checkpointing) | ||
|
||
| // Txns are processed one decision record at a time; a single decision record may contain | ||
| // multiple txns. | ||
| std::map<gaia_txn_id_t, decision_type_t> decision_index; | ||
|
||
|
|
||
| gaia_txn_id_t m_max_decided_commit_ts; | ||
|
|
||
| std::function<void(db_recovered_object_t&)> write_to_persistent_store_fn; | ||
|
||
| std::function<void(gaia::common::gaia_id_t)> remove_from_persistent_store_fn; | ||
|
|
||
| // Recovery & Checkpointing APIs | ||
| size_t update_cursor(struct record_iterator_t* it); | ||
|
||
| size_t validate_recovered_record_crc(struct record_iterator_t* it); | ||
|
||
| void map_log_file(struct record_iterator_t* it, int file_fd, recovery_mode_t recovery_mode); | ||
|
||
| void unmap_file(void* start, size_t size); | ||
| bool is_remaining_file_empty(unsigned char* start, unsigned char* end); | ||
| void write_log_record_to_persistent_store(read_record_t* record); | ||
| void write_records(record_iterator_t* it, gaia_txn_id_t& last_checkpointed_commit_ts); | ||
| bool write_log_file_to_persistent_store(gaia_txn_id_t& last_checkpointed_commit_ts, record_iterator_t& it); | ||
|
||
| }; | ||
|
|
||
| } // namespace persistence | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,26 @@ namespace db | |
| namespace persistence | ||
| { | ||
|
|
||
| enum class recovery_mode_t : uint8_t | ||
| { | ||
| not_set = 0x0, | ||
|
|
||
| // Does not tolerate any IO failure when reading a log file; any | ||
| // IO error is treated as unrecoverable. | ||
| // This mode is used when checkpointing log writes to RocksDB. | ||
| checkpoint = 0x1, | ||
|
||
|
|
||
| // Stop recovery on first IO error. Database will always start and will try to recover as much | ||
| // committed data from the log as possible. | ||
| // Updates are logged one batch as a time; Persistent batch IO is validated | ||
| // first before marking any txn in the batch as durable (and returning a commit decision to the user); | ||
| // Thus ignore any txn after the last seen decision timestamp before encountering IO error. | ||
| finish_on_first_error = 0x2, | ||
|
|
||
| // TODO: Already supported by 'checkpoint' option, but make this option visible to customer along with 'finish_on_first_error' | ||
| kill_on_first_error = 0x3, | ||
|
||
| }; | ||
|
|
||
| enum class record_type_t : uint8_t | ||
| { | ||
| not_set = 0x0, | ||
|
|
@@ -109,6 +129,25 @@ struct record_header_t | |
| char padding[3]; | ||
| }; | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re: the |
||
| struct read_record_t | ||
| { | ||
| struct record_header_t header; | ||
| unsigned char payload[]; | ||
| }; | ||
|
|
||
| struct record_iterator_t | ||
| { | ||
| unsigned char* cursor; | ||
|
||
| unsigned char* end; | ||
| unsigned char* stop_at; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this mean? |
||
| unsigned char* begin; | ||
|
||
| void* mapped; | ||
|
||
| size_t map_size; | ||
| int file_fd; | ||
| recovery_mode_t recovery_mode; | ||
| bool halt_recovery; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Halt recovery on what condition? Or is this just a flag we set when we want to halt recovery? |
||
| }; | ||
|
|
||
| // The primary motivation of this buffer is to keep a hold of any additional information we want to write to the log | ||
|
||
| // apart from the shared memory objects. | ||
| // Custom information includes | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,8 @@ namespace gaia | |
| { | ||
| namespace db | ||
| { | ||
| namespace persistence | ||
| { | ||
|
|
||
| class persistent_store_manager | ||
| { | ||
|
|
@@ -87,11 +89,48 @@ class persistent_store_manager | |
|
|
||
| void reset_log(); | ||
|
|
||
| /** | ||
| * This API is only used during checkpointing & recovery. | ||
| */ | ||
| void put(gaia::db::db_recovered_object_t& object); | ||
|
|
||
| /** | ||
| * This API is only used during checkpointing & recovery. | ||
| */ | ||
| void remove(gaia::common::gaia_id_t id_to_remove); | ||
|
|
||
| /** | ||
| * Flush rocksdb memory buffer to disk as an SST file. | ||
| * The rocksdb memtable is used to hold writes before writing them to an SST file. | ||
| * The alternative is the RocksDB SSTFileWriter. Both options require reserving memory to stage writes in | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we can flush writes in commit_ts order to an SSTFileWriter, then I don't see what advantage the memtable gives us. Are you thinking that we need to use an ordered structure to buffer writes because data may appear in the WAL out of commit_ts order, so we may as well use the memtable? |
||
| * the SST format before writing to disk. | ||
| * Additionally, in RocksDB, each column family has an individual set of | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand, surely we're only using a single column family anyway, so how does using the memtable help us in this respect? |
||
| * SST files and the memtable. Thus, writing to multiple column families will require maintaining multiple SSTFileWriter objects. | ||
| * All of this is handled by the memtable already. | ||
| * Note that the memtable is only written to during log checkpointing. | ||
| */ | ||
| void flush(); | ||
|
|
||
| /** | ||
| * Get value of a custom key. Used to retain a gaia counter across restarts. | ||
| */ | ||
| uint64_t get_value(const std::string& key); | ||
|
||
|
|
||
| /** | ||
| * Update custom key's value. Used to retain gaia counter across restarts. | ||
|
||
| */ | ||
| void update_value(const std::string& key, uint64_t value_to_write); | ||
|
||
|
|
||
| static constexpr char c_data_dir_command_flag[] = "--data-dir"; | ||
| static constexpr char c_persistent_store_dir_name[] = "/data"; | ||
|
||
| static const std::string c_last_processed_log_num_key; | ||
|
||
|
|
||
| private: | ||
| gaia::db::counters_t* m_counters = nullptr; | ||
| std::unique_ptr<gaia::db::rdb_wrapper_t> m_rdb_wrapper; | ||
| std::unique_ptr<gaia::db::persistence::rdb_wrapper_t> m_rdb_wrapper; | ||
| std::string m_data_dir_path; | ||
| }; | ||
|
|
||
| } // namespace persistence | ||
| } // namespace db | ||
| } // namespace gaia | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,8 @@ namespace gaia | |
| { | ||
| namespace db | ||
| { | ||
|
|
||
| namespace persistence | ||
| { | ||
| class rdb_wrapper_t | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Leave an empty line after the set of namespace declarations. |
||
| { | ||
| public: | ||
|
|
@@ -52,12 +53,21 @@ class rdb_wrapper_t | |
|
|
||
| void handle_rdb_error(rocksdb::Status status); | ||
|
|
||
| void flush(); | ||
|
|
||
| void put(const rocksdb::Slice& key, const rocksdb::Slice& value); | ||
|
|
||
| void remove(const rocksdb::Slice& key); | ||
|
|
||
| void get(const rocksdb::Slice& key, std::string& value); | ||
|
|
||
| private: | ||
| std::unique_ptr<rocksdb::TransactionDB> m_txn_db; | ||
| std::string m_data_dir; | ||
| rocksdb::WriteOptions m_write_options; | ||
|
||
| rocksdb::TransactionDBOptions m_txn_options; | ||
| }; | ||
|
|
||
| } // namespace persistence | ||
| } // namespace db | ||
| } // namespace gaia | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -270,6 +270,41 @@ void server_t::recover_persistent_log() | |
| s_log_handler = std::make_unique<persistence::log_handler_t>(s_server_conf.data_dir()); | ||
|
|
||
| s_log_handler->open_for_writes(s_validate_persistence_batch_eventfd, s_signal_checkpoint_log_eventfd); | ||
|
|
||
| auto put_obj = [&](db_recovered_object_t& obj) { | ||
|
||
| s_persistent_store->put(obj); | ||
| }; | ||
| auto remove_obj = [=](gaia_id_t id) { | ||
| s_persistent_store->remove(id); | ||
| }; | ||
|
|
||
| s_log_handler->register_write_to_persistent_store_fn(put_obj); | ||
| s_log_handler->register_remove_from_persistent_store_fn(remove_obj); | ||
|
|
||
| if (s_server_conf.persistence_mode() == persistence_mode_t::e_reinitialized_on_startup) | ||
| { | ||
| s_log_handler->destroy_persistent_log(INT64_MAX); | ||
|
||
| } | ||
|
|
||
| // Get last processed log. | ||
| auto last_processed_log_seq = s_persistent_store->get_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key); | ||
|
||
|
|
||
| // Recover only the first time this method gets called. | ||
| gaia_txn_id_t last_checkpointed_commit_ts = 0; | ||
| s_log_handler->recover_from_persistent_log( | ||
| last_checkpointed_commit_ts, | ||
| last_processed_log_seq, | ||
| INT64_MAX, | ||
|
||
| gaia::db::persistence::recovery_mode_t::finish_on_first_error); | ||
|
|
||
| s_persistent_store->update_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key, last_processed_log_seq); | ||
|
|
||
| s_log_handler->set_persistent_log_sequence(last_processed_log_seq); | ||
|
|
||
| s_log_handler->destroy_persistent_log(INT64_MAX); | ||
|
|
||
| std::cout << "s_validate_persistence_batch_eventfd = " << s_validate_persistence_batch_eventfd << std::endl; | ||
| s_log_handler->open_for_writes(s_validate_persistence_batch_eventfd, s_signal_checkpoint_log_eventfd); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -899,14 +934,19 @@ void server_t::recover_db() | |
| auto cleanup = make_scope_guard([]() { end_startup_txn(); }); | ||
| begin_startup_txn(); | ||
|
|
||
| s_persistent_store = std::make_unique<gaia::db::persistent_store_manager>( | ||
| s_persistent_store = std::make_unique<gaia::db::persistence::persistent_store_manager>( | ||
| get_counters(), s_server_conf.data_dir()); | ||
| if (s_server_conf.persistence_mode() == persistence_mode_t::e_reinitialized_on_startup) | ||
| { | ||
| s_persistent_store->destroy_persistent_store(); | ||
| } | ||
| s_persistent_store->open(); | ||
|
|
||
| recover_persistent_log(); | ||
|
|
||
| // Flush persistent store buffer to disk. | ||
| s_persistent_store->flush(); | ||
|
|
||
| s_persistent_store->recover(); | ||
| } | ||
| } | ||
|
|
@@ -1043,6 +1083,36 @@ void server_t::log_writer_handler() | |
| } | ||
| } | ||
|
|
||
| void server_t::checkpoint_handler() | ||
| { | ||
| // Wait for a persistent log file to be closed before checkpointing it. | ||
|
||
| // This can be achieved via blocking on an eventfd read. | ||
| uint64_t last_deleted_log_seq = 0; | ||
| while (true) | ||
| { | ||
| // Log sequence number of file ready to be checkpointed. | ||
| eventfd_t max_log_seq_to_checkpoint; | ||
| eventfd_read(s_signal_checkpoint_log_eventfd, &max_log_seq_to_checkpoint); | ||
|
|
||
| // Process all existing log files. | ||
| uint64_t last_processed_log_seq = 0; | ||
|
||
| s_log_handler->recover_from_persistent_log( | ||
| s_last_checkpointed_commit_ts_lower_bound, | ||
| last_processed_log_seq, | ||
| max_log_seq_to_checkpoint, | ||
| gaia::db::persistence::recovery_mode_t::checkpoint); | ||
|
|
||
| s_persistent_store->update_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key, last_processed_log_seq); | ||
|
|
||
| // Flush persistent store buffer to disk. | ||
| s_persistent_store->flush(); | ||
|
|
||
| ASSERT_INVARIANT(max_log_seq_to_checkpoint > last_deleted_log_seq, "Log files cannot be deleted out of order"); | ||
| s_log_handler->destroy_persistent_log(last_processed_log_seq); | ||
| last_deleted_log_seq = max_log_seq_to_checkpoint; | ||
| } | ||
| } | ||
|
|
||
| gaia_txn_id_t server_t::begin_startup_txn() | ||
| { | ||
| // Reserve an index in the safe_ts array, so the main thread can execute | ||
|
|
@@ -3521,10 +3591,13 @@ void server_t::run(server_config_t server_conf) | |
|
|
||
| s_signal_decision_eventfd = make_nonblocking_eventfd(); | ||
|
|
||
| s_signal_checkpoint_log_eventfd = make_blocking_eventfd(); | ||
|
|
||
| auto cleanup_persistence_eventfds = make_scope_guard([]() { | ||
| close_fd(s_signal_log_write_eventfd); | ||
| close_fd(s_signal_decision_eventfd); | ||
| close_fd(s_validate_persistence_batch_eventfd); | ||
| close_fd(s_signal_checkpoint_log_eventfd); | ||
| }); | ||
|
|
||
| // Block handled signals in this thread and subsequently spawned threads. | ||
|
|
@@ -3537,10 +3610,12 @@ void server_t::run(server_config_t server_conf) | |
| init_shared_memory(); | ||
|
|
||
| std::thread log_writer_thread; | ||
| std::thread checkpoint_thread; | ||
| if (s_server_conf.persistence_mode() != persistence_mode_t::e_disabled) | ||
| { | ||
| // Launch persistence thread. | ||
| log_writer_thread = std::thread(&log_writer_handler); | ||
| checkpoint_thread = std::thread(&checkpoint_handler); | ||
| } | ||
|
|
||
| // Launch thread to listen for client connections and create session threads. | ||
|
|
@@ -3559,6 +3634,7 @@ void server_t::run(server_config_t server_conf) | |
| if (s_server_conf.persistence_mode() != persistence_mode_t::e_disabled) | ||
| { | ||
| log_writer_thread.join(); | ||
| checkpoint_thread.join(); | ||
| } | ||
|
|
||
| // We special-case SIGHUP to force reinitialization of the server. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn't this atomic like
s_last_queued_commit_ts_upper_bound?