Skip to content

Commit df01b19

Browse files
authored
Support generics for ConcurrentAdder, ConcurrentMaxer and ConcurrentMiner (baidu#96)
1 parent b2f26b4 commit df01b19

3 files changed

Lines changed: 291 additions & 119 deletions

File tree

src/babylon/concurrent/counter.cpp

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,52 +4,6 @@
44

55
BABYLON_NAMESPACE_BEGIN
66

7-
////////////////////////////////////////////////////////////////////////////////
8-
// ConcurrentAdder begin
9-
ssize_t ConcurrentAdder::value() const noexcept {
10-
ssize_t sum = 0;
11-
_storage.for_each([&](const ssize_t& value) {
12-
sum += value;
13-
});
14-
return sum;
15-
}
16-
17-
void ConcurrentAdder::reset() noexcept {
18-
_storage.for_each([&](ssize_t& value) {
19-
value = 0;
20-
});
21-
}
22-
// ConcurrentAdder end
23-
////////////////////////////////////////////////////////////////////////////////
24-
25-
////////////////////////////////////////////////////////////////////////////////
26-
// ConcurrentMaxer begin
27-
ssize_t ConcurrentMaxer::value() const noexcept {
28-
ssize_t max_value = 0;
29-
value(max_value);
30-
return max_value;
31-
}
32-
33-
bool ConcurrentMaxer::value(ssize_t& max_value) const noexcept {
34-
bool has_result = false;
35-
ssize_t result = ::std::numeric_limits<ssize_t>::min();
36-
_storage.for_each([&](const Slot& slot) {
37-
if (slot.version == _version) {
38-
if (slot.value > result) {
39-
result = slot.value;
40-
has_result = true;
41-
}
42-
}
43-
});
44-
if (has_result) {
45-
max_value = result;
46-
}
47-
return has_result;
48-
}
49-
50-
void ConcurrentMaxer::reset() noexcept {
51-
++_version;
52-
}
537
// ConcurrentMaxer end
548
////////////////////////////////////////////////////////////////////////////////
559

src/babylon/concurrent/counter.h

Lines changed: 135 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <type_traits> // std::is_integral, std::is_floating_point
34
#include "babylon/concurrent/thread_local.h" // CompactEnumerableThreadLocal
45
#include "babylon/environment.h"
56

@@ -21,73 +22,178 @@ BABYLON_NAMESPACE_BEGIN
2122
// 实现上针对多写少读的场景做了优化,更适用于典型的计数场景
2223
// 计数操作改为独立发生在对应的线程局部数据上,避免了缓存行竞争
2324
// 但相应的读操作就需要遍历所有线程局部数据并再次累加才能得到结果
24-
class ConcurrentAdder {
25+
template <typename T>
26+
class GenericsConcurrentAdder {
27+
static_assert(
28+
(::std::is_integral<T>::value ||
29+
::std::is_floating_point<T>::value) && 8 >= sizeof(T),
30+
"ConcurrentSummer only supports integral or "
31+
"floating point types with size <= 8 bytes");
32+
2533
public:
26-
ConcurrentAdder() noexcept = default;
27-
ConcurrentAdder(ConcurrentAdder&&) noexcept = default;
28-
ConcurrentAdder(const ConcurrentAdder&) = delete;
29-
ConcurrentAdder& operator=(ConcurrentAdder&&) noexcept = default;
30-
ConcurrentAdder& operator=(const ConcurrentAdder&) = delete;
31-
~ConcurrentAdder() noexcept = default;
34+
GenericsConcurrentAdder() noexcept = default;
35+
GenericsConcurrentAdder(GenericsConcurrentAdder&&) noexcept = default;
36+
GenericsConcurrentAdder(const GenericsConcurrentAdder&) = delete;
37+
GenericsConcurrentAdder& operator=(GenericsConcurrentAdder&&) noexcept = default;
38+
GenericsConcurrentAdder& operator=(const GenericsConcurrentAdder&) = delete;
39+
~GenericsConcurrentAdder() noexcept = default;
3240

3341
// 分散计数接口
34-
template <typename T>
35-
inline ConcurrentAdder& operator<<(const T& value) noexcept;
42+
template <typename U>
43+
GenericsConcurrentAdder& operator<<(const U& value) noexcept {
44+
count(static_cast<T>(value));
45+
return *this;
46+
}
3647

3748
// 汇聚读取接口
38-
ssize_t value() const noexcept;
49+
T value() const noexcept {
50+
T sum = 0;
51+
_storage.for_each([&](const T& value) {
52+
sum += value;
53+
});
54+
return sum;
55+
}
3956

4057
// 重置接口
41-
void reset() noexcept;
58+
void reset() noexcept {
59+
_storage.for_each([&](T& value) {
60+
value = 0;
61+
});
62+
}
4263

4364
private:
44-
inline void count(ssize_t value) noexcept;
65+
void count(T value) noexcept {
66+
auto& local = _storage.local();
67+
// local的唯一修改者是自己,所以这里不需要使用原子加法
68+
// 正确对齐的数值类型赋值本身是原子发生的
69+
local = local + value;
70+
}
4571

46-
CompactEnumerableThreadLocal<ssize_t, 64, true> _storage;
72+
CompactEnumerableThreadLocal<T, 64, true> _storage;
4773
};
4874

49-
// 高并发最大值计数器
75+
using ConcurrentAdder = GenericsConcurrentAdder<ssize_t>;
76+
77+
// 高并发最大/小值计数器
5078
// 原理上等价于利用std::atomic
51-
// 计数操作进行loop cas(old, max(new, old))
79+
// 计数操作进行loop cas(old, max(new, old)) / cas(old, min(new, old))
5280
// 读取&重置操作进行exchange(0)
5381
//
5482
// 实现上针对多写少读的场景做了优化,更适用于典型的计数场景
5583
// 计数操作改为独立发生在对应的线程局部数据上,避免了缓存行竞争
5684
// In exchange, a read has to walk every thread-local slot and compare the values again to obtain the result
57-
class ConcurrentMaxer {
85+
namespace internal {
86+
// Strict "greater than" predicate used to select the maximum.
// Returns true only when `lhs` is strictly greater than `rhs`; equal values
// yield false. Declared constexpr and noexcept so it can be called from the
// noexcept counting and reading paths without weakening their guarantees.
template <typename T>
struct MaxComparer {
  constexpr bool operator()(T lhs, T rhs) const noexcept {
    return lhs > rhs;
  }
};
92+
// Strict "less than" predicate used to select the minimum.
// Returns true only when `lhs` is strictly less than `rhs`; equal values
// yield false. Declared constexpr and noexcept so it can be called from the
// noexcept counting and reading paths without weakening their guarantees.
template <typename T>
struct MinComparer {
  constexpr bool operator()(T lhs, T rhs) const noexcept {
    return lhs < rhs;
  }
};
98+
99+
template <typename T, bool Max>
100+
class ConcurrentComparer {
101+
static_assert(
102+
(::std::is_integral<T>::value ||
103+
::std::is_floating_point<T>::value) && 8 >= sizeof(T),
104+
"ConcurrentSummer only supports integral or "
105+
"floating point types with size <= 8 bytes");
106+
107+
constexpr static T EXTREMUM =
108+
Max ? std::numeric_limits<T>::min() : std::numeric_limits<T>::max();
109+
typedef typename std::conditional<
110+
Max, MaxComparer<T>, MinComparer<T>>::type Comparer;
111+
58112
public:
59-
ConcurrentMaxer() noexcept = default;
60-
ConcurrentMaxer(ConcurrentMaxer&&) = delete;
61-
ConcurrentMaxer(const ConcurrentMaxer&) = delete;
62-
ConcurrentMaxer& operator=(ConcurrentMaxer&&) = delete;
63-
ConcurrentMaxer& operator=(const ConcurrentMaxer&) = delete;
64-
~ConcurrentMaxer() noexcept = default;
113+
ConcurrentComparer() noexcept = default;
114+
ConcurrentComparer(ConcurrentComparer&&) = delete;
115+
ConcurrentComparer(const ConcurrentComparer&) = delete;
116+
ConcurrentComparer& operator=(ConcurrentComparer&&) = delete;
117+
ConcurrentComparer& operator=(const ConcurrentComparer&) = delete;
118+
~ConcurrentComparer() noexcept = default;
65119

66120
// 分散计数接口
67-
inline ConcurrentMaxer& operator<<(ssize_t value) noexcept;
121+
ConcurrentComparer& operator<<(T value) noexcept {
122+
auto& local = _storage.local();
123+
// 为了避免原子操作引入内存屏障,这里采用异步版本感知机制
124+
// 取代了cas动作来实现重置效果
125+
//
126+
// 理论上存在一个缺陷,即
127+
// 1、刚刚完成一次汇聚读取,正在进行版本推进
128+
// 2、此时进行中的计数动作依然计入了上一个版本
129+
// 3、下一个汇聚读取周期中,这些样本无法计入
130+
// 不过对于统计场景,这个影响非常微弱,可忽略不计
131+
if (ABSL_PREDICT_FALSE(_version != local.version)) {
132+
local.version = _version;
133+
local.value = value;
134+
return *this;
135+
}
136+
if (ABSL_PREDICT_FALSE(_comparer(value, local.value))) {
137+
local.value = value;
138+
}
139+
return *this;
140+
}
68141

69142
// 汇聚读取接口
70143
// 如果周期内有计数发生,返回计数最大值
71144
// 如果周期内没有计数发生,返回0
72-
ssize_t value() const noexcept;
145+
T value() const noexcept {
146+
T compare_value = 0;
147+
value(compare_value);
148+
return compare_value;
149+
}
73150

74151
// 汇聚读取接口
75-
// 如果周期内有计数发生,返回true,并将max_value填充为计数最大值
76-
// 如果周期内没有计数发生,返回false,此时不修改max_value的值
77-
bool value(ssize_t& max_value) const noexcept;
152+
// 如果周期内有计数发生,返回true,并将compare_value填充为计数最大值
153+
// 如果周期内没有计数发生,返回false,此时不修改compare_value的值
154+
bool value(T& compare_value) const noexcept {
155+
bool has_result = false;
156+
T result = EXTREMUM;
157+
_storage.for_each([&](const Slot& slot) {
158+
if (slot.version == _version) {
159+
if (_comparer(slot.value, result)) {
160+
result = slot.value;
161+
has_result = true;
162+
}
163+
}
164+
});
165+
if (has_result) {
166+
compare_value = result;
167+
}
168+
return has_result;
169+
}
78170

79171
// 重置接口,开启一个新的周期
80-
void reset() noexcept;
172+
void reset() noexcept {
173+
++_version;
174+
}
81175

82176
private:
83177
struct Slot {
84178
size_t version {SIZE_MAX};
85-
ssize_t value;
179+
T value;
86180
};
87181

88182
CompactEnumerableThreadLocal<Slot, 64, true> _storage;
89183
size_t _version {0};
184+
Comparer _comparer;
90185
};
186+
} // namespace internal
187+
188+
template <typename T>
189+
class GenericsConcurrentMaxer : public internal::ConcurrentComparer<T, true> {};
190+
191+
using ConcurrentMaxer = GenericsConcurrentMaxer<ssize_t>;
192+
193+
template <typename T>
194+
class GenericsConcurrentMiner : public internal::ConcurrentComparer<T, false> {};
195+
196+
using ConcurrentMiner = GenericsConcurrentMiner<ssize_t>;
91197

92198
// 高并发求和计数器
93199
// 原理上等价于利用锁同步
@@ -207,43 +313,6 @@ class ConcurrentSampler {
207313
::std::atomic<uint32_t> _version {0};
208314
};
209315

210-
template <typename T>
211-
inline ABSL_ATTRIBUTE_ALWAYS_INLINE ConcurrentAdder&
212-
ConcurrentAdder::operator<<(const T& value) noexcept {
213-
count(static_cast<ssize_t>(value));
214-
return *this;
215-
}
216-
217-
inline ABSL_ATTRIBUTE_ALWAYS_INLINE void ConcurrentAdder::count(
218-
ssize_t value) noexcept {
219-
auto& local = _storage.local();
220-
// local的唯一修改者是自己,所以这里不需要使用原子加法
221-
// 正确对齐的数值类型赋值本身是原子发生的
222-
local = local + value;
223-
}
224-
225-
inline ABSL_ATTRIBUTE_ALWAYS_INLINE ConcurrentMaxer&
226-
ConcurrentMaxer::operator<<(ssize_t value) noexcept {
227-
auto& local = _storage.local();
228-
// 为了避免原子操作引入内存屏障,这里采用异步版本感知机制
229-
// 取代了cas动作来实现重置效果
230-
//
231-
// 理论上存在一个缺陷,即
232-
// 1、刚刚完成一次汇聚读取,正在进行版本推进
233-
// 2、此时进行中的计数动作依然计入了上一个版本
234-
// 3、下一个汇聚读取周期中,这些样本无法计入
235-
// 不过对于统计场景,这个影响非常微弱,可忽略不计
236-
if (ABSL_PREDICT_FALSE(_version != local.version)) {
237-
local.version = _version;
238-
local.value = value;
239-
return *this;
240-
}
241-
if (ABSL_PREDICT_FALSE(value > local.value)) {
242-
local.value = value;
243-
}
244-
return *this;
245-
}
246-
247316
inline ABSL_ATTRIBUTE_ALWAYS_INLINE ConcurrentSummer&
248317
ConcurrentSummer::operator<<(ssize_t value) noexcept {
249318
return operator<<({value, 1});

0 commit comments

Comments
 (0)