Skip to content

Commit 1553106

Browse files
committed
support redis protocol
Signed-off-by: Shuyi Cheng <chengshuyi@linux.alibaba.com>
1 parent b0e356a commit 1553106

File tree

7 files changed

+662
-2
lines changed

7 files changed

+662
-2
lines changed

core/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ if (LINUX)
158158
if (ENABLE_ENTERPRISE)
159159
set(SUB_DIRECTORIES_LIST ${SUB_DIRECTORIES_LIST} shennong shennong/sdk apm/forward)
160160
endif()
161-
set(SUB_DIRECTORIES_LIST ${SUB_DIRECTORIES_LIST} ebpf ebpf/type ebpf/type/table ebpf/util ebpf/util/sampler ebpf/protocol/http ebpf/protocol/mysql ebpf/protocol ebpf/plugin/file_security ebpf/plugin/network_observer ebpf/plugin/process_security ebpf/plugin/network_security ebpf/plugin ebpf/observer ebpf/security
161+
set(SUB_DIRECTORIES_LIST ${SUB_DIRECTORIES_LIST} ebpf ebpf/type ebpf/type/table ebpf/util ebpf/util/sampler ebpf/protocol/http ebpf/protocol/mysql ebpf/protocol/redis ebpf/protocol ebpf/plugin/file_security ebpf/plugin/network_observer ebpf/plugin/process_security ebpf/plugin/network_security ebpf/plugin ebpf/observer ebpf/security
162162
prometheus prometheus/labels prometheus/schedulers prometheus/async prometheus/component
163163
host_monitor host_monitor/collector host_monitor/common forward forward/loongsuite
164164
)

core/ebpf/plugin/network_observer/Connection.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,12 @@ static constexpr StringView kExternalStr = "external";
3535
static constexpr StringView kLocalhostStr = "localhost";
3636
static constexpr StringView kHttpStr = "http";
3737
static constexpr StringView kMysqlStr = "mysql";
38+
static constexpr StringView kRedisStr = "redis";
3839
static constexpr StringView kSqlStr = "sql";
40+
static constexpr StringView kNoSqlStr = "nosql";
3941
static constexpr StringView kRpc25Str = "25";
4042
static constexpr StringView kRpc60Str = "60";
43+
static constexpr StringView kRpc105Str = "105";
4144
static constexpr StringView kRpc0Str = "0";
4245
static constexpr StringView kHttpClientStr = "http_client";
4346
static constexpr StringView kUnknownStr = "unknown";
@@ -132,6 +135,11 @@ void Connection::TryAttachL7Meta(support_role_e role, support_proto_e protocol)
132135
mTags.SetNoCopy<kCallKind>(kSqlStr);
133136
mTags.SetNoCopy<kCallType>(kMysqlStr);
134137
MarkL7MetaAttached();
138+
} else if (mProtocol == support_proto_e::ProtoRedis) {
139+
mTags.SetNoCopy<kRpcType>(kRpc105Str);
140+
mTags.SetNoCopy<kCallKind>(kNoSqlStr);
141+
mTags.SetNoCopy<kCallType>(kRedisStr);
142+
MarkL7MetaAttached();
135143
}
136144
}
137145

