Skip to content

Commit 646adfb

Browse files
committed
core: add label to io_threaded_fallbacks to categorize operations
1 parent ee2b79d commit 646adfb

File tree

4 files changed

+73
-35
lines changed

4 files changed

+73
-35
lines changed

src/core/file.cc

+8-8
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ future<int>
230230
posix_file_impl::ioctl(uint64_t cmd, void* argp) noexcept {
231231
return engine()._thread_pool->submit<syscall_result<int>>([this, cmd, argp] () mutable {
232232
return wrap_syscall<int>(::ioctl(_fd, cmd, argp));
233-
}).then([] (syscall_result<int> sr) {
233+
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
234234
sr.throw_if_error();
235235
// Some ioctls require to return a positive integer back.
236236
return make_ready_future<int>(sr.result);
@@ -251,7 +251,7 @@ future<int>
251251
posix_file_impl::fcntl(int op, uintptr_t arg) noexcept {
252252
return engine()._thread_pool->submit<syscall_result<int>>([this, op, arg] () mutable {
253253
return wrap_syscall<int>(::fcntl(_fd, op, arg));
254-
}).then([] (syscall_result<int> sr) {
254+
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
255255
sr.throw_if_error();
256256
// Some fcntls require to return a positive integer back.
257257
return make_ready_future<int>(sr.result);
@@ -273,7 +273,7 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) noexcept {
273273
return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
274274
return wrap_syscall<int>(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
275275
offset, length));
276-
}).then([] (syscall_result<int> sr) {
276+
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
277277
sr.throw_if_error();
278278
return make_ready_future<>();
279279
});
@@ -294,7 +294,7 @@ posix_file_impl::allocate(uint64_t position, uint64_t length) noexcept {
294294
supported = false; // Racy, but harmless. At most we issue an extra call or two.
295295
}
296296
return wrap_syscall<int>(ret);
297-
}).then([] (syscall_result<int> sr) {
297+
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
298298
sr.throw_if_error();
299299
return make_ready_future<>();
300300
});
@@ -336,7 +336,7 @@ posix_file_impl::close() noexcept {
336336
try {
337337
return engine()._thread_pool->submit<syscall_result<int>>([fd] {
338338
return wrap_syscall<int>(::close(fd));
339-
});
339+
}, submit_reason::file_operation);
340340
} catch (...) {
341341
report_exception("Running ::close() in reactor thread, submission failed with exception", std::current_exception());
342342
return make_ready_future<syscall_result<int>>(wrap_syscall<int>(::close(fd)));
@@ -358,7 +358,7 @@ blockdev_file_impl::size() noexcept {
358358
uint64_t size;
359359
int ret = ::ioctl(_fd, BLKGETSIZE64, &size);
360360
return wrap_syscall(ret, size);
361-
}).then([] (syscall_result_extra<uint64_t> ret) {
361+
}, submit_reason::file_operation).then([] (syscall_result_extra<uint64_t> ret) {
362362
ret.throw_if_error();
363363
return make_ready_future<uint64_t>(ret.extra);
364364
});
@@ -401,7 +401,7 @@ posix_file_impl::list_directory(std::function<future<> (directory_entry de)> nex
401401
return engine()._thread_pool->submit<syscall_result<long>>([w , this] () {
402402
auto ret = ::syscall(__NR_getdents64, _fd, reinterpret_cast<linux_dirent64*>(w->buffer), buffer_size);
403403
return wrap_syscall(ret);
404-
}).then([w] (syscall_result<long> ret) {
404+
}, submit_reason::file_operation).then([w] (syscall_result<long> ret) {
405405
ret.throw_if_error();
406406
if (ret.result == 0) {
407407
w->eof = true;
@@ -631,7 +631,7 @@ blockdev_file_impl::discard(uint64_t offset, uint64_t length) noexcept {
631631
return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
632632
uint64_t range[2] { offset, length };
633633
return wrap_syscall<int>(::ioctl(_fd, BLKDISCARD, &range));
634-
}).then([] (syscall_result<int> sr) {
634+
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
635635
sr.throw_if_error();
636636
return make_ready_future<>();
637637
});

src/core/reactor.cc

+32-21
Original file line numberDiff line numberDiff line change
@@ -1984,7 +1984,7 @@ reactor::open_file_dma(std::string_view nameref, open_flags flags, file_open_opt
19841984
}
19851985
close_fd.cancel();
19861986
return wrap_syscall(fd, st);
1987-
}).then([&options, name = std::move(name), &open_flags] (syscall_result_extra<struct stat> sr) {
1987+
}, submit_reason::file_operation).then([&options, name = std::move(name), &open_flags] (syscall_result_extra<struct stat> sr) {
19881988
sr.throw_fs_exception_if_error("open failed", name);
19891989
return make_file_impl(sr.result, options, open_flags, sr.extra);
19901990
}).then([] (shared_ptr<file_impl> impl) {
@@ -1999,7 +1999,7 @@ reactor::remove_file(std::string_view pathname) noexcept {
19991999
return futurize_invoke([this, pathname] {
20002000
return _thread_pool->submit<syscall_result<int>>([pathname = sstring(pathname)] {
20012001
return wrap_syscall<int>(::remove(pathname.c_str()));
2002-
}).then([pathname = sstring(pathname)] (syscall_result<int> sr) {
2002+
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result<int> sr) {
20032003
sr.throw_fs_exception_if_error("remove failed", pathname);
20042004
return make_ready_future<>();
20052005
});
@@ -2012,7 +2012,8 @@ reactor::rename_file(std::string_view old_pathname, std::string_view new_pathnam
20122012
return futurize_invoke([this, old_pathname, new_pathname] {
20132013
return _thread_pool->submit<syscall_result<int>>([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] {
20142014
return wrap_syscall<int>(::rename(old_pathname.c_str(), new_pathname.c_str()));
2015-
}).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
2015+
}, submit_reason::file_operation
2016+
).then([old_pathname = sstring(old_pathname), new_pathname = sstring(new_pathname)] (syscall_result<int> sr) {
20162017
sr.throw_fs_exception_if_error("rename failed", old_pathname, new_pathname);
20172018
return make_ready_future<>();
20182019
});
@@ -2025,7 +2026,7 @@ reactor::link_file(std::string_view oldpath, std::string_view newpath) noexcept
20252026
return futurize_invoke([this, oldpath, newpath] {
20262027
return _thread_pool->submit<syscall_result<int>>([oldpath = sstring(oldpath), newpath = sstring(newpath)] {
20272028
return wrap_syscall<int>(::link(oldpath.c_str(), newpath.c_str()));
2028-
}).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) {
2029+
}, submit_reason::file_operation).then([oldpath = sstring(oldpath), newpath = sstring(newpath)] (syscall_result<int> sr) {
20292030
sr.throw_fs_exception_if_error("link failed", oldpath, newpath);
20302031
return make_ready_future<>();
20312032
});
@@ -2039,7 +2040,7 @@ reactor::chmod(std::string_view name, file_permissions permissions) noexcept {
20392040
return futurize_invoke([name, mode, this] {
20402041
return _thread_pool->submit<syscall_result<int>>([name = sstring(name), mode] {
20412042
return wrap_syscall<int>(::chmod(name.c_str(), mode));
2042-
}).then([name = sstring(name), mode] (syscall_result<int> sr) {
2043+
}, submit_reason::file_operation).then([name = sstring(name), mode] (syscall_result<int> sr) {
20432044
if (sr.result == -1) {
20442045
auto reason = format("chmod(0{:o}) failed", mode);
20452046
sr.throw_fs_exception(reason, fs::path(name));
@@ -2083,7 +2084,7 @@ reactor::file_type(std::string_view name, follow_symlink follow) noexcept {
20832084
auto stat_syscall = follow ? stat : lstat;
20842085
auto ret = stat_syscall(name.c_str(), &st);
20852086
return wrap_syscall(ret, st);
2086-
}).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) {
2087+
}, submit_reason::file_operation).then([name = sstring(name)] (syscall_result_extra<struct stat> sr) {
20872088
if (long(sr.result) == -1) {
20882089
if (sr.error != ENOENT && sr.error != ENOTDIR) {
20892090
sr.throw_fs_exception_if_error("stat failed", name);
@@ -2218,7 +2219,7 @@ reactor::spawn(std::string_view pathname,
22182219
return wrap_syscall<int>(::posix_spawn(&child_pid, pathname.c_str(), &actions, &attr,
22192220
const_cast<char* const *>(argv.data()),
22202221
const_cast<char* const *>(env.data())));
2221-
});
2222+
}, submit_reason::process_operation);
22222223
}).finally([&actions, &attr] {
22232224
posix_spawn_file_actions_destroy(&actions);
22242225
posix_spawnattr_destroy(&attr);
@@ -2269,7 +2270,7 @@ future<int> reactor::waitpid(pid_t pid) {
22692270
&wait_timeout] {
22702271
return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
22712272
return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
2272-
}).then([&wstatus, &wait_timeout] (syscall_result<pid_t> ret) mutable {
2273+
}, submit_reason::process_operation).then([&wstatus, &wait_timeout] (syscall_result<pid_t> ret) mutable {
22732274
if (ret.result == 0) {
22742275
wait_timeout = next_waitpid_timeout(wait_timeout);
22752276
return ::seastar::sleep(wait_timeout).then([] {
@@ -2289,7 +2290,7 @@ future<int> reactor::waitpid(pid_t pid) {
22892290
return pidfd.readable().then([pid, &wstatus, this] {
22902291
return _thread_pool->submit<syscall_result<pid_t>>([pid, &wstatus] {
22912292
return wrap_syscall<pid_t>(::waitpid(pid, &wstatus, WNOHANG));
2292-
});
2293+
}, submit_reason::process_operation);
22932294
}).then([&wstatus] (syscall_result<pid_t> ret) {
22942295
ret.throw_if_error();
22952296
assert(ret.result > 0);
@@ -2314,7 +2315,7 @@ reactor::file_stat(std::string_view pathname, follow_symlink follow) noexcept {
23142315
auto stat_syscall = follow ? stat : lstat;
23152316
auto ret = stat_syscall(pathname.c_str(), &st);
23162317
return wrap_syscall(ret, st);
2317-
}).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) {
2318+
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra<struct stat> sr) {
23182319
sr.throw_fs_exception_if_error("stat failed", pathname);
23192320
struct stat& st = sr.extra;
23202321
stat_data sd;
@@ -2352,7 +2353,7 @@ reactor::file_accessible(std::string_view pathname, access_flags flags) noexcept
23522353
auto aflags = std::underlying_type_t<access_flags>(flags);
23532354
auto ret = ::access(pathname.c_str(), aflags);
23542355
return wrap_syscall(ret);
2355-
}).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) {
2356+
}, submit_reason::file_operation).then([pathname = sstring(pathname), flags] (syscall_result<int> sr) {
23562357
if (sr.result < 0) {
23572358
if ((sr.error == ENOENT && flags == access_flags::exists) ||
23582359
(sr.error == EACCES && flags != access_flags::exists)) {
@@ -2374,7 +2375,7 @@ reactor::file_system_at(std::string_view pathname) noexcept {
23742375
struct statfs st;
23752376
auto ret = statfs(pathname.c_str(), &st);
23762377
return wrap_syscall(ret, st);
2377-
}).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) {
2378+
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra<struct statfs> sr) {
23782379
static std::unordered_map<long int, fs_type> type_mapper = {
23792380
{ internal::fs_magic::xfs, fs_type::xfs },
23802381
{ internal::fs_magic::ext2, fs_type::ext2 },
@@ -2401,7 +2402,7 @@ reactor::fstatfs(int fd) noexcept {
24012402
struct statfs st;
24022403
auto ret = ::fstatfs(fd, &st);
24032404
return wrap_syscall(ret, st);
2404-
}).then([] (syscall_result_extra<struct statfs> sr) {
2405+
}, submit_reason::file_operation).then([] (syscall_result_extra<struct statfs> sr) {
24052406
sr.throw_if_error();
24062407
struct statfs st = sr.extra;
24072408
return make_ready_future<struct statfs>(std::move(st));
@@ -2416,7 +2417,7 @@ reactor::statvfs(std::string_view pathname) noexcept {
24162417
struct statvfs st;
24172418
auto ret = ::statvfs(pathname.c_str(), &st);
24182419
return wrap_syscall(ret, st);
2419-
}).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) {
2420+
}, submit_reason::file_operation).then([pathname = sstring(pathname)] (syscall_result_extra<struct statvfs> sr) {
24202421
sr.throw_fs_exception_if_error("statvfs failed", pathname);
24212422
struct statvfs st = sr.extra;
24222423
return make_ready_future<struct statvfs>(std::move(st));
@@ -2440,7 +2441,7 @@ reactor::open_directory(std::string_view name) noexcept {
24402441
}
24412442
}
24422443
return wrap_syscall(fd, st);
2443-
}).then([name = sstring(name), oflags] (syscall_result_extra<struct stat> sr) {
2444+
}, submit_reason::file_operation).then([name = sstring(name), oflags] (syscall_result_extra<struct stat> sr) {
24442445
sr.throw_fs_exception_if_error("open failed", name);
24452446
return make_file_impl(sr.result, file_open_options(), oflags, sr.extra);
24462447
}).then([] (shared_ptr<file_impl> file_impl) {
@@ -2456,7 +2457,7 @@ reactor::make_directory(std::string_view name, file_permissions permissions) noe
24562457
return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
24572458
auto mode = static_cast<mode_t>(permissions);
24582459
return wrap_syscall<int>(::mkdir(name.c_str(), mode));
2459-
}).then([name = sstring(name)] (syscall_result<int> sr) {
2460+
}, submit_reason::file_operation).then([name = sstring(name)] (syscall_result<int> sr) {
24602461
sr.throw_fs_exception_if_error("mkdir failed", name);
24612462
});
24622463
});
@@ -2469,7 +2470,7 @@ reactor::touch_directory(std::string_view name, file_permissions permissions) no
24692470
return _thread_pool->submit<syscall_result<int>>([name = sstring(name), permissions] {
24702471
auto mode = static_cast<mode_t>(permissions);
24712472
return wrap_syscall<int>(::mkdir(name.c_str(), mode));
2472-
}).then([name = sstring(name)] (syscall_result<int> sr) {
2473+
}, submit_reason::file_operation).then([name = sstring(name)] (syscall_result<int> sr) {
24732474
if (sr.result == -1 && sr.error != EEXIST) {
24742475
sr.throw_fs_exception("mkdir failed", fs::path(name));
24752476
}
@@ -2514,7 +2515,7 @@ reactor::fdatasync(int fd) noexcept {
25142515
}
25152516
return _thread_pool->submit<syscall_result<int>>([fd] {
25162517
return wrap_syscall<int>(::fdatasync(fd));
2517-
}).then([] (syscall_result<int> sr) {
2518+
}, submit_reason::file_operation).then([] (syscall_result<int> sr) {
25182519
sr.throw_if_error();
25192520
return make_ready_future<>();
25202521
});
@@ -2658,6 +2659,12 @@ void reactor::register_metrics() {
26582659

26592660
namespace sm = seastar::metrics;
26602661

2662+
auto io_fallback_counter = [this](const sstring& reason_str, submit_reason r) {
2663+
static auto reason_label = sm::label("reason");
2664+
return sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::count, _thread_pool.get(), r),
2665+
sm::description("Total number of io-threaded-fallbacks operations"), { reason_label(reason_str), });
2666+
};
2667+
26612668
_metric_groups.add_group("reactor", {
26622669
sm::make_gauge("tasks_pending", std::bind(&reactor::pending_task_count, this), sm::description("Number of pending tasks in the queue")),
26632670
// total_operations value:DERIVE:0:U
@@ -2683,9 +2690,13 @@ void reactor::register_metrics() {
26832690
// total_operations value:DERIVE:0:U
26842691
sm::make_counter("fsyncs", _fsyncs, sm::description("Total number of fsync operations")),
26852692
// total_operations value:DERIVE:0:U
2686-
sm::make_counter("io_threaded_fallbacks", std::bind(&thread_pool::operation_count, _thread_pool.get()),
2687-
sm::description("Total number of io-threaded-fallbacks operations")),
2688-
2693+
io_fallback_counter("aio_fallback", submit_reason::aio_fallback),
2694+
// total_operations value:DERIVE:0:U
2695+
io_fallback_counter("file_operation", submit_reason::file_operation),
2696+
// total_operations value:DERIVE:0:U
2697+
io_fallback_counter("process_operation", submit_reason::process_operation),
2698+
// total_operations value:DERIVE:0:U
2699+
io_fallback_counter("unknown", submit_reason::unknown),
26892700
});
26902701

26912702
_metric_groups.add_group("memory", {

src/core/reactor_backend.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ void aio_storage_context::schedule_retry() {
260260
return _r._thread_pool->submit<syscall_result<int>>([this] () mutable {
261261
auto r = io_submit(_io_context, _aio_retries.size(), _aio_retries.data());
262262
return wrap_syscall<int>(r);
263-
}).then_wrapped([this] (future<syscall_result<int>> f) {
263+
}, submit_reason::aio_fallback).then_wrapped([this] (future<syscall_result<int>> f) {
264264
// If submit failed, just log the error and exit the loop.
265265
// The next call to submit_work will call schedule_retry again.
266266
if (f.failed()) {

src/core/thread_pool.hh

+32-5
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,36 @@ namespace seastar {
2727

2828
class reactor;
2929

30+
// Reasons for why a function had to be submitted to the thread_pool
31+
enum class submit_reason : size_t {
32+
// Used for aio operations what would block in `io_submit`.
33+
aio_fallback,
34+
// Used for file operations that don't have non-blocking alternatives.
35+
file_operation,
36+
// Used for process operations that don't have non-blocking alternatives.
37+
process_operation,
38+
// Used by default when a caller doesn't specify a submission reason.
39+
unknown,
40+
};
41+
42+
class submit_metrics {
43+
uint64_t _counters[static_cast<size_t>(submit_reason::unknown) + 1]{};
44+
45+
public:
46+
void record_reason(submit_reason reason) {
47+
reason = std::min(reason, submit_reason::unknown);
48+
++_counters[static_cast<size_t>(reason)];
49+
}
50+
51+
uint64_t count_for(submit_reason reason) const {
52+
reason = std::min(reason, submit_reason::unknown);
53+
return _counters[static_cast<size_t>(reason)];
54+
}
55+
};
56+
3057
class thread_pool {
3158
reactor* _reactor;
32-
uint64_t _aio_threaded_fallbacks = 0;
59+
submit_metrics metrics;
3360
#ifndef HAVE_OSV
3461
syscall_work_queue inter_thread_wq;
3562
posix_thread _worker_thread;
@@ -39,11 +66,11 @@ public:
3966
explicit thread_pool(reactor* r, sstring thread_name);
4067
~thread_pool();
4168
template <typename T, typename Func>
42-
future<T> submit(Func func) noexcept {
43-
++_aio_threaded_fallbacks;
69+
future<T> submit(Func func, submit_reason reason = submit_reason::unknown) noexcept {
70+
metrics.record_reason(reason);
4471
return inter_thread_wq.submit<T>(std::move(func));
4572
}
46-
uint64_t operation_count() const { return _aio_threaded_fallbacks; }
73+
uint64_t count(submit_reason r) const { return metrics.count_for(r); }
4774

4875
unsigned complete() { return inter_thread_wq.complete(); }
4976
// Before we enter interrupt mode, we must make sure that the syscall thread will properly
@@ -61,7 +88,7 @@ public:
6188
#else
6289
public:
6390
template <typename T, typename Func>
64-
future<T> submit(Func func) { std::cerr << "thread_pool not yet implemented on osv\n"; abort(); }
91+
future<T> submit(Func func, submit_reason reason = submit_reason::unknown) { std::cerr << "thread_pool not yet implemented on osv\n"; abort(); }
6592
#endif
6693
private:
6794
void work(sstring thread_name);

0 commit comments

Comments
 (0)