Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/seastar/core/coroutine.hh
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public:
task* waiting_task() noexcept override { return _promise.waiting_task(); }

scheduling_group set_scheduling_group(scheduling_group sg) noexcept {
return std::exchange(this->_sg, sg);
return task::set_scheduling_group(sg);
}
};
};
Expand Down
3 changes: 2 additions & 1 deletion include/seastar/core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include <seastar/util/log.hh>
#include <seastar/util/noncopyable_function.hh>
#include <seastar/util/std-compat.hh>
#include <seastar/util/split-list.hh>
#include "internal/pollable_fd.hh"

#include <atomic>
Expand Down Expand Up @@ -293,7 +294,7 @@ private:
explicit task_queue(task_queue_group* p, unsigned id, sstring name, sstring shortname, float shares);
const uint8_t _id;
uint64_t _tasks_processed = 0;
circular_buffer<task*> _q;
internal::intrusive_split_list<task, 16, &task::_next> _q;
sstring _name;
// the shortened version of scheduling_group's name, only the first 4
// chars are used.
Expand Down
27 changes: 22 additions & 5 deletions include/seastar/core/task.hh
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,24 @@

namespace seastar {

class reactor;

class task {
protected:
scheduling_group _sg;
private:
friend class reactor;
union {
unsigned _scheduling_group_id;
task* _next;
};

// Encodes a scheduling group as an integer tag: the group's index shifted
// left by one, with bit 0 forced to 1. The arithmetic is done in `unsigned`
// and only the final value is widened to uintptr_t.
// NOTE(review): the set low bit is presumably what tells a stored group id
// apart from a `task*` sharing the same union storage — confirm against the
// alignment static_assert on `task`.
static uintptr_t disguise_sched_group(scheduling_group sg) noexcept {
    const unsigned index = internal::scheduling_group_index(sg);
    const unsigned tagged = (index << 1) | 0x1;
    return tagged;
}
// Decodes a value produced by disguise_sched_group(): asserts that the tag
// bit (bit 0) is set, then drops it and maps the remaining index back to a
// scheduling_group.
static scheduling_group unveil_sched_group(uintptr_t val) noexcept {
    SEASTAR_ASSERT(val & 0x1);
    const uintptr_t index = val >> 1;
    return internal::scheduling_group_from_index(index);
}
#ifdef SEASTAR_TASK_BACKTRACE
shared_backtrace _bt;
#endif
Expand All @@ -43,14 +57,14 @@ protected:
~task() = default;

scheduling_group set_scheduling_group(scheduling_group new_sg) noexcept{
return std::exchange(_sg, new_sg);
return unveil_sched_group(std::exchange(_scheduling_group_id, disguise_sched_group(new_sg)));
}
public:
explicit task(scheduling_group sg = current_scheduling_group()) noexcept : _sg(sg) {}
explicit task(scheduling_group sg = current_scheduling_group()) noexcept : _scheduling_group_id(disguise_sched_group(sg)) {}
virtual void run_and_dispose() noexcept = 0;
/// Returns the next task which is waiting for this task to complete execution, or nullptr.
virtual task* waiting_task() noexcept = 0;
scheduling_group group() const { return _sg; }
scheduling_group group() const { return unveil_sched_group(_scheduling_group_id); }
#ifdef SEASTAR_TASK_BACKTRACE
void make_backtrace() noexcept;
shared_backtrace get_backtrace() const { return _bt; }
Expand All @@ -60,6 +74,9 @@ public:
#endif
};

// The sched_group disguising/unveiling (see above) assumes that
// a task* always has its least-significant bit (bit 0) cleared
static_assert(alignof(task) > 1, "task pointer must not occupy zero bit");

void schedule(task* t) noexcept;
void schedule_checked(task* t) noexcept;
Expand Down
128 changes: 128 additions & 0 deletions include/seastar/util/split-list.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Copyright (C) 2025 ScyllaDB
*/

#pragma once

#include <array>
#include <cstddef>

namespace seastar {
namespace internal {

/// Intrusive double-ended queue of T objects, stored as F independent
/// singly-linked sub-lists.
///
/// Elements are linked through the `T*` data member designated by \c Member;
/// the list never allocates and never owns the elements. Every element has a
/// logical position (a free-running unsigned counter) and lives in sub-list
/// `position % F`, so consecutive elements land in different sub-lists.
/// NOTE(review): the motivation for splitting is not stated in this file —
/// presumably it avoids walking one long dependent pointer chain; confirm.
///
/// \tparam T       element type
/// \tparam F       number of sub-lists, must be non-zero
/// \tparam Member  pointer to the `T*` member of T used as the "next" link
template <typename T, unsigned F, T* T::*Member>
class intrusive_split_list {
    static_assert(F > 0, "intrusive_split_list needs at least one sub-list");

