Skip to content

Commit 82a10f3

Browse files
authored
Merge pull request #111 from Flow-IPC/ipc-143_perf-dive-nov
(See description!) Pull request for repo [flow] branch [ipc-143_perf-dive-nov] to base-branch [main].
2 parents a0e3961 + b7bd40b commit 82a10f3

File tree

101 files changed

+6673
-1936
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+6673
-1936
lines changed

.github/workflows/main.yml

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ jobs:
183183
version: 9
184184
c-path: /usr/bin/gcc-9
185185
cpp-path: /usr/bin/g++-9
186+
install: True
186187
- id: gcc-10
187188
name: gcc
188189
version: 10
@@ -395,13 +396,27 @@ jobs:
395396
# (Unfortunately cannot refer to earlier-assigned `env.` entries within subsequent ones.)
396397
install-dir: ${{ github.workspace }}/install/${{ matrix.build-test-cfg.conan-profile-build-type }}/usr/local
397398
# For the remaining env entries please see comments in Flow-IPC workflow counterpart. We use same techniques.
399+
# Update: (As of this writing this is only here, not in Flow-IPC counterpart, but that'll change; might want to
400+
# move the comment there then.) ASAN is somewhat different from the others, as it's really two sanitizers with
401+
# separate suppression files -- ASAN (actual safety checks) and LSAN (leak checks). So far we do *not* need
402+
# any suppressed ASAN warnings (ASAN false positives are known to be rare, so that makes sense), but there are
403+
# at least a couple minor, intentional mem-leaks, so we *do* need to suppress LSAN warnings at times. Therefore,
404+
# until/unless this changes, we will essentially treat LSAN-suppression as *the* suppression for our ASAN runs,
405+
# thus only requiring one suppression file type still. If/when that changes, the below will need to be made
406+
# more complicated, so that 2 suppression types per sanitizer are supported as opposed to just 1. Until then,
407+
# it's still simple. So that's why $ASAN_OPTIONS does not mention suppression, but $LSAN_OPTIONS does... and
408+
# it is the suppression file(s) (if any) under various `asan/` dirs in the source tree.
398409
san-suppress-cfg-file: ${{ github.workspace }}/install/${{ matrix.build-test-cfg.conan-profile-build-type }}/usr/local/bin/san_suppressions.cfg
399410
san-suppress-cfg-in-file1: sanitize/${{ matrix.build-test-cfg.sanitizer-name }}/suppressions_${{ matrix.compiler.name }}.cfg
400411
san-suppress-cfg-in-file2: sanitize/${{ matrix.build-test-cfg.sanitizer-name }}/suppressions_${{ matrix.compiler.name }}_${{ matrix.compiler.version }}.cfg
401412
setup-tests-env: |
402413
if [ '${{ matrix.build-test-cfg.sanitizer-name }}' = asan ]; then
414+
export SAN_SUPP=1
415+
export SAN_SUPP_CFG=${{ github.workspace }}/install/${{ matrix.build-test-cfg.conan-profile-build-type }}/usr/local/bin/san_suppressions.cfg
403416
export ASAN_OPTIONS='disable_coredump=0'
417+
export LSAN_OPTIONS="suppressions=$SAN_SUPP_CFG"
404418
echo "ASAN_OPTIONS = [$ASAN_OPTIONS]."
419+
echo "LSAN_OPTIONS = [$LSAN_OPTIONS]."
405420
elif [ '${{ matrix.build-test-cfg.sanitizer-name }}' = ubsan ]; then
406421
export SAN_SUPP=1
407422
export SAN_SUPP_CFG=${{ github.workspace }}/install/${{ matrix.build-test-cfg.conan-profile-build-type }}/usr/local/bin/san_suppressions.cfg
@@ -571,6 +586,8 @@ jobs:
571586
# and will grow. The techniques will still apply.
572587

573588
- name: Run test/demo [NetFlow echo]
589+
if: |
590+
!cancelled()
574591
run: |
575592
# Run test/demo [NetFlow echo].
576593
cd ${{ env.install-dir }}/bin
@@ -611,13 +628,15 @@ jobs:
611628
mkdir -p $OUT_DIR
612629
SUPP_DIR_A=${{ github.workspace }}/src
613630
# As of this writing there are TSAN suppressions for this test specifically. TODO: Revisit them; and then this.
631+
# Update: Now there are also ASAN (LSAN) suppressions. These are likely permanent as of this writing.
632+
# Reminder: the following construction handles suppression file(s) from *any* relevant sanitizer type (if any).
614633
SUPP_DIR_OWN=${{ github.workspace }}/test/suite/unit_test
615634
{ cat $SUPP_DIR_A/${{ env.san-suppress-cfg-in-file1 }} $SUPP_DIR_A/${{ env.san-suppress-cfg-in-file2 }} \
616635
$SUPP_DIR_OWN/${{ env.san-suppress-cfg-in-file1 }} $SUPP_DIR_OWN/${{ env.san-suppress-cfg-in-file2 }} \
617636
> ${{ env.san-suppress-cfg-file }} 2> /dev/null; } || true
618637
${{ env.setup-tests-env }}
619638
${{ env.setup-run-env }}
620-
# Sensitive benchmarks in this setting should run and be warned about it they "fail," but they should not
639+
# Sensitive benchmarks in this setting should run and be warned about, if they "fail," but they should not
621640
# fail the test.
622641
$RUN_IT --do-not-fail-benchmarks > $OUT_DIR/console.log 2>&1
623642

