Skip to content

Commit 3efe03d

Browse files
committed
refactor: optimize l7 perf cache
1 parent ac5caca commit 3efe03d

File tree

6 files changed

+129
-70
lines changed

6 files changed

+129
-70
lines changed

agent/benches/lru.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use std::{net::Ipv4Addr, time::Instant};
2121

2222
use criterion::*;
2323
use deepflow_agent::{
24-
_L7PerfCache as L7PerfCache, _LogCache as LogCache, _LogMessageType as LogMessageType,
24+
_L7PerfCache as L7PerfCache, _LogCache as LogCache, _LogCacheKey as LogCacheKey,
25+
_LogMessageType as LogMessageType,
2526
};
2627
use lru::LruCache;
2728
use rand::prelude::*;
@@ -430,7 +431,9 @@ fn rrt_lru(c: &mut Criterion) {
430431
let start = Instant::now();
431432
for item in seeds {
432433
cache.rrt_cache.put(
433-
((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128,
434+
LogCacheKey(
435+
((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128,
436+
),
434437
LogCache {
435438
msg_type: LogMessageType::Request,
436439
time: item.duration.as_micros() as u64,
@@ -456,7 +459,9 @@ fn rrt_lru(c: &mut Criterion) {
456459
let mut cache = L7PerfCache::new(1000);
457460
for item in &seeds {
458461
cache.rrt_cache.put(
459-
((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128,
462+
LogCacheKey(
463+
((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128,
464+
),
460465
LogCache {
461466
msg_type: LogMessageType::Request,
462467
time: item.duration.as_micros() as u64,
@@ -466,9 +471,9 @@ fn rrt_lru(c: &mut Criterion) {
466471
}
467472
let start = Instant::now();
468473
for item in &seeds {
469-
cache.rrt_cache.get(
470-
&(((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128),
471-
);
474+
cache.rrt_cache.get(&LogCacheKey(
475+
((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128,
476+
));
472477
}
473478
start.elapsed()
474479
})
@@ -488,7 +493,9 @@ fn rrt_lru(c: &mut Criterion) {
488493
let mut cache = L7PerfCache::new(1000);
489494
for item in &seeds {
490495
cache.rrt_cache.put(
491-
((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128,
496+
LogCacheKey(
497+
((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128,
498+
),
492499
LogCache {
493500
msg_type: LogMessageType::Request,
494501
time: item.duration.as_micros() as u64,
@@ -498,9 +505,9 @@ fn rrt_lru(c: &mut Criterion) {
498505
}
499506
let start = Instant::now();
500507
for item in &seeds {
501-
cache.rrt_cache.get(
502-
&(((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128),
503-
);
508+
cache.rrt_cache.get(&LogCacheKey(
509+
((item.flow_id as u128) << 64) | item.stream_id.unwrap_or_default() as u128,
510+
));
504511
}
505512
start.elapsed()
506513
})

agent/src/common/l7_protocol_info.rs

Lines changed: 12 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use log::{debug, error, warn};
2222
use serde::Serialize;
2323

2424
use crate::{
25-
common::l7_protocol_log::LogCache,
25+
common::l7_protocol_log::{LogCache, LogCacheKey},
2626
flow_generator::{
2727
protocol_logs::{
2828
fastcgi::FastCGIInfo, pb_adapter::L7ProtocolSendLog, AmqpInfo, BrpcInfo, DnsInfo,
@@ -35,7 +35,7 @@ use crate::{
3535
plugin::CustomInfo,
3636
};
3737

38-
use super::{ebpf::EbpfType, l7_protocol_log::ParseParam};
38+
use super::l7_protocol_log::ParseParam;
3939

4040
macro_rules! all_protocol_info {
4141
($($name:ident($info_struct:ident)),+$(,)?) => {
@@ -133,35 +133,6 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
133133
(false, false)
134134
}
135135

136-
fn cal_cache_key(&self, param: &ParseParam) -> u128 {
137-
/*
138-
if session id is some: flow id 64bit | 0 32bit | session id 32bit
139-
if session id is none: flow id 64bit | packet_seq 64bit
140-
*/
141-
match self.session_id() {
142-
Some(sid) => ((param.flow_id as u128) << 64) | sid as u128,
143-
None => {
144-
((param.flow_id as u128) << 64)
145-
| (if param.ebpf_type != EbpfType::None {
146-
// NOTE:
147-
// In the request-log session aggregation process, for eBPF data, we require that requests and
148-
// responses have consecutive cap_seq to ensure the correctness of session aggregation. However,
149-
// when SR (Segmentation-Reassembly) is enabled, we combine multiple eBPF socket event events
150-
// before parsing the protocol. Therefore, in order to ensure that session aggregation can still
151-
// be performed correctly, we need to retain the cap_seq of the last request and the cap_seq of
152-
// the first response, so that the cap_seq of the request and response can still be consecutive.
153-
if param.direction == PacketDirection::ClientToServer {
154-
param.packet_end_seq + 1
155-
} else {
156-
param.packet_start_seq
157-
}
158-
} else {
159-
0
160-
}) as u128
161-
}
162-
}
163-
}
164-
165136
/*
166137
calculate rrt
167138
if have previous log cache:
@@ -176,17 +147,15 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
176147
*/
177148
fn cal_rrt(&self, param: &ParseParam) -> Option<u64> {
178149
let mut perf_cache = param.l7_perf_cache.borrow_mut();
179-
let cache_key = self.cal_cache_key(param);
180-
let previous_log_info = perf_cache.rrt_cache.pop(&cache_key);
150+
let cache_key = LogCacheKey::new(param, self.session_id());
151+
let previous_log_info = perf_cache.pop(cache_key);
181152

182153
let time = param.time;
183154
let msg_type: LogMessageType = param.direction.into();
184155
let timeout = param.rrt_timeout as u64;
185156

186157
if time != 0 {
187-
let (in_cached_req, timeout_count) = perf_cache
188-
.timeout_cache
189-
.get_or_insert_mut(param.flow_id, || (0, 0));
158+
let (in_cached_req, timeout_count) = perf_cache.get_or_insert_mut(param.flow_id);
190159

191160
let Some(previous_log_info) = previous_log_info else {
192161
if msg_type == LogMessageType::Request {
@@ -225,19 +194,10 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
225194
// timeout, save the latest
226195
if rrt > timeout {
227196
*timeout_count += 1;
228-
perf_cache.rrt_cache.put(
229-
cache_key,
230-
LogCache {
231-
msg_type: param.direction.into(),
232-
time: param.time,
233-
multi_merge_info: None,
234-
},
235-
);
236197
None
237198
} else {
238199
Some(rrt)
239200
}
240-
241201
// if previous is resp and current is req and previous time gt current time, likely ebpf disorder,
242202
// calculate the round trip time.
243203
} else if previous_log_info.msg_type == LogMessageType::Response
@@ -248,9 +208,7 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
248208
if rrt > timeout {
249209
// a disordered request/response pair is unlikely to have an rrt greater than the timeout
250210
warn!("l7 log info disorder with long time rrt {}", rrt);
251-
// timeout, save latest
252211
*timeout_count += 1;
253-
perf_cache.rrt_cache.put(cache_key, previous_log_info);
254212
None
255213
} else {
256214
Some(rrt)
@@ -269,15 +227,15 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
269227
if previous_log_info.msg_type == LogMessageType::Request {
270228
*in_cached_req += 1;
271229
}
272-
perf_cache.rrt_cache.put(cache_key, previous_log_info);
230+
perf_cache.put(cache_key, previous_log_info);
273231
} else {
274232
if previous_log_info.msg_type == LogMessageType::Request {
275233
*timeout_count += 1;
276234
}
277235
if msg_type == LogMessageType::Request {
278236
*in_cached_req += 1;
279237
}
280-
perf_cache.rrt_cache.put(
238+
perf_cache.put(
281239
cache_key,
282240
LogCache {
283241
msg_type: param.direction.into(),
@@ -307,8 +265,8 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
307265
assert!(self.session_id().is_some());
308266

309267
let mut perf_cache = param.l7_perf_cache.borrow_mut();
310-
let cache_key = self.cal_cache_key(param);
311-
let previous_log_info = perf_cache.rrt_cache.pop(&cache_key);
268+
let cache_key = LogCacheKey::new(param, self.session_id());
269+
let previous_log_info = perf_cache.pop(cache_key);
312270

313271
let time = param.time;
314272
let msg_type: LogMessageType = param.direction.into();
@@ -319,9 +277,7 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
319277
return None;
320278
}
321279

322-
let (in_cached_req, timeout_count) = perf_cache
323-
.timeout_cache
324-
.get_or_insert_mut(param.flow_id, || (0, 0));
280+
let (in_cached_req, timeout_count) = perf_cache.get_or_insert_mut(param.flow_id);
325281

326282
let (req_end, resp_end) = {
327283
let (req_end, resp_end) = self.is_req_resp_end();
@@ -407,15 +363,15 @@ pub trait L7ProtocolInfoInterface: Into<L7ProtocolSendLog> {
407363
};
408364

409365
if put_back {
410-
perf_cache.rrt_cache.put(cache_key, previous_log_info);
366+
perf_cache.put(cache_key, previous_log_info);
411367
}
412368
r
413369
} else {
414370
if previous_log_info.msg_type != msg_type && !merged {
415371
*timeout_count += 1;
416372
}
417373
if put_back {
418-
perf_cache.rrt_cache.put(cache_key, previous_log_info);
374+
perf_cache.put(cache_key, previous_log_info);
419375
}
420376
None
421377
}

agent/src/common/l7_protocol_log.rs

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,38 +299,116 @@ pub struct LogCache {
299299
pub multi_merge_info: Option<(bool, bool, bool)>,
300300
}
301301

302+
#[derive(Clone, Copy, Eq, Hash, PartialEq)]
303+
pub struct LogCacheKey(pub u128);
304+
305+
impl LogCacheKey {
306+
pub fn new(param: &ParseParam, session_id: Option<u32>) -> Self {
307+
/*
308+
if session id is some: flow id 64bit | 0 32bit | session id 32bit
309+
if session id is none: flow id 64bit | packet_seq 64bit
310+
*/
311+
let key = match session_id {
312+
Some(sid) => ((param.flow_id as u128) << 64) | sid as u128,
313+
None => {
314+
((param.flow_id as u128) << 64)
315+
| (if param.ebpf_type != EbpfType::None {
316+
// NOTE:
317+
// In the request-log session aggregation process, for eBPF data, we require that requests and
318+
// responses have consecutive cap_seq to ensure the correctness of session aggregation. However,
319+
// when SR (Segmentation-Reassembly) is enabled, we combine multiple eBPF socket events
320+
// before parsing the protocol. Therefore, in order to ensure that session aggregation can still
321+
// be performed correctly, we need to retain the cap_seq of the last request and the cap_seq of
322+
// the first response, so that the cap_seq of the request and response can still be consecutive.
323+
if param.direction == PacketDirection::ClientToServer {
324+
param.packet_end_seq + 1
325+
} else {
326+
param.packet_start_seq
327+
}
328+
} else {
329+
0
330+
}) as u128
331+
}
332+
};
333+
334+
Self(key)
335+
}
336+
337+
fn flow_id(&self) -> u64 {
338+
(self.0 >> 64) as u64
339+
}
340+
}
341+
302342
pub struct L7PerfCache {
303343
// lru cache previous rrt
304-
pub rrt_cache: LruCache<u128, LogCache>,
344+
pub rrt_cache: LruCache<LogCacheKey, LogCache>,
305345
// LruCache<flow_id, (in_cached_req, timeout_count)>
306346
pub timeout_cache: LruCache<u64, (usize, usize)>,
347+
// LruCache<flow_id, LruCache<LogCacheKey, bool>>
348+
pub flow_id_map: LruCache<u64, LruCache<LogCacheKey, bool>>,
307349
// time in microseconds
308350
pub last_log_time: u64,
309351
}
310352

311353
impl L7PerfCache {
312354
// 60 seconds
313355
const LOG_INTERVAL: u64 = 60_000_000;
356+
// When the number of concurrent transactions in a single flow exceeds this value, RRT calculation errors will occur.
357+
const MAX_RRT_CACHE_PER_FLOW: usize = 128;
314358

315359
pub fn new(cap: usize) -> Self {
316360
L7PerfCache {
317361
rrt_cache: LruCache::new(cap.try_into().unwrap()),
318362
timeout_cache: LruCache::new(cap.try_into().unwrap()),
363+
flow_id_map: LruCache::new(cap.try_into().unwrap()),
319364
last_log_time: 0,
320365
}
321366
}
322367

323-
pub fn put(&mut self, key: u128, value: LogCache) -> Option<LogCache> {
368+
pub fn put(&mut self, key: LogCacheKey, value: LogCache) -> Option<LogCache> {
324369
let now = value.time;
325370
if self.rrt_cache.len() >= usize::from(self.rrt_cache.cap())
326371
&& self.last_log_time + Self::LOG_INTERVAL < now
327372
{
328373
self.last_log_time = now;
329374
debug!("The capacity({}) of the rrt table will be exceeded. please adjust the configuration", self.rrt_cache.cap());
330375
}
376+
if let Some((old, _)) = self
377+
.flow_id_map
378+
.get_or_insert_mut(key.flow_id(), || {
379+
let mut cache = LruCache::new(Self::MAX_RRT_CACHE_PER_FLOW.try_into().unwrap());
380+
cache.put(key.clone(), true);
381+
cache
382+
})
383+
.push(key.clone(), true)
384+
{
385+
// Another cache entry is removed due to the lru's capacity.
386+
if key != old {
387+
self.rrt_cache.pop(&old);
388+
if self.last_log_time + Self::LOG_INTERVAL < now {
389+
self.last_log_time = now;
390+
debug!(
391+
"The capacity({}) of the flow id table will be exceeded, flow id: {}",
392+
Self::MAX_RRT_CACHE_PER_FLOW,
393+
old.flow_id()
394+
);
395+
}
396+
}
397+
}
331398
self.rrt_cache.put(key, value)
332399
}
333400

401+
pub fn pop(&mut self, key: LogCacheKey) -> Option<LogCache> {
402+
if let Some(cache) = self.flow_id_map.get_mut(&key.flow_id()) {
403+
cache.pop(&key);
404+
405+
if cache.is_empty() {
406+
self.flow_id_map.pop(&key.flow_id());
407+
}
408+
}
409+
self.rrt_cache.pop(&key)
410+
}
411+
334412
pub fn pop_timeout_count(&mut self, flow_id: &u64, flow_end: bool) -> usize {
335413
let (in_cache, t) = self.timeout_cache.pop(flow_id).unwrap_or((0, 0));
336414
if flow_end {
@@ -340,6 +418,22 @@ impl L7PerfCache {
340418
t
341419
}
342420
}
421+
422+
pub fn get_or_insert_mut(&mut self, flow_id: u64) -> &mut (usize, usize) {
423+
self.timeout_cache.get_or_insert_mut(flow_id, || (0, 0))
424+
}
425+
426+
pub fn remove(&mut self, flow_id: &u64) {
427+
self.timeout_cache.pop(flow_id);
428+
429+
let Some(keys) = self.flow_id_map.pop(flow_id) else {
430+
return;
431+
};
432+
433+
for (key, _) in keys {
434+
self.rrt_cache.pop(&key);
435+
}
436+
}
343437
}
344438

345439
pub struct ParseParam<'a> {

agent/src/flow_generator/flow_map.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2061,6 +2061,7 @@ impl FlowMap {
20612061
collect_stats = true;
20622062
}
20632063
}
2064+
self.perf_cache.borrow_mut().remove(&flow.flow_id);
20642065

20652066
self.size -= 1;
20662067
self.stats_counter

agent/src/flow_generator/protocol_logs/sql/mysql.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,8 @@ impl L7ProtocolParserInterface for MysqlLog {
444444
}
445445
_ => {}
446446
}
447-
if info.msg_type != LogMessageType::Session {
447+
if info.msg_type == LogMessageType::Request || info.msg_type == LogMessageType::Response
448+
{
448449
info.cal_rrt(param).map(|rrt| {
449450
info.rrt = rrt;
450451
self.perf_stats.as_mut().map(|p| p.update_rrt(rrt));

0 commit comments

Comments
 (0)