Skip to content

Commit 77265a8

Browse files
author
Laxman Dhulipala
committed
Introduce gbbs::parallel_for_with_status, which is similar to parlay::parallel_for, but handles error statuses. (linhares)
1 parent 1422c5b commit 77265a8

File tree

5 files changed

+245
-6
lines changed

5 files changed

+245
-6
lines changed

MODULE.bazel

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,4 @@ bazel_dep(
1515
)
1616

1717
bazel_dep(name = "abseil-cpp", version = "20240116.2")
18-
19-
# -- bazel_dep definitions -- #
20-
21-
# -- use_repo_rule statements -- #
22-
23-
# -- repo definitions -- #
18+
bazel_dep(name = "google_benchmark", version = "1.9.4")

MODULE.bazel.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gbbs/helpers/BUILD

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,35 @@ cc_library(
5656
],
5757
)
5858

59+
cc_library(
60+
name = "parallel_for_with_status",
61+
hdrs = ["parallel_for_with_status.h"],
62+
deps = [
63+
"//gbbs:bridge",
64+
"@abseil-cpp//absl/status",
65+
"@parlaylib//parlay:parallel",
66+
],
67+
)
68+
69+
gbbs_cc_test(
70+
name = "parallel_for_with_status_test",
71+
srcs = ["parallel_for_with_status_test.cc"],
72+
deps = [
73+
":parallel_for_with_status",
74+
":status_macros",
75+
"@googletest//:gtest_main",
76+
"@abseil-cpp//absl/base:core_headers",
77+
"@abseil-cpp//absl/log",
78+
"@abseil-cpp//absl/log:absl_log",
79+
"@abseil-cpp//absl/log:check",
80+
"@abseil-cpp//absl/status",
81+
"@abseil-cpp//absl/strings",
82+
"@abseil-cpp//absl/synchronization",
83+
"@google_benchmark//:benchmark",
84+
"@parlaylib//parlay:parallel",
85+
],
86+
)
87+
5988
cc_library(
6089
name = "parse_command_line",
6190
hdrs = ["parse_command_line.h"],
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#ifndef THIRD_PARTY_GBBS_GBBS_HELPERS_PARALLEL_FOR_WITH_STATUS_H_
2+
#define THIRD_PARTY_GBBS_GBBS_HELPERS_PARALLEL_FOR_WITH_STATUS_H_
3+
4+
#include <cstddef>
5+
6+
#include "absl/status/status.h"
7+
#include "gbbs/bridge.h"
8+
#include "parlay/parallel.h"
9+
10+
namespace gbbs {
11+
12+
// Similar to `parlay::parallel_for`, with the following differences:
13+
// - `f` is expected to return an `absl::Status` instead of `void`.
14+
// - `parallel_for_with_status` returns an `absl::Status` instead of `void`. The
15+
// returned status is one of the error statuses produced by the calls to `f`,
16+
// if any, or an Ok status otherwise.
17+
template <typename F>
18+
absl::Status parallel_for_with_status(
19+
size_t start, size_t end, F&& f,
20+
// We use `long` for consistency with `parlay::parallel_for`.
21+
// NOLINTBEGIN(runtime/int)
22+
// NOLINTBEGIN(google-runtime-int)
23+
long granularity = 0,
24+
// NOLINTEND(google-runtime-int)
25+
// NOLINTEND(runtime/int)
26+
bool conservative = false) {
27+
static_assert(std::is_invocable_v<F&, size_t>);
28+
absl::Status overall_status;
29+
bool error_found = false;
30+
parlay::parallel_for(
31+
start, end,
32+
[&](size_t i) {
33+
if (absl::Status status = f(i);
34+
!status.ok() && gbbs::atomic_compare_and_swap(&error_found, false,
35+
true)) [[unlikely]] {
36+
overall_status = status;
37+
}
38+
},
39+
granularity, conservative);
40+
return overall_status;
41+
}
42+
43+
} // namespace gbbs
44+
45+
#endif // THIRD_PARTY_GBBS_GBBS_HELPERS_PARALLEL_FOR_WITH_STATUS_H_
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
#include "gbbs/helpers/parallel_for_with_status.h"
2+
3+
#include <cstddef>
4+
#include <vector>
5+
6+
#include "benchmark/benchmark.h"
7+
#include "gmock/gmock.h"
8+
#include "gtest/gtest.h"
9+
#include "status_macros.h"
10+
#include "absl/base/thread_annotations.h"
11+
#include "absl/log/absl_log.h"
12+
#include "absl/log/check.h"
13+
#include "absl/log/log.h"
14+
#include "absl/status/status.h"
15+
#include "absl/strings/str_cat.h"
16+
#include "absl/synchronization/mutex.h"
17+
#include "parlay/parallel.h"
18+
19+
namespace gbbs {
20+
namespace {
21+
22+
using ::testing::AnyOf;
23+
using ::testing::UnorderedElementsAre;
24+
25+
class ThreadSafeCounter {
26+
public:
27+
void Increment() {
28+
absl::MutexLock lock(&mutex_);
29+
++value_;
30+
}
31+
32+
int Value() {
33+
absl::MutexLock lock(&mutex_);
34+
return value_;
35+
}
36+
37+
private:
38+
absl::Mutex mutex_;
39+
int value_ ABSL_GUARDED_BY(mutex_) = 0;
40+
};
41+
42+
class ParallelForWithStatusTest : public ::testing::Test {
43+
// private:
44+
// graph_mining::in_memory::ParallelSchedulerReference scheduler_;
45+
};
46+
47+
TEST_F(ParallelForWithStatusTest, EmptyRange) {
48+
ThreadSafeCounter counter;
49+
EXPECT_OK(gbbs::parallel_for_with_status(5, 5, [&](size_t i) {
50+
counter.Increment();
51+
ABSL_LOG(FATAL) << "This function should not have been called, because the "
52+
"range is empty";
53+
return absl::OkStatus();
54+
}));
55+
EXPECT_EQ(counter.Value(), 0);
56+
}
57+
58+
TEST_F(ParallelForWithStatusTest, SingleCall_Ok) {
59+
ThreadSafeCounter counter;
60+
EXPECT_OK(gbbs::parallel_for_with_status(3, 4, [&](size_t i) {
61+
counter.Increment();
62+
return absl::OkStatus();
63+
}));
64+
EXPECT_EQ(counter.Value(), 1);
65+
}
66+
67+
TEST_F(ParallelForWithStatusTest, SingleCall_Error) {
68+
ThreadSafeCounter counter;
69+
EXPECT_THAT(
70+
gbbs::parallel_for_with_status(3, 4,
71+
[&](size_t i) {
72+
counter.Increment();
73+
return absl::NotFoundError("foo");
74+
}),
75+
StatusIs(absl::StatusCode::kNotFound, "foo"));
76+
EXPECT_EQ(counter.Value(), 1);
77+
}
78+
79+
TEST_F(ParallelForWithStatusTest, MultipleCalls_Ok) {
80+
absl::Mutex indices_seen_mutex;
81+
std::vector<size_t> indices_seen; // Guarded by `indices_seen_mutex`.
82+
EXPECT_OK(gbbs::parallel_for_with_status(5, 15, [&](size_t i) {
83+
{
84+
absl::MutexLock lock(&indices_seen_mutex);
85+
indices_seen.push_back(i);
86+
}
87+
return absl::OkStatus();
88+
}));
89+
EXPECT_THAT(indices_seen,
90+
UnorderedElementsAre(5, 6, 7, 8, 9, 10, 11, 12, 13, 14));
91+
}
92+
93+
TEST_F(ParallelForWithStatusTest, MultipleCalls_AllErrors) {
94+
ThreadSafeCounter counter;
95+
EXPECT_THAT(gbbs::parallel_for_with_status(1, 5,
96+
[&](size_t i) {
97+
counter.Increment();
98+
return absl::InternalError(
99+
absl::StrCat("foo_", i));
100+
}),
101+
StatusIs(absl::StatusCode::kInternal,
102+
AnyOf("foo_1", "foo_2", "foo_3", "foo_4")));
103+
EXPECT_EQ(counter.Value(), 4);
104+
}
105+
106+
TEST_F(ParallelForWithStatusTest, MultipleCalls_SingleError) {
107+
ThreadSafeCounter counter;
108+
EXPECT_THAT(gbbs::parallel_for_with_status(
109+
2, 8,
110+
[&](size_t i) {
111+
counter.Increment();
112+
if (i == 4) {
113+
return absl::FailedPreconditionError("foo");
114+
} else {
115+
return absl::OkStatus();
116+
}
117+
}),
118+
StatusIs(absl::StatusCode::kFailedPrecondition, "foo"));
119+
EXPECT_EQ(counter.Value(), 6);
120+
}
121+
122+
// Microbenchmarks comparing `parlay::parallel_for` and
123+
// `gbbs::parallel_for_with_status`, in the case where `f` does a tiny amount
124+
// of work (in which case the overhead of handling statuses might be
125+
// non-negligible), and `f` always returns an OK status (which is the most
126+
// common case).
127+
//
128+
// To run these microbenchmarks, use:
129+
/*
130+
blaze --blazerc=/dev/null build --config=benchmark \
131+
//gbbs/helpers:parallel_for_with_status_test
132+
133+
blaze-bin/gbbs/helpers/parallel_for_with_status_test \
134+
--benchmark_filter=all --benchmark_min_time=60s
135+
*/
136+
137+
static void BM_ParallelForPlain(benchmark::State& state) {
138+
// graph_mining::in_memory::ParallelSchedulerReference scheduler;
139+
int n = state.range(0);
140+
std::vector<int> v(n);
141+
for (auto s : state) {
142+
benchmark::DoNotOptimize(v);
143+
parlay::parallel_for(0, n, [&](size_t i) { v[i] = i; });
144+
}
145+
}
146+
147+
BENCHMARK(BM_ParallelForPlain)->Arg(1'000'000)->Arg(10'000'000);
148+
149+
static void BM_ParallelForWithStatus(benchmark::State& state) {
150+
// graph_mining::in_memory::ParallelSchedulerReference scheduler;
151+
int n = state.range(0);
152+
std::vector<int> v(n);
153+
for (auto s : state) {
154+
benchmark::DoNotOptimize(v);
155+
EXPECT_OK(gbbs::parallel_for_with_status(0, n, [&](size_t i) {
156+
v[i] = i;
157+
return absl::OkStatus();
158+
}));
159+
}
160+
}
161+
162+
BENCHMARK(BM_ParallelForWithStatus)->Arg(1'000'000)->Arg(10'000'000);
163+
164+
} // namespace
165+
} // namespace gbbs

0 commit comments

Comments
 (0)