    // Head of every sub-list; nullptr when that sub-list is empty.
    std::array<T*, F> _first;
    // For every sub-list, the address of the link that terminates it: either
    // &_first[i] (sub-list empty) or the Member of its last element.
    std::array<T**, F> _last_p;
    // Logical position one past the last element. Free-running, wraps around.
    unsigned _tail_idx;
    // Logical position of the first element. Free-running, wraps around.
    unsigned _head_idx;

public:
    /// Creates an empty list.
    intrusive_split_list() noexcept
        : _tail_idx(0)
        , _head_idx(0)
    {
        for (unsigned i = 0; i < F; i++) {
            _first[i] = nullptr;
            _last_p[i] = &_first[i];
        }
    }

    // _last_p may point into this object's own _first array, so the list can
    // be neither copied nor moved.
    intrusive_split_list(const intrusive_split_list&) = delete;
    intrusive_split_list(intrusive_split_list&&) = delete;
    intrusive_split_list& operator=(const intrusive_split_list&) = delete;
    intrusive_split_list& operator=(intrusive_split_list&&) = delete;

    /// Appends \c element (must not be null) after the current last element.
    /// The element's link member is overwritten.
    void push_back(T* element) noexcept {
        auto i = _tail_idx++ % F;
        *_last_p[i] = element;
        _last_p[i] = &(element->*Member);
        element->*Member = nullptr;
    }

    /// Inserts \c element (must not be null) before the current first element.
    /// The element's link member is overwritten.
    void push_front(T* element) noexcept {
        auto i = --_head_idx % F;
        element->*Member = _first[i];
        _first[i] = element;
        // The sub-list was empty: the new element is also its last one.
        if (_last_p[i] == &_first[i]) {
            _last_p[i] = &(element->*Member);
        }
    }

    /// Returns true when the list holds no elements.
    bool empty() const noexcept {
        return _head_idx == _tail_idx;
    }

    /// Returns the number of elements, computed as the (wrapping) difference
    /// between the tail and head positions.
    std::size_t size() const noexcept {
        return _tail_idx - _head_idx;
    }

    /// Unlinks and returns the first element.
    ///
    /// Precondition: the list is not empty. This is not checked; calling it
    /// on an empty list dereferences a null pointer. The returned element's
    /// link member is left untouched (it may still point at its successor).
    T* pop_front() noexcept {
        auto i = _head_idx++ % F;
        T* ret = _first[i];
        _first[i] = ret->*Member;
        // The sub-list became empty: its terminating link is the head again.
        if (_last_p[i] == &(ret->*Member)) {
            _last_p[i] = &_first[i];
        }
        return ret;
    }

    /// Read-only walker over the elements in logical order.
    ///
    /// A begin iterator keeps its own copy of all sub-list heads taken at
    /// construction and advances through them without touching the list.
    /// Two iterators compare equal when their logical positions are equal,
    /// so only iterators obtained from the same list should be compared.
    class iterator {
        std::array<T*, F> _cur;
        unsigned _idx;

    public:
        /// Builds a position-only iterator (used for end()); it must not be
        /// dereferenced.
        iterator(unsigned idx) noexcept
            : _cur{}
            , _idx(idx)
        {}

        /// Builds an iterator positioned at the first element of \c sl.
        iterator(const intrusive_split_list& sl) noexcept
            : _cur(sl._first)
            , _idx(sl._head_idx)
        {}

        iterator& operator++() noexcept {
            T*& slot = _cur[_idx % F];
            slot = slot->*Member;
            _idx++;
            return *this;
        }

