Enable add/remove CPU's from epoch consensus (microsoft#3771)
Signed-off-by: Alan Jowett <[email protected]>
Co-authored-by: Alan Jowett <[email protected]>
Alan-Jowett and Alan Jowett authored Sep 11, 2024
1 parent 6e8f47a commit a0b093c
Showing 4 changed files with 224 additions and 34 deletions.
222 changes: 195 additions & 27 deletions libs/runtime/ebpf_epoch.c
@@ -17,7 +17,23 @@
* 1) Each CPU determines the minimum epoch of all threads on the CPU.
* 2) The minimum epoch is committed as the release epoch and any memory that is older than the release epoch is
* released.
 * 3) The epoch_computation_in_progress flag is cleared which allows the epoch computation to be initiated again.
*
* Note:
* CPUs can be in one of three states:
* 1) Inactive: The CPU is not actively participating in epoch computation.
* Active flag is false.
* 2) Activating: The CPU is in the process of activating and is not yet active.
* Active flag is true and current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH.
* 3) Active: The CPU is actively participating in epoch computation.
* Active flag is true and current_epoch != EBPF_EPOCH_UNKNOWN_EPOCH.
*
* All CPUs except CPU 0 are in the inactive state at initialization. CPU 0 is always active.
*
* CPUs transition between states as follows:
* 1) Inactive -> Activating: The CPU is activated when a thread enters an epoch and the CPU is not active.
 * 2) Activating -> Active: The CPU becomes active when it is notified of the current epoch value.
* 3) Active -> Inactive: The CPU is deactivated when there are no threads in the epoch and the free list is empty.
*/
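
For illustration only (not part of this commit), here is a minimal sketch of how the three CPU states described in the comment above can be derived from the active flag and current_epoch; the enum and helper names below are hypothetical:

typedef enum _epoch_cpu_state
{
    EPOCH_CPU_INACTIVE,   // active == false
    EPOCH_CPU_ACTIVATING, // active == true, current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH
    EPOCH_CPU_ACTIVE,     // active == true, current_epoch != EBPF_EPOCH_UNKNOWN_EPOCH
} epoch_cpu_state_t;

// Hypothetical helper that classifies a CPU entry per the state definitions above.
static epoch_cpu_state_t
_epoch_cpu_state(const ebpf_epoch_cpu_entry_t* cpu_entry)
{
    if (!cpu_entry->active) {
        return EPOCH_CPU_INACTIVE;
    }
    return (cpu_entry->current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH) ? EPOCH_CPU_ACTIVATING : EPOCH_CPU_ACTIVE;
}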

/**
@@ -30,6 +46,16 @@
*/
#define EBPF_NANO_SECONDS_PER_FILETIME_TICK 100

/**
* @brief A sentinel value used to indicate that the epoch is unknown.
*/
#define EBPF_EPOCH_UNKNOWN_EPOCH 0

/**
* @brief The first valid epoch value.
*/
#define EBPF_EPOCH_FIRST_EPOCH 1

#define EBPF_EPOCH_FAIL_FAST(REASON, ASSERTION) \
if (!(ASSERTION)) { \
ebpf_assert(!#ASSERTION); \
@@ -51,9 +77,19 @@ typedef __declspec(align(EBPF_CACHE_LINE_SIZE)) struct _ebpf_epoch_cpu_entry
int timer_armed : 1; ///< Set if the flush timer is armed.
int rundown_in_progress : 1; ///< Set if rundown is in progress.
int epoch_computation_in_progress : 1; ///< Set if epoch computation is in progress.
int active : 1; ///< CPU is active in epoch computation. Only accessed under _ebpf_epoch_cpu_active_lock.
int work_queue_assigned : 1; ///< Work queue is assigned to this CPU.
ebpf_timed_work_queue_t* work_queue; ///< Work queue used to schedule work items.
} ebpf_epoch_cpu_entry_t;

static_assert(
sizeof(ebpf_epoch_cpu_entry_t) % EBPF_CACHE_LINE_SIZE == 0, "ebpf_epoch_cpu_entry_t is not cache aligned");

/**
* @brief Lock to ensure a consistent view of the active CPUs.
*/
static ebpf_lock_t _ebpf_epoch_cpu_active_lock; ///< Lock to protect the active CPU list.

/**
* @brief Table of per-CPU state.
*/
@@ -116,12 +152,12 @@
{
struct
{
int64_t current_epoch;              ///< The new current epoch.
int64_t proposed_release_epoch;     ///< Minimum epoch of all threads on the CPU.
} propose_epoch;
struct
{
int64_t released_epoch; ///< The newest epoch that can be released.
} commit_epoch;
struct
{
@@ -224,6 +260,15 @@ static _IRQL_requires_(DISPATCH_LEVEL) void _ebpf_epoch_arm_timer_if_needed(ebpf
static void
_ebpf_epoch_work_item_callback(_In_ cxplat_preemptible_work_item_t* preemptible_work_item, void* context);

static void
_ebpf_epoch_activate_cpu(uint32_t cpu_id);

static void
_ebpf_epoch_deactivate_cpu(uint32_t cpu_id);

uint32_t
_ebpf_epoch_next_active_cpu(uint32_t cpu_id);

/**
* @brief Raise the CPU's IRQL to DISPATCH_LEVEL if it is below DISPATCH_LEVEL.
* First check if the IRQL is below DISPATCH_LEVEL to avoid the overhead of
@@ -278,12 +323,13 @@ ebpf_epoch_initiate()
goto Error;
}

ebpf_lock_create(&_ebpf_epoch_cpu_active_lock);

ebpf_assert(EBPF_CACHE_ALIGN_POINTER(_ebpf_epoch_cpu_table) == _ebpf_epoch_cpu_table);

// Initialize the per-CPU state.
for (uint32_t cpu_id = 0; cpu_id < _ebpf_epoch_cpu_count; cpu_id++) {
ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id];
ebpf_list_initialize(&cpu_entry->epoch_state_list);
ebpf_list_initialize(&cpu_entry->free_list);
}
@@ -302,6 +348,12 @@
}
}

// CPU 0 is always active.
_ebpf_epoch_activate_cpu(0);

// Set the current epoch for CPU 0.
_ebpf_epoch_cpu_table[0].current_epoch = EBPF_EPOCH_FIRST_EPOCH;

KeInitializeDpc(&_ebpf_epoch_timer_dpc, _ebpf_epoch_timer_worker, NULL);
KeSetTargetProcessorDpc(&_ebpf_epoch_timer_dpc, 0);

@@ -358,6 +410,7 @@ ebpf_epoch_terminate()
cxplat_free(
_ebpf_epoch_cpu_table, CXPLAT_POOL_FLAG_NON_PAGED | CXPLAT_POOL_FLAG_CACHE_ALIGNED, EBPF_POOL_TAG_EPOCH);
_ebpf_epoch_cpu_table = NULL;

EBPF_RETURN_VOID();
}

@@ -376,6 +429,10 @@ ebpf_epoch_enter(_Out_ ebpf_epoch_state_t* epoch_state)
ebpf_list_insert_tail(&cpu_entry->epoch_state_list, &epoch_state->epoch_list_entry);

_ebpf_epoch_lower_to_previous_irql(epoch_state->irql_at_enter);

if (!cpu_entry->active) {
_ebpf_epoch_activate_cpu(epoch_state->cpu_id);
}
}
#pragma warning(pop)

@@ -650,6 +707,10 @@ _ebpf_epoch_insert_in_free_list(_In_ ebpf_epoch_allocation_header_t* header)
uint32_t cpu_id = ebpf_get_current_cpu();
ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id];

if (!cpu_entry->active) {
_ebpf_epoch_activate_cpu(cpu_id);
}

if (cpu_entry->rundown_in_progress) {
KeLowerIrql(old_irql);
switch (header->entry_type) {
@@ -747,8 +808,6 @@ void
_ebpf_epoch_messenger_propose_release_epoch(
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu)
{
ebpf_epoch_state_t* epoch_state;
uint32_t next_cpu;

@@ -760,32 +819,43 @@ _ebpf_epoch_messenger_propose_release_epoch(
}
// Other CPUs update the current epoch.
else {
// If the epoch was unknown, then update the freed_epoch for all items in the free list now that we know the
// current epoch. This occurs when the CPU is activated and continues until the first epoch is proposed.
if (cpu_entry->current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH) {
for (ebpf_list_entry_t* entry = cpu_entry->free_list.Flink; entry != &cpu_entry->free_list;
entry = entry->Flink) {
ebpf_epoch_allocation_header_t* header =
CONTAINING_RECORD(entry, ebpf_epoch_allocation_header_t, list_entry);
ebpf_assert(header->freed_epoch == EBPF_EPOCH_UNKNOWN_EPOCH);
header->freed_epoch = message->message.propose_epoch.current_epoch;
}
}

cpu_entry->current_epoch = message->message.propose_epoch.current_epoch;
}

// Put a memory barrier here to ensure that the write is not re-ordered.
MemoryBarrier();

// Previous CPU's minimum epoch.
int64_t minimum_epoch = message->message.propose_epoch.proposed_release_epoch;

// Walk over each thread_entry in the epoch_state_list and compute the minimum epoch.
for (ebpf_list_entry_t* entry = cpu_entry->epoch_state_list.Flink; entry != &cpu_entry->epoch_state_list;
entry = entry->Flink) {
epoch_state = CONTAINING_RECORD(entry, ebpf_epoch_state_t, epoch_list_entry);
minimum_epoch = min(minimum_epoch, epoch_state->epoch);
}

// Set the proposed release epoch to the minimum epoch seen so far.
message->message.propose_epoch.proposed_release_epoch = minimum_epoch;

next_cpu = _ebpf_epoch_next_active_cpu(current_cpu);

// If this is the last CPU, then send a message to the first CPU to commit the release epoch.
if (next_cpu == 0) {
message->message.commit_epoch.released_epoch = minimum_epoch;
message->message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_COMMIT_RELEASE_EPOCH;
}

_ebpf_epoch_send_message_async(message, next_cpu);
@@ -813,22 +883,41 @@ _ebpf_epoch_messenger_commit_release_epoch(
{
uint32_t next_cpu;

// If any epoch_states are in EBPF_EPOCH_UNKNOWN_EPOCH, then activation of a CPU is in progress.
bool other_cpus_are_activating = (message->message.commit_epoch.released_epoch == EBPF_EPOCH_UNKNOWN_EPOCH);

// If this CPU is in EBPF_EPOCH_UNKNOWN_EPOCH, then activation of this CPU is in progress.
bool this_cpu_is_activating = (cpu_entry->current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH);

cpu_entry->timer_armed = false;
// Set the released_epoch to the value computed by the EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_RELEASE_EPOCH message.
cpu_entry->released_epoch = message->message.commit_epoch.released_epoch - 1;

next_cpu = _ebpf_epoch_next_active_cpu(current_cpu);

// If this is the last CPU, send the message to the first CPU to complete the cycle.
if (next_cpu == 0) {
message->message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_EPOCH_COMPLETE;
}

_ebpf_epoch_send_message_async(message, next_cpu);

// Wait for all the CPUs to transition to an active state.
if (other_cpus_are_activating || this_cpu_is_activating) {
// One or more CPUs are still activating. Rearm the timer and wait for the next message.
_ebpf_epoch_arm_timer_if_needed(cpu_entry);
return;
}

// All CPUs have transitioned to an active state and the epoch computation was successfully completed.
// Release any memory that is associated with expired epochs.
_ebpf_epoch_release_free_list(cpu_entry, cpu_entry->released_epoch);

// Check if this CPU is idle and deactivate it if it is (CPU 0 is always active).
if ((current_cpu != 0) && ebpf_list_is_empty(&cpu_entry->free_list) &&
ebpf_list_is_empty(&cpu_entry->epoch_state_list)) {
_ebpf_epoch_deactivate_cpu(current_cpu);
}
}

/**
@@ -894,15 +983,13 @@ _ebpf_epoch_messenger_rundown_in_progress(
{
uint32_t next_cpu;
cpu_entry->rundown_in_progress = true;

next_cpu = _ebpf_epoch_next_active_cpu(current_cpu);

// If this is the last CPU, then stop.
if (next_cpu == 0) {
// Signal the caller that rundown is complete.
KeSetEvent(&message->completion_event, 0, FALSE);
return;
}

@@ -1028,3 +1115,84 @@ _ebpf_epoch_work_item_callback(_In_ cxplat_preemptible_work_item_t* preemptible_

cxplat_release_rundown_protection(&_ebpf_epoch_work_item_rundown_ref);
}

/**
* @brief Add the CPU to the next active CPU table.
*
* @param[in] cpu_id CPU to add.
*/
static void
_ebpf_epoch_activate_cpu(uint32_t cpu_id)
{
EBPF_LOG_ENTRY();

EBPF_LOG_MESSAGE_UINT64(EBPF_TRACELOG_LEVEL_INFO, EBPF_TRACELOG_KEYWORD_EPOCH, "Activating CPU", cpu_id);

ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id];
ebpf_lock_state_t state = ebpf_lock_lock(&_ebpf_epoch_cpu_active_lock);

cpu_entry->active = true;
// When the CPU is activated, the current epoch is not yet known.
// Memory freed before the current epoch is learned has its freed epoch set to EBPF_EPOCH_UNKNOWN_EPOCH; it is
// stamped with the real epoch once the current epoch is known (i.e., when the next epoch is proposed).
cpu_entry->current_epoch = EBPF_EPOCH_UNKNOWN_EPOCH;

if (!cpu_entry->work_queue_assigned) {
ebpf_result_t result = ebpf_timed_work_queue_set_cpu_id(cpu_entry->work_queue, cpu_id);
if (result != EBPF_SUCCESS) {
// This is a fatal error. The epoch system is in an inconsistent state.
__fastfail(FAST_FAIL_INVALID_ARG);
}
cpu_entry->work_queue_assigned = 1;
}

ebpf_lock_unlock(&_ebpf_epoch_cpu_active_lock, state);
EBPF_LOG_EXIT();
}

/**
* @brief Remove the CPU from the next active CPU table.
*
* @param[in] cpu_id CPU to remove.
*/
static void
_ebpf_epoch_deactivate_cpu(uint32_t cpu_id)
{
EBPF_LOG_ENTRY();

EBPF_LOG_MESSAGE_UINT64(EBPF_TRACELOG_LEVEL_INFO, EBPF_TRACELOG_KEYWORD_EPOCH, "Deactivating CPU", cpu_id);

ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id];
ebpf_lock_state_t state = ebpf_lock_lock(&_ebpf_epoch_cpu_active_lock);
cpu_entry->active = false;
ebpf_lock_unlock(&_ebpf_epoch_cpu_active_lock, state);

EBPF_LOG_EXIT();
}

/**
* @brief Given the current CPU, return the next active CPU.
*
* @param[in] cpu_id The current CPU.
* @return The next active CPU.
*/
uint32_t
_ebpf_epoch_next_active_cpu(uint32_t cpu_id)
{
uint32_t next_active_cpu;
ebpf_lock_state_t state = ebpf_lock_lock(&_ebpf_epoch_cpu_active_lock);

for (next_active_cpu = cpu_id + 1; next_active_cpu < _ebpf_epoch_cpu_count; next_active_cpu++) {
if (_ebpf_epoch_cpu_table[next_active_cpu].active) {
break;
}
}

if (next_active_cpu == _ebpf_epoch_cpu_count) {
next_active_cpu = 0;
}

ebpf_lock_unlock(&_ebpf_epoch_cpu_active_lock, state);

return next_active_cpu;
}
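
As a side note on the consensus ring these helpers drive: a propose/commit message now visits only active CPUs, and wrapping back to CPU 0 (which is always active) marks the end of a pass. The following standalone sketch is illustrative only and not part of the commit; CPU_COUNT, the active[] array, and next_active_cpu() are stand-ins for _ebpf_epoch_cpu_count, the per-CPU active flags, and _ebpf_epoch_next_active_cpu():

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define CPU_COUNT 8

// Hypothetical stand-in for the per-CPU active flags; CPU 0 is always active.
static bool active[CPU_COUNT] = {true, false, true, false, false, true, false, false};

// Mirrors the behavior of _ebpf_epoch_next_active_cpu: scan upward, wrap to 0.
static uint32_t
next_active_cpu(uint32_t cpu_id)
{
    for (uint32_t next = cpu_id + 1; next < CPU_COUNT; next++) {
        if (active[next]) {
            return next;
        }
    }
    return 0; // Wrapping to CPU 0 signals the end of one pass over the ring.
}

int
main(void)
{
    // Walk one full propose pass: start at CPU 0 and stop when the walk wraps back to 0.
    uint32_t cpu = 0;
    do {
        printf("propose on CPU %u\n", cpu);
        cpu = next_active_cpu(cpu);
    } while (cpu != 0);
    printf("wrapped to CPU 0 -> commit release epoch\n");
    return 0;
}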
2 changes: 1 addition & 1 deletion libs/runtime/ebpf_epoch.h
@@ -14,7 +14,7 @@ extern "C"
typedef struct _ebpf_epoch_state
{
LIST_ENTRY epoch_list_entry; /// List entry for the epoch list.
int64_t epoch;               /// The epoch when this entry was added to the list.
uint32_t cpu_id; /// The CPU on which this entry was added to the list.
KIRQL irql_at_enter; /// The IRQL when this entry was added to the list.
} ebpf_epoch_state_t;
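
For context on how this state is used: a caller keeps an ebpf_epoch_state_t alive for the duration of the protected region via the ebpf_epoch_enter/ebpf_epoch_exit pair from this header. A minimal usage sketch (illustrative only; example_lookup is a hypothetical caller, not part of the commit):

#include "ebpf_epoch.h"

// Hypothetical caller: protect access to epoch-managed memory.
void
example_lookup(void)
{
    ebpf_epoch_state_t epoch_state;

    // Entering records this thread's epoch on the current CPU and, with this
    // change, activates the CPU if it was inactive.
    ebpf_epoch_enter(&epoch_state);

    // ... read epoch-protected memory here ...

    // Exiting removes the thread from the CPU's epoch_state_list so its epoch
    // no longer holds back the release epoch.
    ebpf_epoch_exit(&epoch_state);
}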