Skip to content

Commit c1a4d78

Browse files
Introduce log writer APIs (log_io.cpp, log_io.hpp) (#1144)
* Introduce log writer APIs (log_io.cpp, log_io.hpp)
* fixes
* Introduce log writer APIs (log_io.cpp, log_io.hpp)
* fixes
* fixes 2
* Rewrite chunk/offset extraction logic
* Misc. fixes

Co-authored-by: Tobin Baker <[email protected]>
1 parent 04dda54 commit c1a4d78

File tree

11 files changed

+650
-74
lines changed

11 files changed

+650
-74
lines changed

production/db/core/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,10 @@ else()
125125
endif()
126126

127127
add_library(gaia_db_persistence STATIC
128-
src/log_file.cpp
129128
src/async_disk_writer.cpp
130-
src/async_write_batch.cpp)
129+
src/async_write_batch.cpp
130+
src/log_file.cpp
131+
src/log_io.cpp)
131132
# Add GAIA_DB_SERVER preprocessor definition for conditional includes.
132133
target_compile_definitions(gaia_db_persistence PUBLIC GAIA_DB_SERVER=1)
133134
configure_gaia_target(gaia_db_persistence)

production/db/core/src/async_disk_writer.cpp

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ async_disk_writer_t::async_disk_writer_t(int validate_flush_efd, int signal_chec
3737
ASSERT_PRECONDITION(validate_flush_efd >= 0, "Invalid validate flush eventfd");
3838
ASSERT_PRECONDITION(signal_checkpoint_efd >= 0, "Invalid signal checkpoint eventfd");
3939

40-
m_validate_flush_efd = validate_flush_efd;
41-
m_signal_checkpoint_efd = signal_checkpoint_efd;
40+
m_validate_flush_eventfd = validate_flush_efd;
41+
m_signal_checkpoint_eventfd = signal_checkpoint_efd;
4242

4343
// Used to block new writes to disk when a batch is already getting flushed.
44-
s_flush_efd = eventfd(1, 0);
45-
if (s_flush_efd == -1)
44+
s_flush_eventfd = eventfd(1, 0);
45+
if (s_flush_eventfd == -1)
4646
{
4747
const char* reason = ::explain_eventfd(1, 0);
4848
throw_system_error(reason);
@@ -62,7 +62,7 @@ async_disk_writer_t::~async_disk_writer_t()
6262

6363
void async_disk_writer_t::teardown()
6464
{
65-
close_fd(s_flush_efd);
65+
close_fd(s_flush_eventfd);
6666
}
6767

6868
void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t user_data)
@@ -76,12 +76,7 @@ void async_disk_writer_t::throw_error(std::string err_msg, int err, uint64_t use
7676
throw_system_error(ss.str(), err);
7777
}
7878

79-
void async_disk_writer_t::map_commit_ts_to_session_decision_efd(gaia_txn_id_t commit_ts, int session_decision_fd)
80-
{
81-
m_ts_to_session_decision_fd_map.insert(std::pair(commit_ts, session_decision_fd));
82-
}
83-
84-
void async_disk_writer_t::add_decisions_to_batch(decision_list_t& decisions)
79+
void async_disk_writer_t::add_decisions_to_batch(const decision_list_t& decisions)
8580
{
8681
for (const auto& decision : decisions)
8782
{
@@ -106,7 +101,7 @@ void async_disk_writer_t::perform_post_completion_maintenance()
106101
// Signal to checkpointing thread the upper bound of files that it can process.
107102
if (max_file_seq_to_close > 0)
108103
{
109-
signal_eventfd(m_signal_checkpoint_efd, max_file_seq_to_close);
104+
signal_eventfd(m_signal_checkpoint_eventfd, max_file_seq_to_close);
110105
}
111106

112107
const decision_list_t& decisions = m_in_flight_batch->get_decision_batch_entries();
@@ -116,10 +111,10 @@ void async_disk_writer_t::perform_post_completion_maintenance()
116111
transactions::txn_metadata_t::set_txn_durable(decision.commit_ts);
117112

118113
// Unblock session thread.
119-
auto itr = m_ts_to_session_decision_fd_map.find(decision.commit_ts);
120-
ASSERT_INVARIANT(itr != m_ts_to_session_decision_fd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts");
114+
auto itr = m_ts_to_session_decision_eventfd_map.find(decision.commit_ts);
115+
ASSERT_INVARIANT(itr != m_ts_to_session_decision_eventfd_map.end(), "Unable to find session durability eventfd from committing txn's commit_ts");
121116
signal_eventfd_single_thread(itr->second);
122-
m_ts_to_session_decision_fd_map.erase(itr);
117+
m_ts_to_session_decision_eventfd_map.erase(itr);
123118
}
124119

125120
m_in_flight_batch->clear_decision_batch();
@@ -130,7 +125,7 @@ void async_disk_writer_t::submit_and_swap_in_progress_batch(int file_fd, bool sh
130125
eventfd_t event_counter;
131126

132127
// Block on any pending disk flushes.
133-
eventfd_read(s_flush_efd, &event_counter);
128+
eventfd_read(s_flush_eventfd, &event_counter);
134129

135130
// Perform any maintenance on the in_flight batch.
136131
perform_post_completion_maintenance();
@@ -147,7 +142,7 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_
147142
swap_batches();
148143

149144
// Nothing to submit; re-signal the flush eventfd that was consumed by the blocking read in submit_and_swap_in_progress_batch().
150-
signal_eventfd_single_thread(s_flush_efd);
145+
signal_eventfd_single_thread(s_flush_eventfd);
151146

152147
// Reset metadata buffer.
153148
m_metadata_buffer.clear();
@@ -158,8 +153,8 @@ void async_disk_writer_t::finish_and_submit_batch(int file_fd, bool should_wait_
158153
m_in_progress_batch->add_fdatasync_op_to_batch(file_fd, get_enum_value(uring_op_t::fdatasync), IOSQE_IO_LINK);
159154

160155
// Signal eventfds as part of the batch.
161-
m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, s_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK);
162-
m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, m_validate_flush_efd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN);
156+
m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, s_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_flush), IOSQE_IO_LINK);
157+
m_in_progress_batch->add_pwritev_op_to_batch(static_cast<void*>(&c_default_iov), 1, m_validate_flush_eventfd, 0, get_enum_value(uring_op_t::pwritev_eventfd_validate), IOSQE_IO_DRAIN);
163158

164159
swap_batches();
165160
auto flushed_batch_size = m_in_flight_batch->get_unsubmitted_entries_count();
@@ -187,7 +182,7 @@ void async_disk_writer_t::perform_file_close_operations(int file_fd, file_sequen
187182
m_in_progress_batch->append_file_to_batch(file_fd, log_seq);
188183
}
189184

190-
unsigned char* async_disk_writer_t::copy_into_metadata_buffer(void* source, size_t size, int file_fd)
185+
unsigned char* async_disk_writer_t::copy_into_metadata_buffer(const void* source, size_t size, int file_fd)
191186
{
192187
auto current_ptr = m_metadata_buffer.get_current_ptr();
193188
ASSERT_PRECONDITION(current_ptr, "Invalid metadata buffer pointer.");

production/db/core/src/chunk_manager.cpp

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,7 @@ gaia_offset_t chunk_manager_t::allocate(
9393
get_state() == chunk_state_t::in_use,
9494
"Objects can only be allocated from a chunk in the IN_USE state!");
9595

96-
ASSERT_PRECONDITION(allocation_size_in_bytes > 0, "Requested allocation size cannot be 0!");
97-
98-
// Check before converting to slot units to avoid overflow.
99-
ASSERT_PRECONDITION(
100-
allocation_size_in_bytes <= (c_max_allocation_size_in_slots * c_slot_size_in_bytes),
101-
"Requested allocation size exceeds maximum allocation size of 64KB!");
102-
103-
// Calculate allocation size in slot units.
104-
#ifdef DEBUG
105-
// Round up allocation to a page so we can mprotect() it.
106-
size_t allocation_size_in_pages = (allocation_size_in_bytes + c_page_size_in_bytes - 1) / c_page_size_in_bytes;
107-
size_t allocation_size_in_slots = allocation_size_in_pages * (c_page_size_in_bytes / c_slot_size_in_bytes);
108-
#else
109-
size_t allocation_size_in_slots = (allocation_size_in_bytes + c_slot_size_in_bytes - 1) / c_slot_size_in_bytes;
110-
#endif
96+
size_t allocation_size_in_slots = calculate_allocation_size_in_slots(allocation_size_in_bytes);
11197

11298
// Ensure that the new allocation doesn't overflow the chunk.
11399
if (m_metadata->min_unallocated_slot_offset() + allocation_size_in_slots > c_last_slot_offset)

production/db/core/src/log_file.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,9 @@ namespace persistence
2525
// TODO (Mihir): Use io_uring for fsync, close & fallocate operations in this file.
2626
// open() operation will remain synchronous, since we need the file fd to perform other async
2727
// operations on the file.
28-
log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_seq, size_t size)
28+
log_file_t::log_file_t(const std::string& dir_name, int dir_fd, file_sequence_t file_seq, size_t file_size)
29+
: m_dir_name(dir_name), m_dir_fd(dir_fd), m_file_seq(file_seq), m_file_size(file_size)
2930
{
30-
m_dir_fd = dir_fd;
31-
m_dir_name = dir;
32-
m_file_seq = file_seq;
33-
m_file_size = size;
34-
m_current_offset = 0;
35-
3631
// open and fallocate depending on size.
3732
std::stringstream file_name;
3833
file_name << m_dir_name << "/" << m_file_seq;
@@ -66,12 +61,12 @@ log_file_t::log_file_t(const std::string& dir, int dir_fd, file_sequence_t file_
6661
}
6762
}
6863

69-
size_t log_file_t::get_current_offset()
64+
file_offset_t log_file_t::get_current_offset() const
7065
{
7166
return m_current_offset;
7267
}
7368

74-
int log_file_t::get_file_fd()
69+
int log_file_t::get_file_fd() const
7570
{
7671
return m_file_fd;
7772
}
@@ -81,7 +76,12 @@ void log_file_t::allocate(size_t size)
8176
m_current_offset += size;
8277
}
8378

84-
size_t log_file_t::get_remaining_bytes_count(size_t record_size)
79+
file_sequence_t log_file_t::get_file_sequence() const
80+
{
81+
return m_file_seq;
82+
}
83+
84+
size_t log_file_t::get_bytes_remaining_after_append(size_t record_size) const
8585
{
8686
ASSERT_INVARIANT(m_file_size > 0, "Preallocated file size should be greater than 0.");
8787
if (m_file_size < (m_current_offset + record_size))

0 commit comments

Comments
 (0)