Skip to content

Commit cbe16e6

Browse files
committed
1. modify Mark logic
2. add KernelNanoTimeToUTC 3. modify signature for http parser 4. modify spanID/traceID in appRecord
1 parent be2f04a commit cbe16e6

13 files changed

+263
-169
lines changed

Diff for: core/common/TimeUtil.cpp

+13-1
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ long GetTicksPerSecond() {
411411
}
412412

413413
std::chrono::nanoseconds GetTimeDiffFromMonotonic() {
414+
LOG_INFO(sLogger, ("enter", "aaa"));
414415
#if defined(__linux__)
415416
struct timespec t;
416417
int ret = clock_gettime(CLOCK_MONOTONIC, &t);
@@ -422,9 +423,20 @@ std::chrono::nanoseconds GetTimeDiffFromMonotonic() {
422423
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
423424
auto boot_ns = t.tv_sec * 1000000000ULL + t.tv_nsec;
424425
return std::chrono::nanoseconds(now_ns - boot_ns);
425-
#elif defined(__APPLE__)
426+
#else
426427
return std::chrono::nanoseconds(0);
427428
#endif
428429
}
429430

431+
struct timespec KernelNanoTimeToUTC(uint64_t nano) {
432+
static std::chrono::nanoseconds diff = GetTimeDiffFromMonotonic();
433+
auto ts = std::chrono::nanoseconds(nano + diff.count());
434+
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(ts);
435+
auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(ts - seconds);
436+
struct timespec res;
437+
res.tv_sec = seconds.count();
438+
res.tv_nsec = nanoseconds.count();
439+
return res;
440+
}
441+
430442
} // namespace logtail

Diff for: core/common/TimeUtil.h

+2
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,6 @@ long GetTicksPerSecond();
101101

102102
std::chrono::nanoseconds GetTimeDiffFromMonotonic();
103103

104+
struct timespec KernelNanoTimeToUTC(uint64_t nano);
105+
104106
} // namespace logtail

Diff for: core/ebpf/driver/NetworkFilter.cpp

-3
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,6 @@ int CreateNetworkFilterForCallname(
338338
newConfig.index(),
339339
std::holds_alternative<SecurityFileFilter>(newConfig));
340340

