Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions production/db/core/inc/async_disk_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class async_disk_writer_t
* Append fdatasync to the in_progress_batch and update batch with file fd so that the file
* can be closed once the kernel has processed it.
*/
void perform_file_close_operations(int file_fd, file_sequence_t log_seq);
void perform_file_close_operations(int file_fd, log_sequence_t log_seq);

/**
* Copy any temporary writes (which don't exist in gaia shared memory) into the metadata buffer.
Expand All @@ -115,22 +115,24 @@ class async_disk_writer_t
void map_commit_ts_to_session_decision_eventfd(gaia_txn_id_t commit_ts, int session_decision_eventfd);

private:
// Reserve slots in the in_progress batch to be able to append additional operations to it (before it gets submitted to the kernel)
// Reserve slots in the in_progress batch to be able to append additional operations to it
// (before it gets submitted to the kernel)
static constexpr size_t c_submit_batch_sqe_count = 3;
static constexpr size_t c_single_submission_entry_count = 1;
static constexpr size_t c_async_batch_size = 32;
static constexpr size_t c_max_iovec_array_size_bytes = IOV_MAX * sizeof(iovec);
static inline eventfd_t c_default_flush_eventfd_value = 1;
static inline iovec c_default_iov = {static_cast<void*>(&c_default_flush_eventfd_value), sizeof(eventfd_t)};

// eventfd to signal that a batch flush has completed.
// Used to block new writes to disk when a batch is already getting flushed.
// eventfd for signalling that a batch flush has completed. Used to block new writes to disk
// when a batch is already getting flushed.
static inline int s_flush_eventfd = -1;

// eventfd to signal that the IO results belonging to a batch are ready to be validated.
// eventfd for signalling that the IO results belonging to a batch are ready to be checked for
// errors.
int m_validate_flush_eventfd = -1;

// eventfd to signal that a file is ready to be checkpointed.
// eventfd for signal that a log file is ready to be checkpointed.
int m_signal_checkpoint_eventfd = -1;

// Keep track of session threads to unblock.
Expand Down
4 changes: 2 additions & 2 deletions production/db/core/inc/async_write_batch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ class async_write_batch_t
/**
* Add file fd to the batch that should be closed once all of its pending writes have finished.
*/
void append_file_to_batch(int fd, file_sequence_t log_seq);
void append_file_to_batch(int fd, log_sequence_t log_seq);

file_sequence_t get_max_file_seq_to_close();
log_sequence_t get_max_log_seq_to_close();

/**
* https://man7.org/linux/man-pages/man2/pwritev.2.html
Expand Down
8 changes: 6 additions & 2 deletions production/db/core/inc/db_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ class server_t

static inline mapped_data_t<locators_t> s_shared_locators{};
static inline mapped_data_t<counters_t> s_shared_counters{};
static inline mapped_data_t<data_t> s_shared_data{};
static inline<datamapped_data_t_t> s_shared_data{};
static inline mapped_data_t<id_index_t> s_shared_id_index{};
static inline index::indexes_t s_global_indexes{};
static inline std::unique_ptr<persistent_store_manager> s_persistent_store{};
static inline std::shared_ptr<persistence::persistent_store_manager_t> s_persistent_store{};
static inline std::unique_ptr<persistence::log_handler_t> s_log_handler{};

// These fields have transaction lifetime.
Expand Down Expand Up @@ -267,6 +267,8 @@ class server_t
// Keep track of the last txn that has been submitted to the async_disk_writer.
static inline std::atomic<gaia_txn_id_t> s_last_queued_commit_ts_upper_bound = c_invalid_gaia_txn_id;

static inline std::atomic<gaia_txn_id_t> s_last_checkpointed_commit_ts_lower_bound = c_invalid_gaia_txn_id;

// Keep a track of undecided txns submitted to the async_disk_writer.
static inline std::set<gaia_txn_id_t> s_seen_and_undecided_txn_set{};

Expand Down Expand Up @@ -443,6 +445,8 @@ class server_t

static void recover_persistent_log();

static void checkpoint_handler();

static void flush_all_pending_writes();

static void session_handler(int session_socket);
Expand Down
10 changes: 7 additions & 3 deletions production/db/core/inc/log_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace persistence
class log_file_t
{
public:
log_file_t(const std::string& dir_name, int dir_fd, file_sequence_t file_seq, size_t file_size);
log_file_t(const std::string& dir_name, int dir_fd, log_sequence_t log_seq, size_t file_size);

/**
* Obtain offset to write the next log record at.
Expand All @@ -51,17 +51,21 @@ class log_file_t
/**
* Obtain sequence number for the file.
*/
file_sequence_t get_file_sequence();
log_sequence_t get_log_sequence();

private:
size_t m_file_size;
file_sequence_t m_file_seq;
log_sequence_t m_log_seq;
file_offset_t m_current_offset;
std::string m_dir_name;
int m_dir_fd;
int m_file_fd;
std::string m_file_name;
inline static constexpr int c_file_permissions = 0666;