src/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ set(SRCS
7979
flow/net_flow/server_socket.cpp
8080
flow/perf/checkpt_timer.cpp
8181
flow/perf/clock_type.cpp
82+
flow/util/blob.cpp
8283
flow/util/detail/sched_task_handle_state.cpp
8384
flow/util/detail/util.cpp
8485
flow/util/sched_task.cpp
@@ -159,6 +160,7 @@ set(HDRS
159160
flow/util/basic_blob.hpp
160161
flow/util/blob.hpp
161162
flow/util/blob_fwd.hpp
163+
flow/util/detail/linked_hash.hpp
162164
flow/util/detail/sched_task_handle_state.hpp
163165
flow/util/detail/util.hpp
164166
flow/util/detail/util_fwd.hpp
@@ -171,6 +173,7 @@ set(HDRS
171173
flow/util/shared_ptr_alias_holder.hpp
172174
flow/util/string_ostream.hpp
173175
flow/util/string_view.hpp
176+
flow/util/thread_lcl.hpp
174177
flow/util/traits.hpp
175178
flow/util/uniq_id_holder.hpp
176179
flow/util/util.hpp

src/flow/async/async_fwd.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ using Task = Function<void ()>;
104104
* In addition, it is guaranteed that copying (via constructor or assignment) of async::Op
105105
* has performance characteristics no worse than those of `shared_ptr`. I.e., it is to be thought of as light-weight.
106106
*
107-
* The value `Op()` is designated as a null/sentinel value and must not be passed to Concurrent_task_loop::post()
107+
* The value `Op{}` is designated as a null/sentinel value and must not be passed to Concurrent_task_loop::post()
108108
* or anything built on it.
109109
*
110110
* That's the formal definition. We reiterate that copying these is cheap; and moreover two `Op`s such that

src/flow/async/concurrent_task_loop.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,9 @@ void optimize_pinning_in_thread_pool(log::Logger* logger_ptr,
210210
const auto native_mach_thread_id = pthread_mach_thread_np(native_pthread_thread_id);
211211
if (native_pthread_thread_id == 0)
212212
{
213-
const Error_code sys_err_code(errno, system_category()); // As above....
213+
const Error_code sys_err_code{errno, system_category()}; // As above....
214214
FLOW_ERROR_SYS_ERROR_LOG_WARNING();
215-
throw error::Runtime_error(sys_err_code, "pthread_mach_thread_np() call in optimize_pinning_in_thread_pool()");
215+
throw error::Runtime_error{sys_err_code, "pthread_mach_thread_np() call in optimize_pinning_in_thread_pool()"};
216216
}
217217
// else
218218
FLOW_LOG_TRACE("pthread ID [" << native_pthread_thread_id << "] "
@@ -249,8 +249,8 @@ void optimize_pinning_in_thread_pool(log::Logger* logger_ptr,
249249
* @todo For sure though should use error::Runtime_error here, the ctor that takes no Error_code.
250250
* That ctor did not exist when the present code was written; as of this writing Flow is Linux-only.
251251
* Would do it right now but lack the time to verify any changes for Mac at the moment. */
252-
throw runtime_error(ostream_op_string("[MACH_KERN_RETURN_T:", code,
253-
"] [thread_policy_set(THREAD_AFFINITY_POLICY) failed]"));
252+
throw runtime_error{ostream_op_string("[MACH_KERN_RETURN_T:", code,
253+
"] [thread_policy_set(THREAD_AFFINITY_POLICY) failed]")};
254254
}
255255
// else OK!
256256
# endif // if 0

src/flow/async/concurrent_task_loop.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,11 @@ namespace flow::async
208208
* flow::async::Concurrent_task_loop L;
209209
* auto J = L.create_op(); // ATTN! The syntax is different from Strands but the idea is identical.
210210
* ...
211-
* X_type X(L.task_engine());
211+
* X_type X{L.task_engine()};
212212
* // ATTN! The syntax is again somewhat different from bind_executor(S, F), but the idea is equivalent.
213213
* X.async_A(&A_target, A_settings, flow::async::asio_handler_via_op(&L, J, F));
214214
* ...
215-
* Y_type Y(L.task_engine());
215+
* Y_type Y{L.task_engine()};
216216
* Y.async_B(&B_target, B_settings, flow::async::asio_handler_via_op(&L, J, G));
217217
* // X.sync_A() and Y.sync_B() are executing in background; F and G will run on respective completion;
218218
* // but F() and G() shall run non-concurrently by virtue of being wrapped by the same Op: J.
@@ -408,8 +408,8 @@ class Concurrent_task_loop :
408408
* in each thread, for all `thread_idx` in [0, n_threads()). start() will return no sooner than
409409
* when each such callback has finished.
410410
*/
411-
virtual void start(Task&& init_task_or_empty = Task(),
412-
const Thread_init_func& thread_init_func_or_empty = Thread_init_func()) = 0;
411+
virtual void start(Task&& init_task_or_empty = Task{},
412+
const Thread_init_func& thread_init_func_or_empty = Thread_init_func{}) = 0;
413413

414414
/**
415415
* Waits for any ongoing task(s)/completion handler(s) to return; then prevents any further-queued such tasks
@@ -535,7 +535,7 @@ class Concurrent_task_loop :
535535
*
536536
* @param op
537537
* The (presumably) multi-async-step operation to which `task` belongs, such that no `Task`s associated with
538-
* `op` may execute concurrently with `task`. If `op.empty()` (a/k/a `op == Op()`, recalling that `Op()`
538+
* `op` may execute concurrently with `task`. If `op.empty()` (a/k/a `op == Op{}`, recalling that `Op{}`
539539
* is null/sentinel), then `assert()` trips.
540540
* @param task
541541
* See other post().

src/flow/async/detail/task_qing_thread.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ Task_qing_thread::Task_qing_thread(flow::log::Logger* logger_ptr, util::String_v
5858
using Log_config = log::Config;
5959

6060
assert(m_task_engine);
61-
string nickname(nickname_view); // We need an std::string below anyway, so copy this now.
61+
string nickname{nickname_view}; // We need an std::string below anyway, so copy this now.
6262

6363
// Some programs start tons of threads. Let's be stingy with INFO messages.
6464

@@ -93,7 +93,7 @@ Task_qing_thread::Task_qing_thread(flow::log::Logger* logger_ptr, util::String_v
9393
* `sev_override == Sev::S_END_SENTINEL`; we need not even track it as a special case.) */
9494
const auto sev_override = *(Log_config::this_thread_verbosity_override());
9595

96-
m_worker_thread.reset(new Thread([this, // Valid throughout thread { body }.
96+
m_worker_thread.reset(new Thread{[this, // Valid throughout thread { body }.
9797
sev_override,
9898
nickname = std::move(nickname), // Valid throughout thread { body }.
9999
init_func_or_empty = std::move(init_func_or_empty),
@@ -148,7 +148,7 @@ Task_qing_thread::Task_qing_thread(flow::log::Logger* logger_ptr, util::String_v
148148
} // const auto sev_override_auto = // Restore logging to normal (how it normally is at thread start).
149149

150150
// Avoid loop, thread exiting when no pending tasks remain.
151-
Task_engine_work avoid_task_engine_stop(make_work_guard(*m_task_engine));
151+
Task_engine_work avoid_task_engine_stop{make_work_guard(*m_task_engine)};
152152

153153
// Block -- wait for tasks to be posted on this thread's (possibly shared with other threads) Task_engine.
154154
m_task_engine->run();
@@ -230,7 +230,7 @@ Task_qing_thread::Task_qing_thread(flow::log::Logger* logger_ptr, util::String_v
230230
* trace to the logs as well.) The answer is yes, though it's not on us to do it. One should do such work either
231231
* in std::terminate() (by using std::set_terminate()) or, arguably even better, in a global SIGABRT handler.
232232
* I am only mentioning it here as opportunistic advice -- again, it's not in our purview, as shown above. */
233-
})); // Thread body.
233+
}}); // Thread body.
234234
// `nickname`, `init_func_or_empty` may now be hosed.
235235

236236
if (done_promise_else_block)

src/flow/async/detail/task_qing_thread.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ class Task_qing_thread :
176176
*/
177177
explicit Task_qing_thread(flow::log::Logger* logger_ptr, util::String_view nickname,
178178
const Task_engine_ptr& task_engine, bool own_task_engine,
179-
boost::promise<void>* done_promise_else_block = 0,
180-
Task&& init_func_or_empty = Task());
179+
boost::promise<void>* done_promise_else_block = nullptr,
180+
Task&& init_func_or_empty = Task{});
181181

182182
/**
183183
* stop(), followed by forgetting the `Task_engine` returned by task_engine(); the latter action may

src/flow/async/op.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ class Op_list :
114114
* @return See above. Note the address stored in the returned *reference* is valid until destructor runs;
115115
* hence it's not necessary (though cheap) to copy the `Op`.
116116
*/
117-
const Op& random_op(size_t* chosen_idx = 0) const;
117+
const Op& random_op(size_t* chosen_idx = nullptr) const;
118118

119119
/**
120120
* Returns a randomly selected index from range [0, size()).

src/flow/async/segregated_thread_task_loop.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ Segregated_thread_task_loop::Segregated_thread_task_loop(log::Logger* logger_ptr
5858
for (Task_engine_ptr& task_engine_ptr_in_container : m_task_engines)
5959
{
6060
// Attn: The concurrency-hint=1 may avoid all or most locking in boost.asio. Exactly 1 thread in the Task_engine.
61-
task_engine_ptr_in_container.reset(new Task_engine(1));
61+
task_engine_ptr_in_container.reset(new Task_engine{1});
6262

6363
/* Task_engine starts in !stopped() mode ready to run(). start() pre-condition is stopped() so for simplicity
6464
* start in the same state that our stop() would put the Task_engine into: */
@@ -68,9 +68,9 @@ Segregated_thread_task_loop::Segregated_thread_task_loop(log::Logger* logger_ptr
6868

6969
// Initialize our Op_list of pre-created Ops which in our case simply store all `n` `Task_engine_ptr`s.
7070
const size_t n = n_threads();
71-
m_per_thread_ops.reset(new Op_list(get_logger(), n,
71+
m_per_thread_ops.reset(new Op_list{get_logger(), n,
7272
[this](size_t idx) -> Op
73-
{ return Op(static_cast<Task_engine_ptr>(m_task_engines[idx])); }));
73+
{ return Op{static_cast<Task_engine_ptr>(m_task_engines[idx])}; }});
7474
/* (The static_cast<> is probably unnecessary but makes the compiler check our type logic for us. That's quite
7575
* helpful in this rare situation where we're essentially using a dynamically typed variable in C++ [boost::any].
7676
* There is 0 perf cost to it by the way.) */
@@ -124,7 +124,7 @@ void Segregated_thread_task_loop::start(Task&& init_task_or_empty,
124124
* though, so let's keep to the letter of our contract. Also, this way we can do it in parallel instead of
125125
* serially. */
126126

127-
vector<promise<void>> thread_init_done_promises(n);
127+
vector<promise<void>> thread_init_done_promises{n};
128128
for (size_t idx = 0; idx != n; ++idx)
129129
{
130130
Task task_qing_thread_init_func;
@@ -147,11 +147,11 @@ void Segregated_thread_task_loop::start(Task&& init_task_or_empty,
147147
// Now its Task_qing_thread can do ->run() as most of its thread body (and it won't just return).
148148

149149
// Create/start the thread.
150-
m_qing_threads[idx].reset(new Task_qing_thread(get_logger(),
150+
m_qing_threads[idx].reset(new Task_qing_thread{get_logger(),
151151
(n == 1) ? m_nickname : util::ostream_op_string(m_nickname, idx),
152152
task_engine, true, // Its *own* 1-1 Task_engine.
153153
&(thread_init_done_promises[idx]),
154-
std::move(task_qing_thread_init_func)));
154+
std::move(task_qing_thread_init_func)});
155155
} // for (idx in [0, n))
156156
FLOW_LOG_INFO("All threads are asynchronously starting. Awaiting their readiness barrier-style, in sequence.");
157157
for (size_t idx = 0; idx != n; ++idx)
@@ -167,7 +167,7 @@ void Segregated_thread_task_loop::start(Task&& init_task_or_empty,
167167
{
168168
FLOW_LOG_INFO("Thread count was auto-determined. Further attempting thread-to-core scheduling optimization.");
169169

170-
vector<Thread*> worker_threads(n); // Initialized to nulls. Now set them to the raw `Thread*`s.
170+
vector<Thread*> worker_threads{n}; // Initialized to nulls. Now set them to the raw `Thread*`s.
171171
transform(m_qing_threads.begin(), m_qing_threads.end(), worker_threads.begin(),
172172
[](const Task_qing_thread_ptr& qing_thread_ptr) -> Thread*
173173
{ return qing_thread_ptr->raw_worker_thread(); });

src/flow/async/segregated_thread_task_loop.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ class Segregated_thread_task_loop :
114114
* @param thread_init_func_or_empty
115115
* See superclass API.
116116
*/
117-
void start(Task&& init_task_or_empty = Task(),
118-
const Thread_init_func& thread_init_func_or_empty = Thread_init_func()) override;
117+
void start(Task&& init_task_or_empty = Task{},
118+
const Thread_init_func& thread_init_func_or_empty = Thread_init_func{}) override;
119119

120120
/**
121121
* Implements superclass API. In this implementation this essentially boils down to N `Task_engine::stop()`s,

0 commit comments

Comments
 (0)