core/ebpf/protocol/ProtocolParser.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ extern "C" {
3131
namespace logtail::ebpf {
3232

3333
std::set<support_proto_e> ProtocolParserManager::AvaliableProtocolTypes() const {
34-
return {support_proto_e::ProtoHTTP, support_proto_e::ProtoMySQL};
34+
return {support_proto_e::ProtoHTTP, support_proto_e::ProtoMySQL, support_proto_e::ProtoRedis};
3535
}
3636

3737
support_proto_e ProtocolStringToEnum(std::string protocol) {
@@ -40,6 +40,8 @@ support_proto_e ProtocolStringToEnum(std::string protocol) {
4040
return support_proto_e::ProtoHTTP;
4141
} else if (protocol == "MYSQL") {
4242
return support_proto_e::ProtoMySQL;
43+
} else if (protocol == "REDIS") {
44+
return support_proto_e::ProtoRedis;
4345
}
4446

4547
return support_proto_e::ProtoUnknown;
Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
// Copyright 2025 iLogtail Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "RedisParser.h"
16+
17+
#include <algorithm>
18+
#include <cctype>
19+
#include <map>
20+
21+
#include "common/StringTools.h"
22+
#include "ebpf/type/NetworkObserverEvent.h"
23+
#include "ebpf/util/TraceId.h"
24+
#include "logger/Logger.h"
25+
26+
namespace logtail::ebpf {
27+
28+
std::vector<std::shared_ptr<L7Record>>
29+
REDISProtocolParser::Parse(struct conn_data_event_t* dataEvent,
30+
const std::shared_ptr<Connection>& conn,
31+
const std::shared_ptr<AppDetail>& appDetail,
32+
const std::shared_ptr<AppConvergerManager>& converger) {
33+
auto record = std::make_shared<RedisRecord>(conn, appDetail);
34+
record->SetEndTsNs(dataEvent->end_ts);
35+
record->SetStartTsNs(dataEvent->start_ts);
36+
auto spanId = GenerateSpanID();
37+
38+
// Mark as sample if it's a slow request (> 500ms) or selected by sampler
39+
if (record->GetLatencyMs() > 500 || appDetail->mSampler->ShouldSample(spanId)) {
40+
record->MarkSample();
41+
}
42+
43+
if (dataEvent->response_len > 0) {
44+
std::string_view buf(dataEvent->msg + dataEvent->request_len, dataEvent->response_len);
45+
ParseState state = redis::ParseResponse(buf, record, true, false);
46+
if (state != ParseState::kSuccess) {
47+
LOG_DEBUG(sLogger, ("[REDISProtocolParser]: Parse REDIS response failed", int(state)));
48+
return {};
49+
}
50+
}
51+
52+
if (dataEvent->request_len > 0) {
53+
std::string_view buf(dataEvent->msg, dataEvent->request_len);
54+
ParseState state = redis::ParseRequest(buf, record, false);
55+
if (state != ParseState::kSuccess) {
56+
LOG_DEBUG(sLogger, ("[REDISProtocolParser]: Parse REDIS request failed", int(state)));
57+
return {};
58+
}
59+
if (converger) {
60+
converger->DoConverge(appDetail, ConvType::kSql, record->mSql);
61+
}
62+
}
63+
64+
if (record->ShouldSample()) {
65+
record->SetSpanId(std::move(spanId));
66+
record->SetTraceId(GenerateTraceID());
67+
}
68+
69+
return {record};
70+
}
71+
72+
namespace redis {
73+
74+
// Redis RESP protocol constants
75+
constexpr size_t kMaxCommandLength = 256;
76+
constexpr char kRespArrayType = '*';
77+
constexpr char kRespBulkStringType = '$';
78+
constexpr char kRespSimpleStringType = '+';
79+
constexpr char kRespErrorType = '-';
80+
constexpr char kRespIntegerType = ':';
81+
constexpr char kCRLF[] = "\r\n";
82+
83+
// Helper function: Find CRLF (\r\n) in buffer
84+
static size_t FindCRLF(const std::string_view& buf, size_t start = 0) {
85+
for (size_t i = start; i + 1 < buf.size(); ++i) {
86+
if (buf[i] == '\r' && buf[i + 1] == '\n') {
87+
return i;
88+
}
89+
}
90+
return std::string_view::npos;
91+
}
92+
93+
// Helper function: Parse integer from string_view
94+
static bool ParseInteger(const std::string_view& str, int64_t& result) {
95+
if (str.empty()) {
96+
return false;
97+
}
98+
try {
99+
size_t pos;
100+
result = std::stoll(std::string(str), &pos);
101+
return pos == str.size();
102+
} catch (...) {
103+
return false;
104+
}
105+
}
106+
107+
// Parse RESP Bulk String: $<length>\r\n<data>\r\n
108+
static ParseState ParseBulkString(std::string_view& buf, std::string& result, size_t maxLen) {
109+
if (buf.empty() || buf[0] != kRespBulkStringType) {
110+
return ParseState::kInvalid;
111+
}
112+
113+
// Find CRLF after length
114+
size_t lengthEnd = FindCRLF(buf, 1);
115+
if (lengthEnd == std::string_view::npos) {
116+
return ParseState::kNeedsMoreData;
117+
}
118+
119+
// Parse length
120+
int64_t length;
121+
if (!ParseInteger(buf.substr(1, lengthEnd - 1), length)) {
122+
return ParseState::kInvalid;
123+
}
124+
125+
// Null bulk string: $-1\r\n
126+
if (length == -1) {
127+
buf.remove_prefix(lengthEnd + 2);
128+
result = "";
129+
return ParseState::kSuccess;
130+
}
131+
132+
if (length < 0) {
133+
return ParseState::kInvalid;
134+
}
135+
136+
// Check if we have enough data
137+
size_t requiredSize = lengthEnd + 2 + length + 2; // $len\r\ndata\r\n
138+
if (buf.size() < requiredSize) {
139+
return ParseState::kNeedsMoreData;
140+
}
141+
142+
// Verify trailing CRLF
143+
if (buf[requiredSize - 2] != '\r' || buf[requiredSize - 1] != '\n') {
144+
return ParseState::kInvalid;
145+
}
146+
147+
// Extract data (limit to maxLen)
148+
size_t dataStart = lengthEnd + 2;
149+
size_t dataLen = std::min(static_cast<size_t>(length), maxLen);
150+
result = std::string(buf.substr(dataStart, dataLen));
151+
152+
buf.remove_prefix(requiredSize);
153+
return ParseState::kSuccess;
154+
}
155+
156+
// Parse Redis RESP request
157+
ParseState ParseRequest(std::string_view& buf, std::shared_ptr<RedisRecord>& result, bool forceSample) {
158+
if (buf.empty()) {
159+
return ParseState::kNeedsMoreData;
160+
}
161+
162+
// Parse Redis RESP protocol request
163+
// Standard format: *<arg_count>\r\n$<arg1_len>\r\n<arg1>\r\n$<arg2_len>\r\n<arg2>\r\n...
164+
if (buf[0] == kRespArrayType) {
165+
// Array type request (standard Redis command format)
166+
size_t arrayCountEnd = FindCRLF(buf, 1);
167+
if (arrayCountEnd == std::string_view::npos) {
168+
return ParseState::kNeedsMoreData;
169+
}
170+
171+
// Parse argument count
172+
int64_t arrayCount;
173+
if (!ParseInteger(buf.substr(1, arrayCountEnd - 1), arrayCount)) {
174+
return ParseState::kInvalid;
175+
}
176+
177+
if (arrayCount <= 0) {
178+
return ParseState::kInvalid;
179+
}
180+
181+
// Remove array header
182+
buf.remove_prefix(arrayCountEnd + 2);
183+
184+
// Parse first argument (command name)
185+
std::string commandName;
186+
ParseState state = ParseBulkString(buf, commandName, kMaxCommandLength);
187+
if (state != ParseState::kSuccess) {
188+
return state;
189+
}
190+
191+
// Convert command name to uppercase
192+
std::transform(commandName.begin(), commandName.end(), commandName.begin(),
193+
[](unsigned char c) { return std::toupper(c); });
194+
result->SetCommandName(commandName);
195+
196+
// Build complete SQL string (command + arguments)
197+
std::string sql = commandName;
198+
for (int64_t i = 1; i < arrayCount && i < 10; ++i) { // Parse at most 10 arguments
199+
std::string arg;
200+
state = ParseBulkString(buf, arg, kMaxCommandLength);
201+
if (state != ParseState::kSuccess) {
202+
// Keep already parsed command even if subsequent arguments fail
203+
break;
204+
}
205+
sql += " " + arg;
206+
}
207+
208+
// Limit SQL length
209+
if (sql.size() > kMaxCommandLength) {
210+
sql = sql.substr(0, kMaxCommandLength);
211+
}
212+
result->SetSql(sql);
213+
214+
return ParseState::kSuccess;
215+
} else if (buf[0] == kRespBulkStringType || buf[0] == kRespSimpleStringType) {
216+
// Simple command or single bulk string (inline command, less common)
217+
size_t endPos = FindCRLF(buf);
218+
if (endPos == std::string_view::npos) {
219+
return ParseState::kNeedsMoreData;
220+
}
221+
std::string cmd = std::string(buf.substr(0, std::min(endPos, kMaxCommandLength)));
222+
result->SetSql(cmd);
223+
result->SetCommandName(cmd);
224+
return ParseState::kSuccess;
225+
} else {
226+
// Unknown format
227+
return ParseState::kInvalid;
228+
}
229+
}
230+
231+
ParseState ParseResponse(std::string_view& buf, std::shared_ptr<RedisRecord>& result, bool closed, bool forceSample) {
232+
if (buf.empty()) {
233+
return ParseState::kNeedsMoreData;
234+
}
235+
236+
// Parse Redis RESP protocol response
237+
char responseType = buf[0];
238+
239+
switch (responseType) {
240+
case kRespSimpleStringType: { // Simple string: +OK\r\n
241+
size_t endPos = FindCRLF(buf);
242+
if (endPos == std::string_view::npos) {
243+
return ParseState::kNeedsMoreData;
244+
}
245+
result->SetStatusCode(0); // Success
246+
buf.remove_prefix(endPos + 2);
247+
return ParseState::kSuccess;
248+
}
249+
case kRespErrorType: { // Error: -ERR message\r\n
250+
size_t endPos = FindCRLF(buf);
251+
if (endPos == std::string_view::npos) {
252+
return ParseState::kNeedsMoreData;
253+
}
254+
result->SetStatusCode(1); // Error
255+
if (endPos > 1) {
256+
std::string errorMsg = std::string(buf.substr(1, endPos - 1));
257+
result->SetErrorMessage(errorMsg);
258+
// Force sampling for error responses
259+
result->MarkSample();
260+
}
261+
buf.remove_prefix(endPos + 2);
262+
return ParseState::kSuccess;
263+
}
264+
case kRespIntegerType: { // Integer: :1000\r\n
265+
size_t endPos = FindCRLF(buf);
266+
if (endPos == std::string_view::npos) {
267+
return ParseState::kNeedsMoreData;
268+
}
269+
result->SetStatusCode(0); // Success
270+
buf.remove_prefix(endPos + 2);
271+
return ParseState::kSuccess;
272+
}
273+
case kRespBulkStringType: { // Bulk string: $len\r\ndata\r\n
274+
size_t lengthEnd = FindCRLF(buf, 1);
275+
if (lengthEnd == std::string_view::npos) {
276+
return ParseState::kNeedsMoreData;
277+
}
278+
279+
int64_t length;
280+
if (!ParseInteger(buf.substr(1, lengthEnd - 1), length)) {
281+
return ParseState::kInvalid;
282+
}
283+
284+
// Null bulk string: $-1\r\n (NULL)
285+
if (length == -1) {
286+
result->SetStatusCode(0);
287+
buf.remove_prefix(lengthEnd + 2);
288+
return ParseState::kSuccess;
289+
}
290+
291+
if (length < 0) {
292+
return ParseState::kInvalid;
293+
}
294+
295+
// Check if data is complete
296+
size_t requiredSize = lengthEnd + 2 + length + 2;
297+
if (buf.size() < requiredSize) {
298+
return ParseState::kNeedsMoreData;
299+
}
300+
301+
// Verify trailing CRLF
302+
if (buf[requiredSize - 2] != '\r' || buf[requiredSize - 1] != '\n') {
303+
return ParseState::kInvalid;
304+
}
305+
306+
result->SetStatusCode(0); // Success
307+
buf.remove_prefix(requiredSize);
308+
return ParseState::kSuccess;
309+
}
310+
case kRespArrayType: { // Array: *count\r\n...
311+
size_t arrayCountEnd = FindCRLF(buf, 1);
312+
if (arrayCountEnd == std::string_view::npos) {
313+
return ParseState::kNeedsMoreData;
314+
}
315+
316+
int64_t arrayCount;
317+
if (!ParseInteger(buf.substr(1, arrayCountEnd - 1), arrayCount)) {
318+
return ParseState::kInvalid;
319+
}
320+
321+
// Empty or null array: *-1\r\n or *0\r\n
322+
if (arrayCount <= 0) {
323+
result->SetStatusCode(0);
324+
buf.remove_prefix(arrayCountEnd + 2);
325+
return ParseState::kSuccess;
326+
}
327+
328+
// Simplified handling for array responses: only validate format, not parsing all elements
329+
// This avoids complex recursive parsing and improves performance
330+
result->SetStatusCode(0); // Success
331+
buf.remove_prefix(arrayCountEnd + 2);
332+
return ParseState::kSuccess;
333+
}
334+
default:
335+
// Unknown response type
336+
result->SetStatusCode(-1); // Unknown
337+
return ParseState::kInvalid;
338+
}
339+
}
340+
341+
} // namespace redis
342+
} // namespace logtail::ebpf

0 commit comments

Comments
 (0)