// We reserve space for a header at the end of a log file.
// This serves to signify the end of a file.
inline static constexpr size_t c_reserved_size = sizeof(record_header_t);
};

} // namespace persistence
Expand Down
65 changes: 62 additions & 3 deletions production/db/core/inc/log_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class log_handler_t
*/
void create_decision_record(const decision_list_t& txn_decisions);

/**
* Log record to signify end of file.
*/
void create_end_of_file_record();

/**
* Submit async_disk_writer's internal I/O request queue to the kernel for processing.
*/
Expand All @@ -87,21 +92,75 @@ class log_handler_t
*/
void register_commit_ts_for_session_notification(gaia_txn_id_t commit_ts, int session_decision_eventfd);

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we should move these comments to the header file, but you can wait until we move the others as well.

* Entry point to start recovery procedure from gaia log files. Checkpointing reuses the same function.
*/
void recover_from_persistent_log(
std::shared_ptr<persistent_store_manager_t>& persistent_store_manager,
gaia_txn_id_t& last_checkpointed_commit_ts,
log_sequence_t& last_processed_log_seq,
log_sequence_t max_log_seq_to_process,
log_reader_mode_t mode);

/**
* Destroy all log files with sequence number lesser than or equal to max_log_seq_to_delete.
*/
void truncate_persistent_log(log_sequence_t max_log_seq_to_delete);

/**
* Set the log sequence counter.
*/
void set_persistent_log_sequence(log_sequence_t log_seq);

private:
// TODO: Make log file size configurable.
static constexpr uint64_t c_file_size = 4 * 1024 * 1024;
static constexpr std::string_view c_gaia_wal_dir_name = "/wal_dir";
static constexpr const char c_gaia_wal_dir_name[] = "wal_dir";
static constexpr int c_gaia_wal_dir_permissions = 0755;
static inline std::string s_wal_dir_path{};
static inline std::filesystem::path s_wal_dir_path{};
static inline int s_dir_fd = -1;

// Log file sequence starts from 1.
static inline std::atomic<file_sequence_t::value_type> s_file_num = 1;
static inline std::atomic<log_sequence_t::value_type> s_file_num = 1;

// Keep track of the current log file.
std::unique_ptr<log_file_t> m_current_file;

std::unique_ptr<async_disk_writer_t> m_async_disk_writer;

// Recovery & Checkpointing APIs/structures.

// This map is populated when log files are read during recovery/checkpointing.
// Map txn commit_ts to location of log record header during recovery.
// This index is maintained on a per log file basis. Before moving to the next file
// we assert that this index is empty as all txns have been processed.
// Note that the recovery implementation proceeds in increasing log file order.
std::map<gaia_txn_id_t, unsigned char*> m_txn_records_by_commit_ts;

// This map is populated when log files are read during recovery/checkpointing.
// This map contains the current set of txns that are being processed. (by either recovery or checkpointing)
// Txns are processed one decision record at a time; a single decision record may contain
// multiple txns.
std::map<gaia_txn_id_t, decision_type_t> m_decision_records_by_commit_ts;

gaia_txn_id_t m_max_decided_commit_ts;

size_t update_iterator(struct record_iterator_t* it);
void validate_checksum(struct record_iterator_t* it);
void map_log_file(struct record_iterator_t* it, int file_fd, log_reader_mode_t recovery_mode);
void unmap_file(void* start, size_t size);
bool is_remaining_file_empty(unsigned char* start, unsigned char* end);
void write_log_record_to_persistent_store(
std::shared_ptr<persistent_store_manager_t>& persistent_store_manager,
read_record_t* record);
void write_records(
std::shared_ptr<persistent_store_manager_t>& persistent_store_manager,
record_iterator_t* it,
gaia_txn_id_t* last_checkpointed_commit_ts);
bool write_log_file_to_persistent_store(
std::shared_ptr<persistent_store_manager_t>& persistent_store_manager,
record_iterator_t* it,
gaia_txn_id_t* last_checkpointed_commit_ts);
};

} // namespace persistence
Expand Down
129 changes: 117 additions & 12 deletions production/db/core/inc/persistence_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cstddef>

#include <atomic>
#include <limits>
#include <ostream>

