Skip to content

Commit dcb29ea

Browse files
authored
fix: file signature size 0 cause duplication (alibaba#2430)
* fix: file signature size 0 cause duplication
  Change-Id: I2d5235465d08204362700088763a75d918a7d60b
  Co-developed-by: Cursor <noreply@cursor.com>
* fix
  Change-Id: Id19ba2c9db94d563f9fd92ba5c9674b507d668cb
  Co-developed-by: Cursor <noreply@cursor.com>
* fix
  Change-Id: I13f2632531c71d55d78a09cd87b76672b43de8ac
  Co-developed-by: Cursor <noreply@cursor.com>
* fix
  Change-Id: I2725919d2d5a90adc7eecd57e8cd1af099227dbc
  Co-developed-by: Cursor <noreply@cursor.com>
* fix
  Change-Id: I016a5c66cc4a62c39d2db2554583ad249c28cfaa
  Co-developed-by: Cursor <noreply@cursor.com>
1 parent 55d6b59 commit dcb29ea

9 files changed

Lines changed: 260 additions & 11 deletions

File tree

core/file_server/EventDispatcher.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -650,6 +650,7 @@ void EventDispatcher::AddExistedCheckPointFileEvents() {
650650
auto& cptPair = exactlyOnceCpts[idx];
651651
auto& cpt = cptPair.second;
652652
auto v1Cpt = make_shared<CheckPoint>(cpt.log_path(),
653+
cpt.log_path(),
653654
0,
654655
cpt.sig_size(),
655656
cpt.sig_hash(),

core/file_server/checkpoint/CheckPointManager.cpp

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -208,12 +208,18 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
208208
uint32_t sigSize = 0;
209209
uint64_t sigHash = 0;
210210
string filePath = meta["file_name"].asString();
211+
string resolvedFilePath;
211212
string realFilePath;
212213
int32_t fileOpenFlag = 0; // default, we close file ptr
213214
int32_t containerStopped = 0;
214215
string containerID;
215216
int32_t lastForceRead = 0;
216217
int32_t idxInReaderArray = LogFileReader::CHECKPOINT_IDX_UNDEFINED;
218+
if (meta.isMember("resolved_file_name")) {
219+
resolvedFilePath = meta["resolved_file_name"].asString();
220+
} else {
221+
resolvedFilePath = filePath; // for backward compatibility
222+
}
217223
if (meta.isMember("real_file_name")) {
218224
realFilePath = meta["real_file_name"].asString();
219225
}
@@ -268,6 +274,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
268274
// No need to check if the config still matches the file here.
269275
configName = meta["config_name"].asString();
270276
CheckPoint* ptr = new CheckPoint(filePath,
277+
resolvedFilePath,
271278
offset,
272279
sigSize,
273280
sigHash,
@@ -302,6 +309,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
302309
}
303310
for (size_t i = 0; i < allConfig.size(); ++i) {
304311
CheckPoint* ptr = new CheckPoint(filePath,
312+
resolvedFilePath,
305313
offset,
306314
sigSize,
307315
sigHash,
@@ -347,6 +355,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
347355
CheckPoint* checkPointPtr = it->second.get();
348356
Json::Value leaf;
349357
leaf["file_name"] = Json::Value(checkPointPtr->mFileName);
358+
leaf["resolved_file_name"] = Json::Value(checkPointPtr->mResolvedFileName);
350359
leaf["real_file_name"] = Json::Value(checkPointPtr->mRealFileName);
351360
leaf["offset"] = Json::Value(ToString(checkPointPtr->mOffset));
352361
leaf["sig_size"] = Json::Value(Json::UInt(checkPointPtr->mSignatureSize));

core/file_server/checkpoint/CheckPointManager.h

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,12 +51,14 @@ class CheckPoint {
5151
std::string mCache;
5252
std::string mConfigName;
5353
std::string mFileName;
54+
std::string mResolvedFileName;
5455
std::string mRealFileName;
5556
int32_t mIdxInReaderArray = LogFileReader::CHECKPOINT_IDX_UNDEFINED;
5657

5758
CheckPoint() {}
5859

5960
CheckPoint(const std::string& filename,
61+
const std::string& resolvedFileName,
6062
int64_t offset,
6163
uint32_t signatureSize,
6264
uint64_t signatureHash,
@@ -78,6 +80,7 @@ class CheckPoint {
7880
mLastForceRead(lastForceRead),
7981
mConfigName(configName),
8082
mFileName(filename),
83+
mResolvedFileName(resolvedFileName),
8184
mRealFileName(realFileName) {}
8285
};
8386

core/file_server/reader/LogFileReader.cpp

Lines changed: 27 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -266,6 +266,7 @@ void LogFileReader::DumpMetaToMem(bool checkConfigFlag, int32_t idxInReaderArray
266266
ToString(mLogFileOp.IsOpen())));
267267
}
268268
CheckPoint* checkPointPtr = new CheckPoint(mHostLogPath,
269+
mResolvedHostLogPath,
269270
mLastFilePos,
270271
mLastFileSignatureSize,
271272
mLastFileSignatureHash,
@@ -321,6 +322,7 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t
321322
mLastFileSignatureHash = checkPointPtr->mSignatureHash;
322323
mLastFileSignatureSize = checkPointPtr->mSignatureSize;
323324
mRealLogPath = checkPointPtr->mRealFileName;
325+
mResolvedHostLogPath = checkPointPtr->mResolvedFileName;
324326
mLastEventTime = checkPointPtr->mLastUpdateTime;
325327
if (checkPointPtr->mContainerID == mContainerID) {
326328
mContainerStopped = checkPointPtr->mContainerStopped;
@@ -1112,6 +1114,7 @@ bool LogFileReader::UpdateFilePtr() {
11121114
OnOpenFileError();
11131115
} else if (CheckDevInode()) {
11141116
GloablFileDescriptorManager::GetInstance()->OnFileOpen(this);
1117+
ResolveHostLogPath();
11151118
LOG_INFO(sLogger,
11161119
("open file succeeded, project", GetProject())("logstore", GetLogstore())(
11171120
"config", GetConfigName())("log reader queue name", mHostLogPath)(
@@ -1151,6 +1154,7 @@ bool LogFileReader::UpdateFilePtr() {
11511154
// the mHostLogPath's dev inode equal to mDevInode, so real log path is mHostLogPath
11521155
mRealLogPath = mHostLogPath;
11531156
GloablFileDescriptorManager::GetInstance()->OnFileOpen(this);
1157+
ResolveHostLogPath();
11541158
LOG_INFO(
11551159
sLogger,
11561160
("open file succeeded, project", GetProject())("logstore", GetLogstore())("config", GetConfigName())(
@@ -1308,7 +1312,7 @@ bool LogFileReader::CheckFileSignatureAndOffset(bool isOpenOnUpdate) {
13081312

13091313
// If file size is 0 and filename is changed, we cannot judge if the inode is reused by signature,
13101314
// so we just recreate the reader to avoid filename mismatch
1311-
if (mLastFileSignatureSize == 0 && mRealLogPath != mHostLogPath) {
1315+
if (mLastFileSignatureSize == 0 && mRealLogPath != mResolvedHostLogPath) {
13121316
return false;
13131317
}
13141318
fsutil::PathStat ps;
@@ -2497,6 +2501,27 @@ bool LogFileReader::UpdateContainerInfo() {
24972501
return false;
24982502
}
24992503

2504+
// Populates mResolvedHostLogPath, the path that CheckFileSignatureAndOffset()
// compares against mRealLogPath when the stored file signature size is 0.
//
// The value is computed at most once per reader: once mResolvedHostLogPath is
// non-empty (for example after InitReader() restored it from a checkpoint's
// mResolvedFileName) this function leaves it untouched.
//
// Takes no parameters and returns nothing; it only writes mResolvedHostLogPath
// and may emit one INFO log line.
void LogFileReader::ResolveHostLogPath() {
2505+
// Already resolved earlier (or restored from a checkpoint): keep that value.
if (!mResolvedHostLogPath.empty()) {
2506+
return;
2507+
}
2508+
// When mSymbolicLinkFlag is set, the configured host path is used as-is.
// NOTE(review): presumably the flag marks the log file itself as a symlink,
// so resolving it would defeat the comparison -- confirm against where the
// flag is set.
if (mSymbolicLinkFlag) {
2509+
mResolvedHostLogPath = mHostLogPath;
2510+
return;
2511+
}
2512+
// Prefer the path reported by the open file handle; fall back to the
// configured host path when the file is not open.
// NOTE(review): GetFilePath() presumably returns the path of the opened
// descriptor with symbolic links in parent directories resolved -- confirm.
if (mLogFileOp.IsOpen()) {
2513+
mResolvedHostLogPath = mLogFileOp.GetFilePath();
2514+
} else {
2515+
mResolvedHostLogPath = mHostLogPath;
2516+
}
2517+
// Log only when the chosen path differs from the configured one, recording
// both paths plus the reader's dev/inode.
if (mResolvedHostLogPath != mHostLogPath) {
2518+
LOG_INFO(sLogger,
2519+
("open file", "symbolic link exists in absolute path")("host log path", mHostLogPath)(
2520+
"resolved host log path",
2521+
mResolvedHostLogPath)("dev", ToString(mDevInode.dev))("inode", ToString(mDevInode.inode)));
2522+
}
2523+
}
2524+
25002525
#ifdef APSARA_UNIT_TEST_MAIN
25012526
void LogFileReader::UpdateReaderManual() {
25022527
if (mLogFileOp.IsOpen()) {
@@ -2505,6 +2530,7 @@ void LogFileReader::UpdateReaderManual() {
25052530
mLogFileOp.Open(mHostLogPath.c_str());
25062531
mDevInode = GetFileDevInode(mHostLogPath);
25072532
mRealLogPath = mHostLogPath;
2533+
ResolveHostLogPath();
25082534
}
25092535
#endif
25102536

core/file_server/reader/LogFileReader.h

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -481,13 +481,15 @@ class LogFileReader {
481481
inline int64_t GetLastReadPos() const { // pos read but may not consumed, used for read needed
482482
return mLastFilePos + mCache.size();
483483
}
484+
void ResolveHostLogPath();
484485

485486
// std::string mRegion;
486487
// std::string mCategory;
487488
// std::string mConfigName;
488489
std::string mHostLogPath;
489490
std::string mHostLogPathDir;
490491
std::string mHostLogPathFile;
492+
std::string mResolvedHostLogPath; // same as mHostLogPath, but with resolved symbolic link
491493
std::string mRealLogPath; // real log path
492494
std::string mChineseEncodingPath; // On Windows, Chinese config base path's __path__ will be converted to GBK
493495
bool mSymbolicLinkFlag = false;
@@ -722,6 +724,7 @@ class LogFileReader {
722724
friend class FileTagUnittest;
723725
friend class CreateModifyHandlerUnittest;
724726
friend class LogFileReaderHoleUnittest;
727+
friend class LogFileReaderResolvedPathUnittest;
725728

726729
protected:
727730
void UpdateReaderManual();

core/unittest/config/ConfigUpdatorUnittest.cpp

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -615,7 +615,7 @@ void ConfigUpdatorUnittest::TestCheckPointManager() {
615615
// case 2 : add filecheckpoint
616616
for (int i = 0; i < 4; ++i) {
617617
CheckPoint* checkPointPtr = new CheckPoint(
618-
filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]);
618+
filenames[i], filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]);
619619
checkpointManager->AddCheckPoint(checkPointPtr);
620620
}
621621
// timeout checkpoint
@@ -1842,7 +1842,7 @@ void ConfigUpdatorUnittest::TestCheckPointSaveInterval() {
18421842

18431843
for (int i = 0; i < 3; ++i) {
18441844
CheckPoint* checkPointPtr = new CheckPoint(
1845-
filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]);
1845+
filenames[i], filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]);
18461846
CheckPointManager::Instance()->AddCheckPoint(checkPointPtr);
18471847
}
18481848

@@ -1929,7 +1929,7 @@ void ConfigUpdatorUnittest::TestCheckPointUserDefinedFilePath() {
19291929
}
19301930
for (int i = 0; i < 3; ++i) {
19311931
CheckPoint* checkPointPtr = new CheckPoint(
1932-
filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]);
1932+
filenames[i], filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]);
19331933
CheckPointManager::Instance()->AddCheckPoint(checkPointPtr);
19341934
}
19351935

@@ -2015,7 +2015,7 @@ void ConfigUpdatorUnittest::TestCheckPointLoadDefaultFile() {
20152015
}
20162016
for (int i = 0; i < 3; ++i) {
20172017
CheckPoint* checkPointPtr = new CheckPoint(
2018-
filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]);
2018+
filenames[i], filenames[i], offsets[i], sigSizes[i], sigHashs[i], DevInode(devs[i], inodes[i]), configs[i]);
20192019
CheckPointManager::Instance()->AddCheckPoint(checkPointPtr);
20202020
}
20212021
bfs::remove(STRING_FLAG(check_point_filename));

core/unittest/event_handler/ModifyHandlerUnittest.cpp

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -629,26 +629,26 @@ void ModifyHandlerUnittest::TestRecoverReaderFromCheckpointContainer() {
629629
auto devInode2 = GetFileDevInode(logPath2);
630630

631631
addContainerInfo("1");
632-
CheckPoint* checkPointPtr
633-
= new CheckPoint(logPath, 13, sigSize, sigHash, devInode, mConfigName, logPath, false, true, "1", false);
632+
CheckPoint* checkPointPtr = new CheckPoint(
633+
logPath, logPath, 13, sigSize, sigHash, devInode, mConfigName, logPath, false, true, "1", false);
634634
// use last event time as checkpoint's last update time
635635
checkPointPtr->mLastUpdateTime = time(NULL);
636636
checkPointPtr->mCache = "";
637637
checkPointPtr->mIdxInReaderArray = 0;
638638
CheckPointManager::Instance()->AddCheckPoint(checkPointPtr);
639639

640640
// not set container stopped for rotator reader
641-
CheckPoint* checkPointPtr1
642-
= new CheckPoint(logPath, 13, sigSize, sigHash, devInode1, mConfigName, logPath1, false, false, "1", false);
641+
CheckPoint* checkPointPtr1 = new CheckPoint(
642+
logPath, logPath, 13, sigSize, sigHash, devInode1, mConfigName, logPath1, false, false, "1", false);
643643
checkPointPtr1->mLastUpdateTime = time(NULL);
644644
checkPointPtr1->mCache = "";
645645
checkPointPtr1->mIdxInReaderArray = -2;
646646
CheckPointManager::Instance()->AddCheckPoint(checkPointPtr1);
647647

648648

649649
// set container stopped for rotator reader
650-
CheckPoint* checkPointPtr2
651-
= new CheckPoint(logPath, 13, sigSize, sigHash, devInode2, mConfigName, logPath2, false, true, "1", false);
650+
CheckPoint* checkPointPtr2 = new CheckPoint(
651+
logPath, logPath, 13, sigSize, sigHash, devInode2, mConfigName, logPath2, false, true, "1", false);
652652
checkPointPtr2->mLastUpdateTime = time(NULL);
653653
checkPointPtr2->mCache = "";
654654
checkPointPtr2->mIdxInReaderArray = -2;

core/unittest/reader/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,9 @@ target_link_libraries(force_read_unittest ${UT_BASE_TARGET})
4242
add_executable(file_tag_unittest FileTagUnittest.cpp)
4343
target_link_libraries(file_tag_unittest ${UT_BASE_TARGET})
4444

45+
add_executable(log_file_reader_resolved_path_unittest LogFileReaderResolvedPathUnittest.cpp)
46+
target_link_libraries(log_file_reader_resolved_path_unittest ${UT_BASE_TARGET})
47+
4548
if (UNIX)
4649
file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/testDataSet)
4750
file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/testDataSet/ DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/testDataSet/)
@@ -60,3 +63,4 @@ gtest_discover_tests(source_buffer_unittest)
6063
gtest_discover_tests(get_last_line_data_unittest)
6164
gtest_discover_tests(force_read_unittest)
6265
gtest_discover_tests(file_tag_unittest)
66+
gtest_discover_tests(log_file_reader_resolved_path_unittest)

0 commit comments

Comments
 (0)