Skip to content

Commit 4f7dbf8

Browse files
authored
fix deadlock in TimeoutFlush (#2158)
* fix deadlock in TimeoutFlush
1 parent 6e8ab28 commit 4f7dbf8

File tree

5 files changed

+76
-28
lines changed

5 files changed

+76
-28
lines changed

Diff for: core/collection_pipeline/CollectionPipeline.cpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ bool CollectionPipeline::Init(CollectionConfig&& config) {
359359
}
360360

361361
void CollectionPipeline::Start() {
362-
// #ifndef APSARA_UNIT_TEST_MAIN
362+
TimeoutFlushManager::GetInstance()->RegisterFlushers(mName, mFlushers);
363363
// TODO: 应该保证指定时间内返回,如果无法返回,将配置放入startDisabled里
364364
for (const auto& flusher : mFlushers) {
365365
flusher->Start();
@@ -381,7 +381,6 @@ void CollectionPipeline::Start() {
381381

382382
SET_GAUGE(mStartTime,
383383
chrono::duration_cast<chrono::seconds>(chrono::system_clock::now().time_since_epoch()).count());
384-
// #endif
385384
LOG_INFO(sLogger, ("pipeline start", "succeeded")("config", mName));
386385
}
387386

@@ -439,7 +438,7 @@ bool CollectionPipeline::FlushBatch() {
439438
for (auto& flusher : mFlushers) {
440439
allSucceeded = flusher->FlushAll() && allSucceeded;
441440
}
442-
TimeoutFlushManager::GetInstance()->ClearRecords(mName);
441+
TimeoutFlushManager::GetInstance()->UnregisterFlushers(mName, mFlushers);
443442
return allSucceeded;
444443
}
445444

Diff for: core/collection_pipeline/batch/TimeoutFlushManager.cpp

+41-16
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ namespace logtail {
2020

2121
void TimeoutFlushManager::UpdateRecord(
2222
const string& config, size_t index, size_t key, uint32_t timeoutSecs, Flusher* f) {
23-
lock_guard<recursive_mutex> lock(mMux);
23+
lock_guard<mutex> lock(mTimeoutRecordsMux);
2424
auto& item = mTimeoutRecords[config];
2525
auto it = item.find({index, key});
2626
if (it == item.end()) {
@@ -31,27 +31,52 @@ void TimeoutFlushManager::UpdateRecord(
3131
}
3232

3333
void TimeoutFlushManager::FlushTimeoutBatch() {
34-
lock_guard<recursive_mutex> lock(mMux);
35-
vector<pair<Flusher*, size_t>> records;
36-
for (auto& item : mTimeoutRecords) {
37-
for (auto it = item.second.begin(); it != item.second.end();) {
38-
if (time(nullptr) - it->second.mUpdateTime >= it->second.mTimeoutSecs) {
39-
// cannot flush here, since flush may also update record, which might invalidate map iterator
40-
records.emplace_back(it->second.mFlusher, it->second.mKey);
41-
it = item.second.erase(it);
42-
} else {
43-
++it;
34+
multimap<string, pair<Flusher*, size_t>> records;
35+
{
36+
lock_guard<mutex> lock(mTimeoutRecordsMux);
37+
for (auto& item : mTimeoutRecords) {
38+
for (auto it = item.second.begin(); it != item.second.end();) {
39+
if (time(nullptr) - it->second.mUpdateTime >= it->second.mTimeoutSecs) {
40+
// cannot flush here, since flush may also update record, which might invalidate map iterator and
41+
// lead to deadlock
42+
records.emplace(item.first, make_pair(it->second.mFlusher, it->second.mKey));
43+
it = item.second.erase(it);
44+
} else {
45+
++it;
46+
}
4447
}
4548
}
4649
}
47-
for (auto& item : records) {
48-
item.first->Flush(item.second);
50+
{
51+
lock_guard<mutex> lock(mDeletedFlushersMux);
52+
for (auto& item : records) {
53+
if (mDeletedFlushers.find(make_pair(item.first, item.second.first)) == mDeletedFlushers.end()) {
54+
item.second.first->Flush(item.second.second);
55+
}
56+
}
57+
mDeletedFlushers.clear();
58+
}
59+
}
60+
61+
void TimeoutFlushManager::UnregisterFlushers(const string& config,
62+
const vector<unique_ptr<FlusherInstance>>& flushers) {
63+
{
64+
lock_guard<mutex> lock(mTimeoutRecordsMux);
65+
mTimeoutRecords.erase(config);
66+
}
67+
{
68+
lock_guard<mutex> lock(mDeletedFlushersMux);
69+
for (const auto& flusher : flushers) {
70+
mDeletedFlushers.emplace(make_pair(config, flusher->GetPlugin()));
71+
}
4972
}
5073
}
5174

52-
void TimeoutFlushManager::ClearRecords(const string& config) {
53-
lock_guard<recursive_mutex> lock(mMux);
54-
mTimeoutRecords.erase(config);
75+
void TimeoutFlushManager::RegisterFlushers(const string& config, const vector<unique_ptr<FlusherInstance>>& flushers) {
76+
lock_guard<mutex> lock(mDeletedFlushersMux);
77+
for (const auto& flusher : flushers) {
78+
mDeletedFlushers.erase(make_pair(config, flusher->GetPlugin()));
79+
}
5580
}
5681

5782
} // namespace logtail

Diff for: core/collection_pipeline/batch/TimeoutFlushManager.h

+10-2
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121

2222
#include <map>
2323
#include <mutex>
24+
#include <set>
2425
#include <string>
2526
#include <vector>
2627

28+
#include "collection_pipeline/plugin/instance/FlusherInstance.h"
2729
#include "collection_pipeline/plugin/interface/Flusher.h"
2830

2931
namespace logtail {
@@ -52,15 +54,21 @@ class TimeoutFlushManager {
5254

5355
void UpdateRecord(const std::string& config, size_t index, size_t key, uint32_t timeoutSecs, Flusher* f);
5456
void FlushTimeoutBatch();
55-
void ClearRecords(const std::string& config);
57+
void UnregisterFlushers(const std::string& config, const std::vector<std::unique_ptr<FlusherInstance>>& flushers);
58+
void RegisterFlushers(const std::string& config, const std::vector<std::unique_ptr<FlusherInstance>>& flushers);
5659

5760
private:
5861
TimeoutFlushManager() = default;
5962
~TimeoutFlushManager() = default;
6063

61-
std::recursive_mutex mMux;
64+
// visited by all processor runner threads
65+
mutable std::mutex mTimeoutRecordsMux;
6266
std::map<std::string, std::map<std::pair<size_t, size_t>, TimeoutRecord>> mTimeoutRecords;
6367

68+
// visited by main thread and num 0 processor runner thread
69+
mutable std::mutex mDeletedFlushersMux;
70+
std::set<std::pair<std::string, const Flusher*>> mDeletedFlushers;
71+
6472
#ifdef APSARA_UNIT_TEST_MAIN
6573
friend class PipelineUnittest;
6674
friend class TimeoutFlushManagerUnittest;

Diff for: core/unittest/batch/TimeoutFlushManagerUnittest.cpp

+17-5
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class TimeoutFlushManagerUnittest : public ::testing::Test {
2424
public:
2525
void TestUpdateRecord();
2626
void TestFlushTimeoutBatch();
27-
void TestClearRecords();
27+
void TestUnregisterFlushers();
2828

2929
protected:
3030
static void SetUpTestCase() {
@@ -77,16 +77,28 @@ void TimeoutFlushManagerUnittest::TestFlushTimeoutBatch() {
7777
APSARA_TEST_EQUAL(1U, TimeoutFlushManager::GetInstance()->mTimeoutRecords.size());
7878
}
7979

80-
void TimeoutFlushManagerUnittest::TestClearRecords() {
81-
TimeoutFlushManager::GetInstance()->UpdateRecord("test_config", 0, 1, 3, sFlusher.get());
82-
TimeoutFlushManager::GetInstance()->ClearRecords("test_config");
80+
void TimeoutFlushManagerUnittest::TestUnregisterFlushers() {
81+
auto flusher = new FlusherMock();
82+
flusher->SetContext(sCtx);
83+
flusher->SetMetricsRecordRef(FlusherMock::sName, "1");
84+
auto instance = unique_ptr<FlusherInstance>(new FlusherInstance(flusher, PluginInstance::PluginMeta("1")));
85+
vector<unique_ptr<FlusherInstance>> flushers;
86+
flushers.push_back(std::move(instance));
87+
88+
TimeoutFlushManager::GetInstance()->UpdateRecord("test_config", 0, 1, 3, flusher);
89+
TimeoutFlushManager::GetInstance()->UnregisterFlushers("test_config", flushers);
8390

91+
APSARA_TEST_EQUAL(1U, TimeoutFlushManager::GetInstance()->mDeletedFlushers.size());
8492
APSARA_TEST_EQUAL(0U, TimeoutFlushManager::GetInstance()->mTimeoutRecords.size());
93+
94+
TimeoutFlushManager::GetInstance()->FlushTimeoutBatch();
95+
APSARA_TEST_TRUE(TimeoutFlushManager::GetInstance()->mDeletedFlushers.empty());
96+
APSARA_TEST_TRUE(TimeoutFlushManager::GetInstance()->mTimeoutRecords.empty());
8597
}
8698

8799
UNIT_TEST_CASE(TimeoutFlushManagerUnittest, TestUpdateRecord)
88100
UNIT_TEST_CASE(TimeoutFlushManagerUnittest, TestFlushTimeoutBatch)
89-
UNIT_TEST_CASE(TimeoutFlushManagerUnittest, TestClearRecords)
101+
UNIT_TEST_CASE(TimeoutFlushManagerUnittest, TestUnregisterFlushers)
90102

91103
} // namespace logtail
92104

Diff for: core/unittest/pipeline/PipelineUnittest.cpp

+6-2
Original file line numberDiff line numberDiff line change
@@ -2917,15 +2917,19 @@ void PipelineUnittest::TestFlushBatch() const {
29172917
TimeoutFlushManager::GetInstance()->UpdateRecord(configName, 0, 1, 3, nullptr);
29182918
TimeoutFlushManager::GetInstance()->UpdateRecord(configName, 1, 1, 3, nullptr);
29192919
APSARA_TEST_TRUE(pipeline.FlushBatch());
2920-
APSARA_TEST_TRUE(TimeoutFlushManager::GetInstance()->mTimeoutRecords.empty());
2920+
APSARA_TEST_EQUAL(0U, TimeoutFlushManager::GetInstance()->mTimeoutRecords.size());
2921+
APSARA_TEST_EQUAL(2U, TimeoutFlushManager::GetInstance()->mDeletedFlushers.size());
2922+
TimeoutFlushManager::GetInstance()->FlushTimeoutBatch();
29212923
}
29222924
{
29232925
// some failed
29242926
const_cast<FlusherMock*>(static_cast<const FlusherMock*>(pipeline.mFlushers[0]->GetPlugin()))->mIsValid = false;
29252927
TimeoutFlushManager::GetInstance()->UpdateRecord(configName, 0, 1, 3, nullptr);
29262928
TimeoutFlushManager::GetInstance()->UpdateRecord(configName, 1, 1, 3, nullptr);
29272929
APSARA_TEST_FALSE(pipeline.FlushBatch());
2928-
APSARA_TEST_TRUE(TimeoutFlushManager::GetInstance()->mTimeoutRecords.empty());
2930+
APSARA_TEST_EQUAL(0U, TimeoutFlushManager::GetInstance()->mTimeoutRecords.size());
2931+
APSARA_TEST_EQUAL(2U, TimeoutFlushManager::GetInstance()->mDeletedFlushers.size());
2932+
TimeoutFlushManager::GetInstance()->FlushTimeoutBatch();
29292933
}
29302934
}
29312935

0 commit comments

Comments
 (0)