341-
std::vector<AttachProgOps> attachOps = {AttachProgOps("kprobe_" + callName, true)};
342341
int ret = 0;
343342
int callNameIdx = GetCallNameIdx(callName);
344343
if (callNameIdx < 0) {
@@ -394,9 +393,7 @@ int CreateNetworkFilterForCallname(
394393
}
395394

396395
int DeleteNetworkFilterForCallname(std::shared_ptr<BPFWrapper<security_bpf>>& wrapper, const std::string& callName) {
397-
std::vector<AttachProgOps> attachOps = {AttachProgOps("kprobe_" + callName, true)};
398396
ebpf_log(eBPFLogType::NAMI_LOG_TYPE_INFO, "DisableCallName %s\n", callName.c_str());
399-
400397
int callNameIdx = GetCallNameIdx(callName);
401398
if (callNameIdx < 0) {
402399
return 1;

Diff for: core/ebpf/plugin/network_observer/Connection.cpp

+124-55
Original file line numberDiff line numberDiff line change
@@ -59,33 +59,33 @@ void Connection::UpdateConnState(struct conn_ctrl_event_t* event) {
5959
}
6060
}
6161

62-
void Connection::UpdateRole(enum support_role_e role) {
63-
if (!IsL7MetaAttachReady()) {
64-
// WriteLock lock(mProtocolAndRoleLock);
65-
if (mRole != IsUnknown && mRole != role) {
66-
LOG_WARNING(
67-
sLogger,
68-
("role change!! last role", magic_enum::enum_name(mRole))("new role", magic_enum::enum_name(role)));
69-
} else {
70-
mRole = role;
71-
TryUpdateProtocolAttr();
72-
}
73-
}
74-
}
75-
76-
void Connection::UpdateProtocol(support_proto_e protocol) {
77-
if (!IsL7MetaAttachReady()) {
78-
// WriteLock lock(mProtocolAndRoleLock);
79-
if (mProtocol != support_proto_e::ProtoUnknown && mProtocol != protocol) {
80-
LOG_WARNING(sLogger,
81-
("protocol change!! last protocol",
82-
magic_enum::enum_name(mProtocol))("new protocol", magic_enum::enum_name(protocol)));
83-
} else {
84-
mProtocol = protocol;
85-
TryUpdateProtocolAttr();
86-
}
87-
}
88-
}
62+
// void Connection::UpdateRole(enum support_role_e role) {
63+
// if (!IsL7MetaAttachReady()) {
64+
// // WriteLock lock(mProtocolAndRoleLock);
65+
// if (mRole != IsUnknown && mRole != role) {
66+
// LOG_WARNING(
67+
// sLogger,
68+
// ("role change!! last role", magic_enum::enum_name(mRole))("new role", magic_enum::enum_name(role)));
69+
// } else {
70+
// mRole = role;
71+
// TryAttachL7Meta();
72+
// }
73+
// }
74+
// }
75+
76+
// void Connection::UpdateProtocol(support_proto_e protocol) {
77+
// if (!IsL7MetaAttachReady()) {
78+
// // WriteLock lock(mProtocolAndRoleLock);
79+
// if (mProtocol != support_proto_e::ProtoUnknown && mProtocol != protocol) {
80+
// LOG_WARNING(sLogger,
81+
// ("protocol change!! last protocol",
82+
// magic_enum::enum_name(mProtocol))("new protocol", magic_enum::enum_name(protocol)));
83+
// } else {
84+
// mProtocol = protocol;
85+
// TryAttachL7Meta();
86+
// }
87+
// }
88+
// }
8989

9090
// only called by poller thread ...
9191
void Connection::UpdateConnStats(struct conn_stats_event_t* event) {
@@ -101,12 +101,26 @@ void Connection::UpdateConnStats(struct conn_stats_event_t* event) {
101101
}
102102

103103
this->mLastUpdateTs = eventTs;
104+
// if (IsL4MetaAttachReady()) {
105+
// LOG_DEBUG(sLogger, ("netMeta already attached", ""));
106+
// UpdateL4Meta(event);
107+
// MarkXXX();
108+
// TryAttachSelf();
109+
// }
110+
if (!IsL4MetaAttachReady()) {
111+
LOG_DEBUG(sLogger, ("netMeta already attached", ""));
112+
UpdateL4Meta(event);
113+
MarkL4MetaAttached();
114+
TryAttachPeerMeta(true, event->si.family, event->si.ap.daddr);
115+
TryAttachSelfMeta();
116+
}
104117

105-
UpdateRole(event->role);
118+
TryAttachL7Meta(event->role, event->protocol);
106119

107-
UpdateProtocol(event->protocol);
120+
// 合并成 L7 meta
121+
// UpdateRole(event->role);
108122

109-
UpdateNetMetaAttr(event);
123+
// UpdateProtocol(event->protocol);
110124

111125
mCurrStats.mSendBytes = event->wr_bytes;
112126
mCurrStats.mRecvBytes = event->rd_bytes;
@@ -117,6 +131,7 @@ void Connection::UpdateConnStats(struct conn_stats_event_t* event) {
117131
mLastStats.mRecvBytes = event->last_output_rd_bytes;
118132
mLastStats.mSendPackets = event->last_output_wr_pkts;
119133
mLastStats.mRecvPackets = event->last_output_rd_pkts;
134+
120135
LOG_DEBUG(sLogger,
121136
("stage", "updateConnStates")("mSendBytes", event->wr_bytes)("mRecvBytes", event->rd_bytes)(
122137
"mSendPackets", event->wr_pkts)("mRecvPackets", event->rd_pkts)("last", "")(
@@ -151,31 +166,86 @@ bool Connection::GenerateConnStatsRecord(const std::shared_ptr<AbstractRecord>&
151166
return true;
152167
}
153168

154-
void Connection::TryUpdateProtocolAttr() {
155-
if (mProtocol == support_proto_e::ProtoUnknown || IsL7MetaAttachReady()) {
169+
// void Connection::TryAttachL7Meta() {
170+
// if (mProtocol == support_proto_e::ProtoUnknown || IsL7MetaAttachReady()) {
171+
// return;
172+
// }
173+
174+
// mTags.Set<kProtocol>(std::string(magic_enum::enum_name(mProtocol)));
175+
// if (mRole == support_role_e::IsClient) {
176+
// mTags.SetNoCopy<kRpcType>(RPC_25_STR);
177+
// mTags.SetNoCopy<kCallKind>(HTTP_CLIENT_STR);
178+
// mTags.SetNoCopy<kCallType>(HTTP_CLIENT_STR);
179+
// MarkL7MetaAttached();
180+
// } else if (mRole == support_role_e::IsServer) {
181+
// mTags.SetNoCopy<kRpcType>(RPC_0_STR);
182+
// mTags.SetNoCopy<kCallKind>(HTTP_STR);
183+
// mTags.SetNoCopy<kCallType>(HTTP_STR);
184+
// MarkL7MetaAttached();
185+
// }
186+
// }
187+
188+
void Connection::TryAttachL7Meta(support_role_e role, support_proto_e protocol) {
189+
if (IsL7MetaAttachReady()) {
156190
return;
157191
}
158192

159-
mTags.Set<kProtocol>(std::string(magic_enum::enum_name(mProtocol)));
160-
if (mRole == support_role_e::IsClient) {
161-
mTags.SetNoCopy<kRpcType>(RPC_25_STR);
162-
mTags.SetNoCopy<kCallKind>(HTTP_CLIENT_STR);
163-
mTags.SetNoCopy<kCallType>(HTTP_CLIENT_STR);
164-
MarkL7MetaAttached();
165-
} else if (mRole == support_role_e::IsServer) {
166-
mTags.SetNoCopy<kRpcType>(RPC_0_STR);
167-
mTags.SetNoCopy<kCallKind>(HTTP_STR);
168-
mTags.SetNoCopy<kCallType>(HTTP_STR);
169-
MarkL7MetaAttached();
193+
// update role
194+
if (mRole == IsUnknown && role != IsUnknown) {
195+
mRole = role;
170196
}
171-
}
172197

173-
void Connection::UpdateNetMetaAttr(struct conn_stats_event_t* event) {
174-
if (IsL4MetaAttachReady()) {
175-
LOG_DEBUG(sLogger, ("netMeta already attached", ""));
176-
return;
198+
if (mProtocol == support_proto_e::ProtoUnknown && protocol != support_proto_e::ProtoUnknown) {
199+
mProtocol = protocol;
200+
mTags.Set<kProtocol>(std::string(magic_enum::enum_name(mProtocol)));
201+
}
202+
203+
LOG_INFO(sLogger,
204+
("protocol", magic_enum::enum_name(protocol))("role", magic_enum::enum_name(role))(
205+
"mprotocol", magic_enum::enum_name(mProtocol))("mrole", magic_enum::enum_name(mRole)));
206+
207+
if (mProtocol == support_proto_e::ProtoHTTP) {
208+
if (mRole == support_role_e::IsClient) {
209+
mTags.SetNoCopy<kRpcType>(RPC_25_STR);
210+
mTags.SetNoCopy<kCallKind>(HTTP_CLIENT_STR);
211+
mTags.SetNoCopy<kCallType>(HTTP_CLIENT_STR);
212+
MarkL7MetaAttached();
213+
} else if (mRole == support_role_e::IsServer) {
214+
mTags.SetNoCopy<kRpcType>(RPC_0_STR);
215+
mTags.SetNoCopy<kCallKind>(HTTP_STR);
216+
mTags.SetNoCopy<kCallType>(HTTP_STR);
217+
MarkL7MetaAttached();
218+
}
177219
}
178220

221+
// MarkL7MetaAttached();
222+
223+
// if (!IsL7MetaAttachReady()) {
224+
// // WriteLock lock(mProtocolAndRoleLock);
225+
// if (mRole != IsUnknown && mRole != role) {
226+
// LOG_WARNING(
227+
// sLogger,
228+
// ("role change!! last role", magic_enum::enum_name(mRole))("new role", magic_enum::enum_name(role)));
229+
// } else {
230+
// mRole = role;
231+
// TryAttachL7Meta();
232+
// }
233+
// }
234+
235+
// if (!IsL7MetaAttachReady()) {
236+
// // WriteLock lock(mProtocolAndRoleLock);
237+
// if (mProtocol != support_proto_e::ProtoUnknown && mProtocol != protocol) {
238+
// LOG_WARNING(sLogger,
239+
// ("protocol change!! last protocol",
240+
// magic_enum::enum_name(mProtocol))("new protocol", magic_enum::enum_name(protocol)));
241+
// } else {
242+
// mProtocol = protocol;
243+
// TryAttachL7Meta();
244+
// }
245+
// }
246+
}
247+
248+
void Connection::UpdateL4Meta(struct conn_stats_event_t* event) {
179249
MarkConnStatsEventReceived();
180250

181251
// handle container id ...
@@ -213,16 +283,15 @@ void Connection::UpdateNetMetaAttr(struct conn_stats_event_t* event) {
213283
mTags.Set<kIp>(sip);
214284
mTags.Set<kRemoteIp>(dip);
215285

216-
MarkL4MetaAttached();
217-
286+
// MarkL4MetaAttached();
218287

219288
// for peer meta
220-
LOG_DEBUG(sLogger, ("try attach peer meta", GetRemoteIp()));
221-
TryAttachPeerMeta(true, si.family, si.ap.daddr);
289+
// LOG_DEBUG(sLogger, ("try attach peer meta", GetRemoteIp()));
290+
// TryAttachPeerMeta(true, si.family, si.ap.daddr);
222291

223-
// for self meta
224-
LOG_DEBUG(sLogger, ("try attach self meta", GetContainerId()));
225-
TryAttachSelfMeta();
292+
// // for self meta
293+
// LOG_DEBUG(sLogger, ("try attach self meta", GetContainerId()));
294+
// TryAttachSelfMeta();
226295
}
227296

228297
void Connection::UpdateSelfPodMeta(const std::shared_ptr<K8sPodInfo>& pod) {

Diff for: core/ebpf/plugin/network_observer/Connection.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,13 @@ class Connection {
143143

144144
bool IsLocalhost() const;
145145

146+
void TryAttachL7Meta(support_role_e role, support_proto_e protocol);
146147
void TryAttachSelfMeta(bool enable = true);
147148
void TryAttachPeerMeta(bool enable = true, int family = -1, uint32_t ip = std::numeric_limits<uint32_t>::max());
148149

149-
void UpdateRole(enum support_role_e role);
150+
// void UpdateRole(enum support_role_e role);
150151

151-
void UpdateProtocol(support_proto_e protocol);
152+
// void UpdateProtocol(support_proto_e protocol);
152153

153154
bool GenerateConnStatsRecord(const std::shared_ptr<AbstractRecord>& record);
154155

@@ -161,8 +162,7 @@ class Connection {
161162
void MarkConnDeleted() { mMetaFlags.fetch_or(sFlagConnDeleted, std::memory_order_release); }
162163

163164
private:
164-
void UpdateNetMetaAttr(struct conn_stats_event_t* event);
165-
void TryUpdateProtocolAttr();
165+
void UpdateL4Meta(struct conn_stats_event_t* event);
166166
// peer pod meta
167167
void UpdatePeerPodMetaForExternal();
168168
void UpdatePeerPodMeta(const std::shared_ptr<K8sPodInfo>& pod);

Diff for: core/ebpf/plugin/network_observer/ConnectionManager.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,10 @@ const std::shared_ptr<Connection> ConnectionManager::AcceptNetDataEvent(struct c
7878
return nullptr;
7979
}
8080

81-
conn->UpdateRole(event->role);
82-
conn->UpdateProtocol(event->protocol);
81+
// TryAttachL7
82+
// conn->UpdateRole(event->role);
83+
// conn->UpdateProtocol(event->protocol);
84+
conn->TryAttachL7Meta(event->role, event->protocol);
8385
conn->RecordActive();
8486
return conn;
8587
}

Diff for: core/ebpf/plugin/network_observer/NetworkObserverManager.cpp

+15-11
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "collection_pipeline/queue/ProcessQueueManager.h"
2121
#include "common/HashUtil.h"
2222
#include "common/StringTools.h"
23+
#include "common/TimeUtil.h"
2324
#include "common/http/AsynCurlRunner.h"
2425
#include "common/magic_enum.hpp"
2526
#include "ebpf/Config.h"
@@ -495,9 +496,10 @@ bool NetworkObserverManager::ConsumeLogAggregateTree(const std::chrono::steady_c
495496
}
496497
// set time stamp
497498
HttpRecord* httpRecord = static_cast<HttpRecord*>(record);
498-
auto ts = httpRecord->GetStartTimeStamp() + mTimeDiff.count();
499-
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::nanoseconds(ts));
500-
logEvent->SetTimestamp(seconds.count(), ts);
499+
auto timeSpec = KernelNanoTimeToUTC(httpRecord->GetStartTimeStamp());
500+
// auto ts = httpRecord->GetStartTimeStamp() + mTimeDiff.count();
501+
// auto seconds = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::nanoseconds(ts));
502+
logEvent->SetTimestamp(timeSpec.tv_sec, timeSpec.tv_nsec);
501503
logEvent->SetContent(kLatencyNS.LogKey(), std::to_string(httpRecord->GetLatencyNs()));
502504
logEvent->SetContent(kHTTPMethod.LogKey(), httpRecord->GetMethod());
503505
logEvent->SetContent(kHTTPPath.LogKey(),
@@ -507,7 +509,7 @@ bool NetworkObserverManager::ConsumeLogAggregateTree(const std::chrono::steady_c
507509
logEvent->SetContent(kStatusCode.LogKey(), std::to_string(httpRecord->GetStatusCode()));
508510
logEvent->SetContent(kHTTPReqBody.LogKey(), httpRecord->GetReqBody());
509511
logEvent->SetContent(kHTTPRespBody.LogKey(), httpRecord->GetRespBody());
510-
LOG_DEBUG(sLogger, ("add one log, log timestamp", ts));
512+
LOG_DEBUG(sLogger, ("add one log, log timestamp", timeSpec.tv_sec)("nano", timeSpec.tv_nsec));
511513
needPush = true;
512514
}
513515
});
@@ -1009,8 +1011,8 @@ bool NetworkObserverManager::ConsumeSpanAggregateTree(
10091011
LOG_DEBUG(sLogger, ("record span tags", "")(std::string(kConnTrackerTable.ColSpanKey(i)), sb.data));
10101012
}
10111013

1012-
spanEvent->SetTraceId(record->mTraceId);
1013-
spanEvent->SetSpanId(record->mSpanId);
1014+
spanEvent->SetTraceId(FromRandom64ID(record->mTraceId));
1015+
spanEvent->SetSpanId(FromRandom64ID(record->mSpanId));
10141016
spanEvent->SetStatus(record->IsError() ? SpanEvent::StatusCode::Error : SpanEvent::StatusCode::Ok);
10151017
auto role = ct->GetRole();
10161018
if (role == support_role_e::IsClient) {
@@ -1504,12 +1506,13 @@ void NetworkObserverManager::ProcessRecord(const std::shared_ptr<AbstractRecord>
15041506
return;
15051507
}
15061508

1509+
if (appRecord->GetConnection()->IsConnDeleted()) {
1510+
// try attach again, for sake of connection is released in connection manager ...
1511+
appRecord->GetConnection()->TryAttachPeerMeta();
1512+
appRecord->GetConnection()->TryAttachSelfMeta();
1513+
}
1514+
15071515
if (!appRecord->GetConnection()->IsMetaAttachReadyForAppRecord()) {
1508-
if (appRecord->GetConnection()->IsConnDeleted()) {
1509-
// try attach again, for sake of connection is released in connection manager ...
1510-
appRecord->GetConnection()->TryAttachPeerMeta();
1511-
appRecord->GetConnection()->TryAttachSelfMeta();
1512-
}
15131516
// rollback
15141517
HandleRollback(record, isDrop);
15151518
if (isDrop) {
@@ -1668,6 +1671,7 @@ void NetworkObserverManager::AcceptDataEvent(struct conn_data_event_t* event) {
16681671
LOG_DEBUG(sLogger, ("begin parse, protocol is", std::string(magic_enum::enum_name(event->protocol))));
16691672

16701673
ReadLock lk(mSamplerLock);
1674+
// atomic shared_ptr
16711675
std::vector<std::shared_ptr<AbstractRecord>> records
16721676
= ProtocolParserManager::GetInstance().Parse(protocol, conn, event, mSampler);
16731677
lk.unlock();

0 commit comments

Comments
 (0)