Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions core/file_server/MultilineOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,48 @@ bool MultilineOptions::Init(const Json::Value& config, const CollectionPipelineC
} else if (mode == "whole_file") {
mMode = Mode::WHOLE_FILE;
mIsMultiline = true;

// FileWriteMode (only meaningful for whole_file mode)
string fileWriteMode;
if (!GetOptionalStringParam(config, "FileWriteMode", fileWriteMode, errorMsg)) {
PARAM_WARNING_DEFAULT(ctx.GetLogger(),
ctx.GetAlarm(),
errorMsg,
"append",
pluginType,
ctx.GetConfigName(),
ctx.GetProjectName(),
ctx.GetLogstoreName(),
ctx.GetRegion());
} else if (fileWriteMode == "overwrite") {
mFileWriteMode = FileWriteMode::OVERWRITE;
} else if (!fileWriteMode.empty() && fileWriteMode != "append") {
PARAM_WARNING_DEFAULT(ctx.GetLogger(),
ctx.GetAlarm(),
"string param FileWriteMode is not valid",
"append",
pluginType,
ctx.GetConfigName(),
ctx.GetProjectName(),
ctx.GetLogstoreName(),
ctx.GetRegion());
}

// MaxWholeFileBytes
uint32_t maxWholeFileBytes = 0;
if (!GetOptionalUIntParam(config, "MaxWholeFileBytes", maxWholeFileBytes, errorMsg)) {
PARAM_WARNING_DEFAULT(ctx.GetLogger(),
ctx.GetAlarm(),
errorMsg,
mMaxWholeFileBytes,
pluginType,
ctx.GetConfigName(),
ctx.GetProjectName(),
ctx.GetLogstoreName(),
ctx.GetRegion());
} else if (maxWholeFileBytes > 0) {
mMaxWholeFileBytes = maxWholeFileBytes;
}
} else if (!mode.empty() && mode != "custom") {
PARAM_WARNING_DEFAULT(ctx.GetLogger(),
ctx.GetAlarm(),
Expand Down
5 changes: 5 additions & 0 deletions core/file_server/MultilineOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <cstdint>

#include <string>
#include <utility>

Expand All @@ -30,6 +32,7 @@ class MultilineOptions {
public:
enum class Mode { CUSTOM, JSON, WHOLE_FILE };
enum class UnmatchedContentTreatment { DISCARD, SINGLE_LINE };
enum class FileWriteMode { APPEND, OVERWRITE }; // NOLINT(readability-identifier-naming)

bool Init(const Json::Value& config, const CollectionPipelineContext& ctx, const std::string& pluginType);
const std::shared_ptr<boost::regex>& GetStartPatternReg() const { return mStartPatternRegPtr; }
Expand All @@ -38,6 +41,8 @@ class MultilineOptions {
bool IsMultiline() const { return mIsMultiline; }

Mode mMode = Mode::CUSTOM;
FileWriteMode mFileWriteMode = FileWriteMode::APPEND;
uint32_t mMaxWholeFileBytes = 10 * 1024 * 1024;
std::string mStartPattern;
std::string mContinuePattern;
std::string mEndPattern;
Expand Down
11 changes: 7 additions & 4 deletions core/file_server/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1197,10 +1197,13 @@ bool ModifyHandler::RemoveReaderFromArrayAndMap(LogFileReaderPtr expectedReader,
}

void ModifyHandler::ForceReadLogAndPush(LogFileReaderPtr reader) {
auto logBuffer = make_unique<LogBuffer>();
auto pEvent = reader->CreateFlushTimeoutEvent();
reader->ReadLog(*logBuffer, pEvent.get());
PushLogToProcessor(reader, logBuffer.get(), true);
bool moreData;
do {
auto logBuffer = make_unique<LogBuffer>();
auto pEvent = reader->CreateFlushTimeoutEvent();
moreData = reader->ReadLog(*logBuffer, pEvent.get());
PushLogToProcessor(reader, logBuffer.get(), true);
} while (moreData);
}

int32_t ModifyHandler::PushLogToProcessor(LogFileReaderPtr reader, LogBuffer* logBuffer, bool dropIfBlocked) {
Expand Down
1 change: 0 additions & 1 deletion core/file_server/reader/FileReaderOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ namespace logtail {
struct FileReaderOptions {
enum class Encoding { UTF8, UTF16, GBK };
enum class InputType { Unknown, InputFile, InputContainerStdio };

InputType mInputType = InputType::Unknown;
Encoding mFileEncoding = Encoding::UTF8;
bool mTailingAllMatchedFiles = false;
Expand Down
178 changes: 174 additions & 4 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -986,8 +986,9 @@ bool LogFileReader::ReadLog(LogBuffer& logBuffer, const Event* event, bool isSta
bool tryRollback = true;
if (event != nullptr && event->IsReaderFlushTimeout()) {
// If flush timeout event, we should filter whether the event is legacy.
if (event->GetLastReadPos() == GetLastReadPos() && event->GetLastFilePos() == mLastFilePos
&& event->GetInode() == mDevInode.inode) {
if (mDrainingWholeFileCache
|| (event->GetLastReadPos() == GetLastReadPos() && event->GetLastFilePos() == mLastFilePos
&& event->GetInode() == mDevInode.inode)) {
tryRollback = false;
} else {
return false;
Expand Down Expand Up @@ -1343,6 +1344,64 @@ bool LogFileReader::CheckFileSignatureAndOffset(bool isOpenOnUpdate) {
mLogFileOp.Stat(ps);
time_t lastMTime = mLastMTime;
mLastMTime = ps.GetMtime();

// In WHOLE_FILE overwrite mode, the entire file is treated as a single log entry. A rewrite must be
// re-read from the beginning. Detection combines three signals (OR), so a same-second overwrite that
// a second-precision mtime alone would miss is still caught:
// 1. mtime change at nanosecond OR second precision;
// 2. file size change;
// 3. first-1KB signature change (covers equal-size, same-timestamp rewrites).
// The 1KB signature read is only performed when the cheap mtime/size signals show no change.
if (mMultilineConfig.first->mMode == MultilineOptions::Mode::WHOLE_FILE
&& mMultilineConfig.first->mFileWriteMode == MultilineOptions::FileWriteMode::OVERWRITE) {
int64_t mtimeSec = 0, mtimeNsec = 0;
ps.GetLastWriteTime(mtimeSec, mtimeNsec);
int64_t prevMTimeNsec = mLastMTimeNs;
int64_t prevWholeFileSize = mLastWholeFileSize;
mLastMTimeNs = mtimeNsec;
mLastWholeFileSize = endSize;

// Skip detection on the very first observation (no baseline yet).
if (lastMTime != 0) {
bool mtimeChanged = (lastMTime != mLastMTime) || (prevMTimeNsec != mtimeNsec);
bool sizeChanged = (prevWholeFileSize != endSize);
bool headChanged = false;
char firstLine[1025];
int nbytes = -1;
if (!mtimeChanged && !sizeChanged) {
nbytes = mLogFileOp.Pread(firstLine, 1, 1024, 0);
if (nbytes > 0) {
firstLine[nbytes] = '\0';
uint64_t tmpHash = mLastFileSignatureHash;
uint32_t tmpSize = mLastFileSignatureSize;
headChanged = !CheckAndUpdateSignature(string(firstLine), tmpHash, tmpSize);
}
}
if (mtimeChanged || sizeChanged || headChanged) {
LOG_INFO(sLogger,
("whole_file overwrite mode detected file modification, read from begin",
mHostLogPath)("mtime changed", mtimeChanged)("size changed", sizeChanged)(
"head changed", headChanged)("old mtime", lastMTime)("new mtime", mLastMTime)(
"project", GetProject())("logstore", GetLogstore())("config", GetConfigName()));
mLastFilePos = 0;
mCache.clear();
mDrainingWholeFileCache = false;
mWholeFileChunkIndex = 0;
if (nbytes < 0) {
nbytes = mLogFileOp.Pread(firstLine, 1, 1024, 0);
}
if (nbytes > 0) {
firstLine[nbytes] = '\0';
CheckAndUpdateSignature(string(firstLine), mLastFileSignatureHash, mLastFileSignatureSize);
}
if (mEOOption) {
updatePrimaryCheckpointSignature();
}
return true;
}
}
}

if (!isOpenOnUpdate || mLastFileSignatureSize == 0 || endSize < mLastFilePos
|| (endSize == mLastFilePos && lastMTime != mLastMTime)) {
char firstLine[1025];
Expand Down Expand Up @@ -1551,6 +1610,21 @@ size_t LogFileReader::getNextReadSize(int64_t fileEnd, bool& fromCpt) {
readSize = checkpoint.read_length();
LOG_INFO(sLogger, ("read specified length", readSize)("offset", mLastFilePos));
}
if (mMultilineConfig.first->mMode == MultilineOptions::Mode::WHOLE_FILE
&& mMultilineConfig.first->mFileWriteMode == MultilineOptions::FileWriteMode::OVERWRITE) {
allowMoreBufferSize = true;
if (readSize > mMultilineConfig.first->mMaxWholeFileBytes) {
int32_t curTime = time(nullptr);
if (curTime - mWholeFileOversizeWarnTime >= INT32_FLAG(logtail_alarm_interval)) {
mWholeFileOversizeWarnTime = curTime;
LOG_WARNING(sLogger,
("whole_file overwrite mode file exceeds max limit, skipping", mHostLogPath)(
"file size", fileEnd)("max allowed", mMultilineConfig.first->mMaxWholeFileBytes)(
"project", GetProject())("logstore", GetLogstore())("config", GetConfigName()));
}
return 0;
}
}
if (readSize > BUFFER_SIZE && !allowMoreBufferSize) {
readSize = BUFFER_SIZE;
}
Expand All @@ -1573,8 +1647,15 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
bool logTooLongSplitFlag = false;

logBuffer.readOffset = mLastFilePos;
// WHOLE_FILE overwrite: chunked drain of large cache (continuation or initiation)
if (mDrainingWholeFileCache
|| (!tryRollback && mMultilineConfig.first->mMode == MultilineOptions::Mode::WHOLE_FILE
&& mMultilineConfig.first->mFileWriteMode == MultilineOptions::FileWriteMode::OVERWRITE
&& mCache.size() > BUFFER_SIZE)) {
DrainWholeFileChunk(logBuffer, moreData);
return;
}
if (!mLogFileOp.IsOpen()) {
// read flush timeout
nbytes = mCache.size();
StringBuffer stringMemory = logBuffer.sourcebuffer->AllocateStringBuffer(nbytes);
stringBuffer = stringMemory.data;
Expand Down Expand Up @@ -1731,8 +1812,15 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
bool allowRollback = true;

logBuffer.readOffset = mLastFilePos;
// WHOLE_FILE overwrite: chunked drain of large cache (continuation or initiation)
if (mDrainingWholeFileCache
|| (!tryRollback && mMultilineConfig.first->mMode == MultilineOptions::Mode::WHOLE_FILE
&& mMultilineConfig.first->mFileWriteMode == MultilineOptions::FileWriteMode::OVERWRITE
&& mCache.size() > BUFFER_SIZE)) {
DrainWholeFileChunk(logBuffer, moreData);
return;
}
if (!mLogFileOp.IsOpen()) {
// read flush timeout
readCharCount = mCache.size();
gbkMemory.reset(new char[readCharCount + 1]);
gbkBuffer = gbkMemory.get();
Expand Down Expand Up @@ -2152,6 +2240,82 @@ size_t LogFileReader::AlignLastCharacter(char* buffer, size_t size) {
return size;
}

size_t LogFileReader::GetWholeFileChunkSize(char* data, size_t available) {
if (available <= BUFFER_SIZE) {
return available; // last chunk takes everything
}
// Prefer to end on a line boundary: the last '\n' within the window (newline included in the chunk).
for (size_t i = BUFFER_SIZE; i > 0; --i) {
if (data[i - 1] == '\n') {
return i;
}
}
// No newline in the window: avoid splitting a multibyte character (encoding-aware).
size_t aligned = AlignLastCharacter(data, BUFFER_SIZE);
return aligned == 0 ? BUFFER_SIZE : aligned; // guarantee progress
}

int32_t LogFileReader::CountWholeFileChunks() {
int32_t count = 0;
size_t pos = 0;
size_t total = mCache.size();
char* data = total ? &mCache[0] : nullptr;
while (pos < total) {
pos += GetWholeFileChunkSize(data + pos, total - pos);
++count;
}
return count;
}

void LogFileReader::DrainWholeFileChunk(LogBuffer& logBuffer, bool& moreData) {
if (!mDrainingWholeFileCache) {
mDrainingWholeFileCache = true;
mWholeFileChunkIndex = 0;
mWholeFileTotalChunks = CountWholeFileChunks();
mWholeFileId = mHostLogPath + "_" + ToString(mLastMTime) + "_" + ToString(mDevInode.inode);
mLastForceRead = true;
}
if (mCache.empty()) {
mDrainingWholeFileCache = false;
moreData = false;
return;
}
char* data = &mCache[0];
size_t chunkSize = GetWholeFileChunkSize(data, mCache.size());
if (mReaderConfig.first->mFileEncoding == FileReaderOptions::Encoding::GBK) {
// mCache holds raw GBK bytes; convert this chunk to UTF8 so it is readable downstream.
vector<long> lineFeedPos = {-1};
for (long idx = 0; idx < static_cast<long>(chunkSize) - 1; ++idx) {
if (data[idx] == '\n') {
lineFeedPos.push_back(idx);
}
}
lineFeedPos.push_back(static_cast<long>(chunkSize) - 1);
size_t srcLength = chunkSize;
size_t requiredLen
= EncodingConverter::GetInstance()->ConvertGbk2Utf8(data, &srcLength, nullptr, 0, lineFeedPos);
StringBuffer stringMemory = logBuffer.sourcebuffer->AllocateStringBuffer(requiredLen + 1);
size_t resultCharCount = EncodingConverter::GetInstance()->ConvertGbk2Utf8(
data, &srcLength, stringMemory.data, stringMemory.capacity, lineFeedPos);
logBuffer.rawBuffer = StringView(stringMemory.data, resultCharCount);
} else {
StringBuffer stringMemory = logBuffer.sourcebuffer->AllocateStringBuffer(chunkSize);
memcpy(stringMemory.data, data, chunkSize);
logBuffer.rawBuffer = StringView(stringMemory.data, chunkSize);
}
logBuffer.readLength = chunkSize;
logBuffer.readOffset = mLastFilePos;
logBuffer.wholeFileId = mWholeFileId;
logBuffer.wholeFileSeq = mWholeFileChunkIndex++;
logBuffer.wholeFileTotal = mWholeFileTotalChunks;
mCache.erase(0, chunkSize);
mLastFilePos += chunkSize;
moreData = !mCache.empty();
if (!moreData) {
mDrainingWholeFileCache = false;
}
}

std::unique_ptr<Event> LogFileReader::CreateFlushTimeoutEvent() {
auto result = std::make_unique<Event>(mHostLogPathDir,
mHostLogPathFile,
Expand Down Expand Up @@ -2544,6 +2708,12 @@ PipelineEventGroup LogFileReader::GenerateEventGroup(LogFileReaderPtr reader, Lo
event->SetContentNoCopy(DEFAULT_CONTENT_KEY, logBuffer->rawBuffer);
event->SetPosition(logBuffer->readOffset, logBuffer->readLength);

if (logBuffer->wholeFileSeq >= 0) {
event->SetContent("__whole_file_id__", logBuffer->wholeFileId);
event->SetContent("__whole_file_seq__", ToString(logBuffer->wholeFileSeq));
event->SetContent("__whole_file_total__", ToString(logBuffer->wholeFileTotal));
}

return group;
}

Expand Down
23 changes: 23 additions & 0 deletions core/file_server/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,15 @@ class LogFileReader {
ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true, bool isStaticFile = false);
void ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true, bool isStaticFile = false);

// WHOLE_FILE overwrite chunked drain helpers. Each emitted chunk is sized to end on a line boundary
// ('\n') when possible, otherwise on a character boundary, so that every chunk shown downstream
// (which does not reassemble) is human-readable.
void DrainWholeFileChunk(LogBuffer& logBuffer, bool& moreData);
// Byte length of the next front chunk of [data, data+available), capped at BUFFER_SIZE, preferring a
// trailing '\n', else a complete character; always >= 1 to guarantee progress.
size_t GetWholeFileChunkSize(char* data, size_t available);
int32_t CountWholeFileChunks();

size_t
ReadFile(LogFileOperator& logFileOp, void* buf, size_t size, int64_t& offset, TruncateInfo** truncateInfo = NULL);
static int32_t ParseTime(const char* buffer, const std::string& timeFormat);
Expand Down Expand Up @@ -505,6 +514,13 @@ class LogFileReader {
int64_t mExpectedFileSize = 0; // expected file size limit, used for StaticFileServer reader
time_t mLastMTime = 0;
std::string mCache;
bool mDrainingWholeFileCache = false;
int32_t mWholeFileChunkIndex = 0;
int32_t mWholeFileTotalChunks = 0;
std::string mWholeFileId;
// WHOLE_FILE overwrite detection baselines (only touched by whole_file overwrite logic).
int64_t mLastMTimeNs = 0;
int64_t mLastWholeFileSize = -1;
// >= 0: index of reader array, -1: new reader, -2: not in reader array, -3: not found
int32_t mIdxInReaderArrayFromLastCpt = CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY;
// std::string mProjectName;
Expand All @@ -526,6 +542,9 @@ class LogFileReader {
std::string mContainerID;
time_t mContainerStoppedTime = 0;
time_t mReadStoppedContainerAlarmTime = 0;
// Rate-limits the "whole_file overwrite exceeds MaxWholeFileBytes" warning so an oversized file does
// not flood the log on every read cycle.
time_t mWholeFileOversizeWarnTime = 0;
int32_t mReadDelayTime = 0;
bool mSkipFirstModify = false;
// int64_t mReadDelayAlarmBytes;
Expand Down Expand Up @@ -747,6 +766,10 @@ struct LogBuffer {
uint64_t readOffset = 0;
uint64_t readLength = 0;
std::unique_ptr<SourceBuffer> sourcebuffer;
// WHOLE_FILE overwrite mode: chunked delivery metadata
std::string wholeFileId;
int32_t wholeFileSeq = -1;
int32_t wholeFileTotal = -1;

LogBuffer() : sourcebuffer(new SourceBuffer()) {}
void SetDependecy(const LogFileReaderPtr& reader) { logFileReader = reader; }
Expand Down
Loading
Loading