        T* operator->() const noexcept {
            return _cur[_idx % F];
        }

        T& operator*() const noexcept {
            return *_cur[_idx % F];
        }

        bool operator==(const iterator& o) const noexcept {
            return _idx == o._idx;
        }

        bool operator!=(const iterator& o) const noexcept {
            return _idx != o._idx;
        }
    };

    iterator begin() const noexcept { return iterator(*this); }
    iterator end() const noexcept { return iterator(_tail_idx); }
};

} // internal namespace
} // seastar namespace
45 changes: 23 additions & 22 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2619,7 +2619,7 @@ seastar::internal::log_buf::inserter_iterator do_dump_task_queue(seastar::intern
memory::scoped_critical_alloc_section _;
std::unordered_map<const char*, unsigned> infos;
for (const auto& tp : tq._q) {
const std::type_info& ti = typeid(*tp);
const std::type_info& ti = typeid(tp);
auto [ it, ins ] = infos.emplace(std::make_pair(ti.name(), 0u));
it->second++;
}
Expand All @@ -2630,14 +2630,33 @@ seastar::internal::log_buf::inserter_iterator do_dump_task_queue(seastar::intern
return it;
}

namespace {

#ifndef SEASTAR_SHUFFLE_TASK_QUEUE
// Task-queue shuffling is compiled out: never ask for a task to be requeued.
constexpr bool shuffle() {
    return false;
}
#else
// Decides at random whether the task just popped should be pushed back to
// the end of its queue instead of being run now. Draws uniformly from
// {0, 1, 2} and answers true only for 0, i.e. for one draw in three.
//
// The per-thread generator is seeded with the first output of a
// default-constructed std::default_random_engine, not with an entropy source.
bool shuffle() {
    static thread_local std::mt19937 rng{std::default_random_engine{}()};
    std::uniform_int_distribution<size_t> pick{0, 2};
    const bool requeue = (pick(rng) == 0);
    return requeue;
}
#endif

}

bool reactor::task_queue::run_tasks() {
reactor& r = engine();

// Make sure new tasks will inherit our scheduling group
*internal::current_scheduling_group_ptr() = scheduling_group(_id);
scheduling_group current = scheduling_group(_id);
*internal::current_scheduling_group_ptr() = current;
while (!_q.empty()) {
auto tsk = _q.front();
_q.pop_front();
task* tsk = _q.pop_front();
tsk->_scheduling_group_id = task::disguise_sched_group(current);
if (shuffle()) {
_q.push_back(tsk);
continue;
}
STAP_PROBE(seastar, reactor_run_tasks_single_start);
internal::task_histogram_add_task(*tsk);
r._current_task = tsk;
Expand Down Expand Up @@ -2671,22 +2690,6 @@ bool reactor::task_queue::run_tasks() {
return !_q.empty();
}

namespace {

#ifdef SEASTAR_SHUFFLE_TASK_QUEUE
void shuffle(task*& t, circular_buffer<task*>& q) {
static thread_local std::mt19937 gen = std::mt19937(std::default_random_engine()());
std::uniform_int_distribution<size_t> tasks_dist{0, q.size() - 1};
auto& to_swap = q[tasks_dist(gen)];
std::swap(to_swap, t);
}
#else
void shuffle(task*&, circular_buffer<task*>&) {
}
#endif

}

void reactor::force_poll() {
request_preemption();
}
Expand Down Expand Up @@ -3077,7 +3080,6 @@ void reactor::add_task(task* t) noexcept {
auto* q = _task_queues[sg._id].get();
bool was_empty = q->_q.empty();
q->_q.push_back(std::move(t));
shuffle(q->_q.back(), q->_q);
if (was_empty) {
q->wakeup();
}
Expand All @@ -3089,7 +3091,6 @@ void reactor::add_urgent_task(task* t) noexcept {
auto* q = _task_queues[sg._id].get();
bool was_empty = q->_q.empty();
q->_q.push_front(std::move(t));
shuffle(q->_q.front(), q->_q);
if (was_empty) {
q->wakeup();
}
Expand Down
Loading