3 changes: 3 additions & 0 deletions include/seastar/core/reactor.hh
@@ -438,7 +438,9 @@ private:
bool have_more_tasks() const;
bool posix_reuseport_detect();
future<> rename_scheduling_group_specific_data(scheduling_group sg);
future<seastar::scheduling_group> init_scheduling_group(sstring name, sstring shortname, float shares, scheduling_supergroup p);
future<> init_scheduling_group(scheduling_group sg, sstring name, sstring shortname, float shares, scheduling_supergroup p);
future<> init_scheduling_group_specific_data(scheduling_group sg);
future<> init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg);
future<> destroy_scheduling_group(scheduling_group sg) noexcept;
uint64_t tasks_processed() const;
@@ -728,6 +730,7 @@ private:
friend class internal::poller;
friend class scheduling_group;
friend class scheduling_supergroup;
friend size_t internal::scheduling_group_count();
friend void internal::add_to_flush_poller(output_stream<char>& os) noexcept;
friend void seastar::internal::increase_thrown_exceptions_counter() noexcept;
friend void seastar::internal::increase_internal_errors_counter() noexcept;
101 changes: 51 additions & 50 deletions src/core/reactor.cc
@@ -4899,38 +4899,14 @@ std::chrono::nanoseconds reactor::total_steal_time() const {
return mono_steal;
}

static std::atomic<unsigned long> s_used_scheduling_group_ids_bitmap{3}; // 0=main, 1=atexit
static std::atomic<unsigned long> s_next_scheduling_group_specific_key{0};

static
int
allocate_scheduling_group_id() noexcept {
static_assert(max_scheduling_groups() <= std::numeric_limits<unsigned long>::digits, "more scheduling groups than available bits");
auto b = s_used_scheduling_group_ids_bitmap.load(std::memory_order_relaxed);
auto nb = b;
unsigned i = 0;
do {
if (__builtin_popcountl(b) == max_scheduling_groups()) {
return -1;
}
i = count_trailing_zeros(~b);
nb = b | (1ul << i);
} while (!s_used_scheduling_group_ids_bitmap.compare_exchange_weak(b, nb, std::memory_order_relaxed));
return i;
}

static
unsigned long
allocate_scheduling_group_specific_key() noexcept {
return s_next_scheduling_group_specific_key.fetch_add(1, std::memory_order_relaxed);
}

static
void
deallocate_scheduling_group_id(unsigned id) noexcept {
s_used_scheduling_group_ids_bitmap.fetch_and(~(1ul << id), std::memory_order_relaxed);
}

static
internal::scheduling_group_specific_thread_local_data::specific_val
allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const lw_shared_ptr<scheduling_group_key_config>& cfg) {
@@ -4954,32 +4930,62 @@ reactor::rename_scheduling_group_specific_data(scheduling_group sg) {
}

future<>
reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstring shortname, float shares, scheduling_supergroup parent) {
return with_shared(_scheduling_group_keys_mutex, [this, sg, name = std::move(name), shortname = std::move(shortname), shares, parent] {
get_sg_data(sg).queue_is_initialized = true;
reactor::init_scheduling_group_specific_data(scheduling_group sg) {
return with_scheduling_group(sg, [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = get_sg_data(sg);
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg);
}
});
}

future<scheduling_group>
reactor::init_scheduling_group(sstring name, sstring shortname, float shares, scheduling_supergroup parent) {
return with_shared(_scheduling_group_keys_mutex, [this, name = std::move(name), shortname = std::move(shortname), shares, parent] {
unsigned id = 0;
while (id < max_scheduling_groups() && _task_queues[id] != nullptr) {
id++;
}
if (id == max_scheduling_groups()) {
return make_exception_future<scheduling_group>(std::runtime_error(fmt::format("Scheduling group limit exceeded while creating {}", name)));
}

auto* group = &_cpu_sched;
if (!parent.is_root()) {
if (_supergroups[parent.index()] == nullptr) {
return make_exception_future<>(std::runtime_error("Requested supergroup doesn't exist"));
return make_exception_future<scheduling_group>(std::runtime_error("Requested supergroup doesn't exist"));
}
group = _supergroups[parent.index()].get();
}
if (group->_nr_children == max_scheduling_groups()) {
return make_exception_future<>(std::runtime_error(fmt::format("Supergroup children limit exceeded while creating {}", name)));
return make_exception_future<scheduling_group>(std::runtime_error(fmt::format("Supergroup children limit exceeded while creating {}", name)));
}

auto sg = scheduling_group(id);
get_sg_data(sg).queue_is_initialized = true;
_task_queues[sg._id] = std::make_unique<task_queue>(group, sg._id, name, shortname, shares);

return with_scheduling_group(sg, [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = get_sg_data(sg);
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg);
}
return init_scheduling_group_specific_data(sg).then([sg] {
return make_ready_future<scheduling_group>(sg);
});
});
}

future<>
reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstring shortname, float shares, scheduling_supergroup parent) {
return with_shared(_scheduling_group_keys_mutex, [this, sg, name = std::move(name), shortname = std::move(shortname), shares, parent] {
get_sg_data(sg).queue_is_initialized = true;
auto* group = &_cpu_sched;
if (!parent.is_root()) {
group = _supergroups[parent.index()].get();
}
_task_queues[sg._id] = std::make_unique<task_queue>(group, sg._id, name, shortname, shares);
return init_scheduling_group_specific_data(sg);
});
}

future<>
reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) {
return with_lock(_scheduling_group_keys_mutex, [this, key, cfg] {
@@ -5174,18 +5180,16 @@ future<> destroy_scheduling_supergroup(scheduling_supergroup sg) noexcept {

future<scheduling_group>
create_scheduling_group(sstring name, sstring shortname, float shares, scheduling_supergroup parent) noexcept {
auto aid = allocate_scheduling_group_id();
if (aid < 0) {
return make_exception_future<scheduling_group>(std::runtime_error(fmt::format("Scheduling group limit exceeded while creating {}", name)));
}
auto id = static_cast<unsigned>(aid);
SEASTAR_ASSERT(id < max_scheduling_groups());
auto sg = scheduling_group(id);
return smp::invoke_on_all([sg, name, shortname, shares, parent] {
auto sg = co_await smp::submit_to(0, [name, shortname, shares, parent] {
return engine().init_scheduling_group(name, shortname, shares, parent);
});
co_await smp::invoke_on_all([sg, name, shortname, shares, parent] {
if (this_shard_id() == 0) {
return make_ready_future<>();
}
return engine().init_scheduling_group(sg, name, shortname, shares, parent);
}).then([sg] {
return make_ready_future<scheduling_group>(sg);
});
co_return sg;
}

future<scheduling_group>
@@ -5218,8 +5222,6 @@ destroy_scheduling_group(scheduling_group sg) noexcept {
}
return smp::invoke_on_all([sg] {
return engine().destroy_scheduling_group(sg);
}).then([sg] {
deallocate_scheduling_group_id(sg._id);
});
}

@@ -5326,8 +5328,7 @@ std::ostream& operator<<(std::ostream& os, const stall_report& sr) {
}

size_t scheduling_group_count() {
auto b = s_used_scheduling_group_ids_bitmap.load(std::memory_order_relaxed);
return __builtin_popcountl(b);
return max_scheduling_groups() - std::count(engine()._task_queues.begin(), engine()._task_queues.end(), nullptr);
}

void
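For context on the code path this diff reworks, below is a minimal caller-side sketch, not taken from the patch: the `demo_group_lifecycle` function and the "maintenance" group name are illustrative, and the header paths are assumed to be Seastar's usual public includes. With this change the group id is chosen on shard 0 by scanning `_task_queues` for a free slot (replacing the old process-wide id bitmap), while the caller-visible flow stays the same: await one future, get back a ready-to-use `scheduling_group`.

```cpp
// Illustrative caller-side sketch; names and the "maintenance" group are made up.
#include <seastar/core/coroutine.hh>
#include <seastar/core/scheduling.hh>

seastar::future<> demo_group_lifecycle() {
    // Shard 0 picks a free id and creates its task queue; the remaining shards
    // are then initialized with that same id via smp::invoke_on_all().
    seastar::scheduling_group sg = co_await seastar::create_scheduling_group("maintenance", 100);
    // ... run work under `sg`, e.g. via with_scheduling_group(sg, ...) ...
    co_await seastar::destroy_scheduling_group(sg);
}
```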