Skip to content

Commit f9d9296

Browse files
committed
fix: issue #135
1 parent f7bb2e0 commit f9d9296

File tree

8 files changed

+42
-27
lines changed

8 files changed

+42
-27
lines changed

rattan-core/src/cells/external.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ impl FlowMap {
193193
}
194194
}
195195

196-
fn get_id(&self, desc: FlowDesc, log_tx: &UnboundedSender<RattanLogOp>, base_ts: i64) -> u32 {
196+
fn get_id(&self, desc: FlowDesc, log_tx: &UnboundedSender<RattanLogOp>) -> u32 {
197197
{
198198
let map = self.map.read();
199199
if let Some(meta) = map.get(&desc) {
@@ -204,7 +204,7 @@ impl FlowMap {
204204
let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
205205
map.insert(desc.clone(), id);
206206

207-
let op = RattanLogOp::Flow(id, base_ts, desc);
207+
let op = RattanLogOp::Flow(id, BASE_TS.1, desc);
208208
if log_tx.send(op).is_err() {
209209
cnt_log_op_error();
210210
}
@@ -285,7 +285,7 @@ where
285285
// Avoid doing so when packet log is not enabled.
286286
if let (Some(&log_mode), Some(log_tx)) = (packet_log_mode, log_tx.as_ref()) {
287287
if let Some(desc) = packet.flow_desc() {
288-
let id = flow_map.get_id(desc, log_tx, base_ts);
288+
let id = flow_map.get_id(desc, log_tx);
289289
packet.set_flow_id(id);
290290
}
291291
log_packet(log_tx, &packet, PktAction::Recv, base_ts, log_mode);
@@ -435,7 +435,7 @@ fn log_packet<T: Packet>(
435435
// tracing::debug!(target: "veth::egress::packet_log", "At {} veth {} recv pkt len {} desc {}", ts, id, p.length(), p.desc());
436436
}
437437

438-
fn get_clock_ns() -> i64 {
438+
pub fn get_clock_ns() -> i64 {
439439
nix::time::clock_gettime(nix::time::ClockId::CLOCK_MONOTONIC)
440440
.map(|ts| ts.tv_sec() * 1_000_000_000 + ts.tv_nsec())
441441
.unwrap_or(0)
@@ -494,7 +494,7 @@ where
494494
let driver = D::bind_cell(cell.clone())?;
495495
let dev_senders = driver.iter().map(|d| d.sender()).collect();
496496
let log_tx = LOGGING_TX.get().cloned();
497-
let base_ts = *BASE_TS.get_or_init(get_clock_ns);
497+
let base_ts = BASE_TS.0;
498498
Ok(Self {
499499
_cell: cell,
500500
ingress: Arc::new(VirtualEthernetIngress::new(

rattan-core/src/radix/mod.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ use std::{
22
net::IpAddr,
33
sync::{mpsc, Arc},
44
thread,
5+
time::{SystemTime, UNIX_EPOCH},
56
};
67

78
use backon::{BlockingRetryable, ExponentialBuilder};
8-
use once_cell::sync::OnceCell;
9+
use once_cell::sync::{Lazy, OnceCell};
910
use rattan_log::{file_logging_thread, RattanLogOp, LOGGING_TX};
1011
use tokio::runtime::Runtime;
1112
use tokio_util::sync::CancellationToken;
@@ -17,7 +18,7 @@ use tracing::{debug, error, info, span, warn, Level};
1718

1819
use crate::{
1920
cells::{
20-
external::{VirtualEthernet, VirtualEthernetId},
21+
external::{get_clock_ns, VirtualEthernet, VirtualEthernetId},
2122
Cell, Packet,
2223
},
2324
config::{CellBuildConfig, RattanConfig},
@@ -34,7 +35,17 @@ use crate::{control::http::HttpControlEndpoint, error::HttpServerError};
3435
use std::net::{Ipv4Addr, SocketAddr};
3536

3637
pub static INSTANCE_ID: OnceCell<String> = OnceCell::new();
37-
pub static BASE_TS: OnceCell<i64> = OnceCell::new();
38+
39+
pub static BASE_TS: Lazy<(i64, u64)> = Lazy::new(|| {
40+
// Internal use
41+
let machine_time = get_clock_ns();
42+
// Used as base timestamp in Packet Logs.
43+
let unix_time = SystemTime::now()
44+
.duration_since(UNIX_EPOCH)
45+
.expect("Time went backwards")
46+
.as_micros();
47+
(machine_time, unix_time as u64)
48+
});
3849

3950
#[derive(Clone, Copy, Debug, clap::ValueEnum, PartialEq, Eq)]
4051
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]

rattan-log/src/log_entry/entry/flow_entry.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ pub struct TCPFlow {
7979
pub dst_ip: u32,
8080
pub src_port: u16,
8181
pub dst_port: u16,
82-
pub base_ts: i64,
82+
pub base_ts: u64,
8383
pub _reserved: u32,
8484
pub options: TCPOption,
8585
}
@@ -158,9 +158,9 @@ impl FlowEntryVariant {
158158
}
159159
}
160160

161-
impl From<(u32, i64, FlowDesc)> for FlowEntryVariant {
161+
impl From<(u32, u64, FlowDesc)> for FlowEntryVariant {
162162
// (flow_id, base_ts, flow_desc)
163-
fn from(value: (u32, i64, FlowDesc)) -> Self {
163+
fn from(value: (u32, u64, FlowDesc)) -> Self {
164164
let (flow_id, base_ts, flow_desc) = value;
165165
let mut entryheader = FlowEntryHeader::default();
166166
entryheader.set_length(32);

rattan-log/src/logger/mmap.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,11 @@ where
193193
self.current_chunk_offset + current_chunk_len,
194194
current_chunk_len
195195
);
196-
debug_assert_eq!(
197-
self.prologue_len,
198-
self.writer
199-
.chunk
200-
.write_at(&header, self.current_chunk_offset)
201-
);
196+
197+
self.writer
198+
.chunk
199+
.write_at(&header, self.current_chunk_offset);
200+
202201
if is_final {
203202
return;
204203
}

rattan-log/src/logger/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ pub enum RattanLogOp {
5656
/// its recorded position (represented as a byte range `[offset, offset + len)`).
5757
/// We use this information (offset and len), along with the flow_id, to construct the raw log entry.
5858
RawEntry(u32, RawLogEntry, Vec<u8>),
59-
/// Represents a flow consists of (flow_id, base_time_ns, flow_desc).
60-
Flow(u32, i64, FlowDesc),
59+
/// Represents a flow consists of (flow_id, base_time_us, flow_desc).
60+
Flow(u32, u64, FlowDesc),
6161
/// End of Log
6262
End,
6363
}

rattan-log/src/logger/pcap.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ impl PacketWriter {
147147
let enhanced_packet = EnhancedPacketBlock {
148148
interface_id: 0,
149149
timestamp: Duration::from_micros(
150-
tcp_entry.general_pkt_entry.ts as u64 + flow_desc.base_ts as u64 / 1000,
150+
tcp_entry.general_pkt_entry.ts as u64 + flow_desc.base_ts,
151151
),
152152
original_len: tcp_entry.general_pkt_entry.pkt_length as u32,
153153
data: Cow::from(data),
@@ -165,13 +165,11 @@ impl PacketWriter {
165165
&mut self,
166166
raw_log_entry: &RawLogEntry,
167167
packet: Vec<u8>,
168-
base_ts: i64,
168+
base_ts: u64,
169169
) -> Result<()> {
170170
let enhanced_packet = EnhancedPacketBlock {
171171
interface_id: 0,
172-
timestamp: Duration::from_micros(
173-
raw_log_entry.general_pkt_entry.ts as u64 + base_ts as u64 / 1000,
174-
),
172+
timestamp: Duration::from_micros(raw_log_entry.general_pkt_entry.ts as u64 + base_ts),
175173
original_len: raw_log_entry.general_pkt_entry.pkt_length as u32,
176174
data: Cow::from(packet),
177175
options: vec![],

rattan-log/src/logger/reader.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type FlowIndex = u16;
2626
struct ParseContext {
2727
pub flows: HashMap<FlowId, FlowEntryVariant>,
2828
pub flow_index: HashMap<FlowIndex, FlowId>,
29-
pub base_ts: i64,
29+
pub base_ts: u64,
3030
}
3131

3232
impl ParseContext {
@@ -221,6 +221,13 @@ pub fn convert_log_to_pcapng(
221221
continue;
222222
};
223223

224+
if raw_header[0] != 0x45 {
225+
dbg!(offset);
226+
dbg!(pointer.get_offset());
227+
dbg!(chunk_offset);
228+
panic!("Invalid IPv4 Header");
229+
}
230+
224231
let (mut packet_header, mock_l3) =
225232
match raw_entry.general_pkt_entry.header.get_type().try_into() {
226233
Ok(GeneralPacketType::RawIP) => {

rattan-log/src/logger/writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,9 @@ where
149149
}
150150
}
151151

152-
fn build_chunk_prologue(data_length: usize, time_offset: &Option<u64>) -> Vec<u8> {
152+
fn build_chunk_prologue(data_length: usize, chunk_offset: &Option<u64>) -> Vec<u8> {
153153
let header = new_log_entry_chunk_prologue(
154-
time_offset.unwrap_or_default(),
154+
chunk_offset.unwrap_or_default(),
155155
LOGICAL_CHUNK_SIZE_1M,
156156
data_length,
157157
);

0 commit comments

Comments
 (0)