#include "gaia/common.hpp"
Expand All @@ -23,11 +24,30 @@ namespace db
namespace persistence
{

enum class log_reader_mode_t : uint8_t
{
not_set = 0x0,

// Checkpoint mode.
// Does not tolerate any IO failure when reading a log file; any
// IO error is treated as unrecoverable.
checkpoint_fail_on_first_error = 0x1,

// Recovery mode.
// Stop recovery on first IO error. Database will always start and will try to recover as much
// committed data from the log as possible.
// Updates are logged one batch as a time; Persistent batch IO is validated
// first before marking any txn in the batch as durable (and returning a commit decision to the user);
// Thus ignore any txn after the last seen decision timestamp before encountering IO error.
recovery_stop_on_first_error = 0x2,
};

enum class record_type_t : uint8_t
{
not_set = 0x0,
txn = 0x1,
decision = 0x2,
end_of_file = 0x3,
};

enum class decision_type_t : uint8_t
Expand All @@ -46,26 +66,27 @@ struct decision_entry_t
// Pair of log file sequence number and file fd.
typedef std::vector<decision_entry_t> decision_list_t;

class file_sequence_t : public common::int_type_t<size_t, 0>
// Persistent log file sequence number.
class log_sequence_t : public common::int_type_t<size_t, 0>
{
public:
// By default, we should initialize to an invalid value.
constexpr file_sequence_t()
constexpr log_sequence_t()
: common::int_type_t<size_t, 0>()
{
}

constexpr file_sequence_t(size_t value)
constexpr log_sequence_t(size_t value)
: common::int_type_t<size_t, 0>(value)
{
}
};

static_assert(
sizeof(file_sequence_t) == sizeof(file_sequence_t::value_type),
"file_sequence_t has a different size than its underlying integer type!");
sizeof(log_sequence_t) == sizeof(log_sequence_t::value_type),
"log_sequence_t has a different size than its underlying integer type!");

constexpr file_sequence_t c_invalid_file_sequence_number;
constexpr log_sequence_t c_invalid_log_sequence_number;

typedef size_t file_offset_t;

Expand All @@ -81,19 +102,25 @@ typedef uint32_t crc32_t;
// This assertion ensures that the default type initialization
// matches the value of the invalid constant.
static_assert(
c_invalid_file_sequence_number.value() == file_sequence_t::c_default_invalid_value,
"Invalid c_invalid_file_sequence_number initialization!");
c_invalid_log_sequence_number.value() == log_sequence_t::c_default_invalid_value,
"Invalid c_invalid_log_sequence_number initialization!");

struct log_file_info_t
{
file_sequence_t sequence;
log_sequence_t sequence;
int file_fd;
};

struct log_file_pointer_t
{
void* begin;
size_t size;
};

struct record_header_t
{
record_size_t record_size;
crc32_t crc;
record_size_t payload_size;
record_type_t record_type;
gaia_txn_id_t txn_commit_ts;

Expand All @@ -109,8 +136,86 @@ struct record_header_t
char padding[3];
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: the record_header_t definition: if you insert padding fields by hand then you should verify your assumptions on those padding fields via static_assert().

// The primary motivation of this buffer is to keep a hold of any additional information we want to write to the log
// apart from the shared memory objects.
constexpr size_t c_invalid_read_record_size = 0;

struct read_record_t
{
struct record_header_t header;
unsigned char payload[];

// Record size includes the header size and the payload size.
size_t get_record_size()
{
return header.record_size;
}

unsigned char* get_record()
{
return reinterpret_cast<unsigned char*>(this);
}

unsigned char* get_deleted_ids()
{
ASSERT_PRECONDITION(header.record_type == record_header_t::record_type::txn, "Incorrect record type!");
return reinterpret_cast<unsigned char*>(payload);
}

unsigned char* get_objects()
{
ASSERT_PRECONDITION(header.record_type == record_header_t::record_type::txn, "Incorrect record type!");
return get_deleted_ids() + header.deleted_object_count * sizeof(gaia_id_t);
}

decision_entry_t* get_decisions()
{
ASSERT_PRECONDITION(header.record_type == record_header_t::record_type::decision, "Incorrect record type!");
return reinterpret_cast<decision_entry_t*>(payload);
}

unsigned char* get_payload_end()
{
return get_record() + get_record_size();
}

static read_record_t* get_record(void* ptr)
{
ASSERT_PRECONDITION(ptr, "Invalid address!");
return reinterpret_cast<read_record_t*>(ptr);
}

bool is_valid()
{
return header.record_size != c_invalid_read_record_size && (header.record_type == record_type_t::txn || header.record_type == record_type_t::decision || header.record_type == record_type_t::end_of_file);
}
};

struct record_iterator_t
{
// Pointer to the current record in a log file.
unsigned char* iterator;

// Beginning of the log file.
unsigned char* begin;

// End of log file.
unsigned char* end;

// End of log file. May not be the same as end.
unsigned char* stop_at;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean?


// Value returned from the mmap() call on a persistent log file.
void* mapped_data;
size_t map_size;
int file_fd;

// Recovery mode.
log_reader_mode_t recovery_mode;

//This flag is set when halt recovery is halted.
bool halt_recovery;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Halt recovery on what condition? Or is this just a flag we set when we want to halt recovery?

};

// This buffer is used to stage non-object data to be written to the log.
// Custom information includes
// 1) deleted object IDs in a txn.
// 2) custom txn headers
Expand Down
Loading