-
Notifications
You must be signed in to change notification settings - Fork 6
Introduce log writer APIs (log_io.cpp, log_io.hpp) #1144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
e4d426b
e9407d9
b72466a
0508152
cbdad1d
5fe5efa
f0d039c
baa1012
1c6fd3b
734b736
1b5b83a
8c7e7c5
1a96167
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,12 +37,12 @@ async_disk_writer_t::async_disk_writer_t(int validate_flush_efd, int signal_chec | |
| ASSERT_PRECONDITION(validate_flush_efd >= 0, "Invalid validate flush eventfd"); | ||
| ASSERT_PRECONDITION(signal_checkpoint_efd >= 0, "Invalid signal checkpoint eventfd"); | ||
|
|
||
| m_validate_flush_efd = validate_flush_efd; | ||
| m_signal_checkpoint_efd = signal_checkpoint_efd; | ||
| m_validate_flush_eventfd = validate_flush_efd; | ||
| m_signal_checkpoint_eventfd = signal_checkpoint_efd; | ||
|
|
||
| // Used to block new writes to disk when a batch is already getting flushed. | ||
| s_flush_efd = eventfd(1, 0); | ||
| if (s_flush_efd == -1) | ||
| s_flush_eventfd = eventfd(1, 0); | ||
| if (s_flush_eventfd == -1) | ||
| { | ||
| const char* reason = ::explain_eventfd(1, 0); | ||
| throw_system_error(reason); | ||
|
|
@@ -62,7 +62,7 @@ async_disk_writer_t::~async_disk_writer_t() | |
|
|
||
| void async_disk_writer_t::teardown() | ||
| { | ||
| close_fd(s_flush_efd); | ||
| close_fd(s_flush_eventfd); | ||
| } | ||
|
|
||
| void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t user_data) | ||
|
|
@@ -76,12 +76,7 @@ void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t use | |
| throw_system_error(ss.str(), err); | ||
| } | ||
|
|
||
| void async_disk_writer_t::map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_fd) | ||
| { | ||
| m_ts_to_session_decision_fd_map.insert(std::pair(commit_ts, session_decision_fd)); | ||
| } | ||
|
|
||
| void async_disk_writer_t::add_decisions_to_batch(decision_list_t& decisions) | ||
| void async_disk_writer_t::add_decisions_to_batch(const decision_list_t& decisions) | ||
| { | ||
| for (const auto& decision : decisions) | ||
| { | ||
|
|
@@ -106,7 +101,7 @@ void async_disk_writer_t::perform_post_completion_maintenance() | |
| // Signal to checkpointing thread the upper bound of files that it can process. | ||
| if (max_file_seq_to_close > 0) | ||
| { | ||
| signal_eventfd(m_signal_checkpoint_efd, max_file_seq_to_close); | ||
| signal_eventfd(m_signal_checkpoint_eventfd, max_file_seq_to_close); | ||
| } | ||
|
|
||
| const decision_list_t& decisions = m_in_flight_batch->get_decision_batch_entries(); | ||
|
|
@@ -116,10 +111,10 @@ void async_disk_writer_t::perform_post_completion_maintenance() | |
| transactions::txn_metadata_t::set_txn_durable(decision.commit_ts); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Setting the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A bit of a higher-level design digression: generally speaking, if we have dedicated txn metadata flags for a property, we don't need a watermark for it, and vice versa. The key difference is that metadata flags can be updated out-of-order, while a watermark is in-order by construction (it can only represent a "prefix property", i.e. a predicate which is true for every timestamp <= the watermark). If txns were made durable strictly in commit_ts order (or if there were no latency penalty from requiring a full prefix of txns to be durable before advancing the watermark), then I think a durability watermark could replace the |
||
|
|
||
| // Unblock session thread. | ||
| auto itr = m_ts_to_session_decision_fd_map.find(decision.commit_ts); | ||
| ASSERT_INVARIANT(itr != m_ts_to_session_decision_fd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts"); | ||
| auto itr = m_ts_to_session_decision_eventfd_map.find(decision.commit_ts); | ||
| ASSERT_INVARIANT(itr != m_ts_to_session_decision_eventfd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts"); | ||
| signal_eventfd_single_thread(itr->second); | ||
| m_ts_to_session_decision_fd_map.erase(itr); | ||
| m_ts_to_session_decision_eventfd_map.erase(itr); | ||
| } | ||
|
|
||
| m_in_flight_batch->clear_decision_batch(); | ||
|
|
@@ -130,7 +125,7 @@ void async_disk_writer_t::submit_and_swap_in_progress_batch(int file_fd, bool sh | |
| eventfd_t event_counter; | ||
|
|
||
| // Block on any pending disk flushes. | ||
| eventfd_read(s_flush_efd, &event_counter); | ||
| eventfd_read(s_flush_eventfd, &event_counter); | ||
|
|
||
| // Perform any maintenance on the in_flight batch. | ||
| perform_post_completion_maintenance(); | ||
|
|
@@ -147,7 +142,7 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_ | |
| swap_batches(); | ||
|
|
||
| // Nothing to submit; reset the flush efd that got burnt in submit_and_swap_in_progress_batch() function. | ||
| signal_eventfd_single_thread(s_flush_efd); | ||
| signal_eventfd_single_thread(s_flush_eventfd); | ||
|
|
||
| // Reset metadata buffer. | ||
| m_metadata_buffer.clear(); | ||
|
|
@@ -158,8 +153,8 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_ | |
| m_in_progress_batch->add_fdatasync_op_to_batch(file_fd, get_enum_value(uring_op_t::fdatasync), IOSQE_IO_LINK); | ||
|
|
||
| // Signal eventfd's as part of batch. | ||
| m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, s_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK); | ||
| m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, m_validate_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN); | ||
| m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, s_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK); | ||
| m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, m_validate_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN); | ||
|
|
||
| swap_batches(); | ||
| auto flushed_batch_size = m_in_flight_batch->get_unsubmitted_entries_count(); | ||
|
|
@@ -187,7 +182,7 @@ void async_disk_writer_t::perform_file_close_operations(int file_fd, file_sequen | |
| m_in_progress_batch->append_file_to_batch(file_fd, log_seq); | ||
| } | ||
|
|
||
| unsigned char* async_disk_writer_t::copy_into_metadata_buffer(void* source, size_t size, int file_fd) | ||
| unsigned char* async_disk_writer_t::copy_into_metadata_buffer(const void* source, size_t size, int file_fd) | ||
| { | ||
| auto current_ptr = m_metadata_buffer.get_current_ptr(); | ||
| ASSERT_PRECONDITION(current_ptr, "Invalid metadata buffer pointer."); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,14 +25,9 @@ namespace persistence | |
| // TODO (Mihir): Use io_uring for fsync, close & fallocate operations in this file. | ||
| // open() operation will remain synchronous, since we need the file fd to perform other async | ||
| // operations on the file. | ||
| log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_seq, size_t size) | ||
| log_file_t::log_file_t(const std::string& dir_name, int dir_fd, file_sequence_t file_seq, size_t file_size) | ||
| : m_file_size(file_size), m_file_seq(file_seq), m_dir_name(dir_name), m_dir_fd(dir_fd) | ||
|
||
| { | ||
| m_dir_fd = dir_fd; | ||
| m_dir_name = dir; | ||
| m_file_seq = file_seq; | ||
| m_file_size = size; | ||
| m_current_offset = 0; | ||
|
|
||
| // open and fallocate depending on size. | ||
| std::stringstream file_name; | ||
| file_name << m_dir_name << "/" << m_file_seq; | ||
|
|
@@ -66,12 +61,12 @@ log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_ | |
| } | ||
| } | ||
|
|
||
| size_t log_file_t::get_current_offset() | ||
| file_offset_t log_file_t::get_current_offset() const | ||
| { | ||
| return m_current_offset; | ||
| } | ||
|
|
||
| int log_file_t::get_file_fd() | ||
| int log_file_t::get_file_fd() const | ||
| { | ||
| return m_file_fd; | ||
| } | ||
|
|
@@ -81,7 +76,12 @@ void log_file_t::allocate(size_t size) | |
| m_current_offset += size; | ||
| } | ||
|
|
||
| size_t log_file_t::get_remaining_bytes_count(size_t record_size) | ||
| file_sequence_t log_file_t::get_file_sequence() const | ||
| { | ||
| return m_file_seq; | ||
| } | ||
|
|
||
| size_t log_file_t::get_bytes_remaining_after_append(size_t record_size) const | ||
| { | ||
| ASSERT_INVARIANT(m_file_size > 0, "Preallocated file size should be greater than 0."); | ||
| if (m_file_size < (m_current_offset + record_size)) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we put an `_` before `fd`? (i.e., `m_validate_flush_event_fd`).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, `eventfd` is the name of a kernel object type and a syscall