Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions rattan-core/src/cells/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl FlowMap {
}
}

fn get_id(&self, desc: FlowDesc, log_tx: &UnboundedSender<RattanLogOp>, base_ts: i64) -> u32 {
fn get_id(&self, desc: FlowDesc, log_tx: &UnboundedSender<RattanLogOp>) -> u32 {
{
let map = self.map.read();
if let Some(meta) = map.get(&desc) {
Expand All @@ -204,7 +204,7 @@ impl FlowMap {
let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
map.insert(desc.clone(), id);

let op = RattanLogOp::Flow(id, base_ts, desc);
let op = RattanLogOp::Flow(id, BASE_TS.1, desc);
if log_tx.send(op).is_err() {
cnt_log_op_error();
}
Expand Down Expand Up @@ -285,7 +285,7 @@ where
// Avoid doing so when packet log is not enabled.
if let (Some(&log_mode), Some(log_tx)) = (packet_log_mode, log_tx.as_ref()) {
if let Some(desc) = packet.flow_desc() {
let id = flow_map.get_id(desc, log_tx, base_ts);
let id = flow_map.get_id(desc, log_tx);
packet.set_flow_id(id);
}
log_packet(log_tx, &packet, PktAction::Recv, base_ts, log_mode);
Expand Down Expand Up @@ -435,7 +435,7 @@ fn log_packet<T: Packet>(
// tracing::debug!(target: "veth::egress::packet_log", "At {} veth {} recv pkt len {} desc {}", ts, id, p.length(), p.desc());
}

fn get_clock_ns() -> i64 {
pub fn get_clock_ns() -> i64 {
nix::time::clock_gettime(nix::time::ClockId::CLOCK_MONOTONIC)
.map(|ts| ts.tv_sec() * 1_000_000_000 + ts.tv_nsec())
.unwrap_or(0)
Expand Down Expand Up @@ -494,7 +494,7 @@ where
let driver = D::bind_cell(cell.clone())?;
let dev_senders = driver.iter().map(|d| d.sender()).collect();
let log_tx = LOGGING_TX.get().cloned();
let base_ts = *BASE_TS.get_or_init(get_clock_ns);
let base_ts = BASE_TS.0;
Ok(Self {
_cell: cell,
ingress: Arc::new(VirtualEthernetIngress::new(
Expand Down
17 changes: 14 additions & 3 deletions rattan-core/src/radix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use std::{
net::IpAddr,
sync::{mpsc, Arc},
thread,
time::{SystemTime, UNIX_EPOCH},
};

use backon::{BlockingRetryable, ExponentialBuilder};
use once_cell::sync::OnceCell;
use once_cell::sync::{Lazy, OnceCell};
use rattan_log::{file_logging_thread, RattanLogOp, LOGGING_TX};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;
Expand All @@ -17,7 +18,7 @@ use tracing::{debug, error, info, span, warn, Level};

use crate::{
cells::{
external::{VirtualEthernet, VirtualEthernetId},
external::{get_clock_ns, VirtualEthernet, VirtualEthernetId},
Cell, Packet,
},
config::{CellBuildConfig, RattanConfig},
Expand All @@ -34,7 +35,17 @@ use crate::{control::http::HttpControlEndpoint, error::HttpServerError};
use std::net::{Ipv4Addr, SocketAddr};

pub static INSTANCE_ID: OnceCell<String> = OnceCell::new();
pub static BASE_TS: OnceCell<i64> = OnceCell::new();

pub static BASE_TS: Lazy<(i64, u64)> = Lazy::new(|| {
// Internal use
let machine_time = get_clock_ns();
// Used as base timestamp in Packet Logs.
let unix_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_micros();
(machine_time, unix_time as u64)
});

#[derive(Clone, Copy, Debug, clap::ValueEnum, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
Expand Down
6 changes: 3 additions & 3 deletions rattan-log/src/log_entry/entry/flow_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub struct TCPFlow {
pub dst_ip: u32,
pub src_port: u16,
pub dst_port: u16,
pub base_ts: i64,
pub base_ts: u64,
pub _reserved: u32,
pub options: TCPOption,
}
Expand Down Expand Up @@ -158,9 +158,9 @@ impl FlowEntryVariant {
}
}

impl From<(u32, i64, FlowDesc)> for FlowEntryVariant {
impl From<(u32, u64, FlowDesc)> for FlowEntryVariant {
// (flow_id, base_ts, flow_desc)
fn from(value: (u32, i64, FlowDesc)) -> Self {
fn from(value: (u32, u64, FlowDesc)) -> Self {
let (flow_id, base_ts, flow_desc) = value;
let mut entryheader = FlowEntryHeader::default();
entryheader.set_length(32);
Expand Down
11 changes: 5 additions & 6 deletions rattan-log/src/logger/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,11 @@ where
self.current_chunk_offset + current_chunk_len,
current_chunk_len
);
debug_assert_eq!(
self.prologue_len,
self.writer
.chunk
.write_at(&header, self.current_chunk_offset)
);

self.writer
.chunk
.write_at(&header, self.current_chunk_offset);

if is_final {
return;
}
Expand Down
4 changes: 2 additions & 2 deletions rattan-log/src/logger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub enum RattanLogOp {
/// its recorded position (represented as a byte range `[offset, offset + len)`).
/// We use this information (offset and len), along with the flow_id, to construct the raw log entry.
RawEntry(u32, RawLogEntry, Vec<u8>),
/// Represents a flow consists of (flow_id, base_time_ns, flow_desc).
Flow(u32, i64, FlowDesc),
/// Represents a flow consists of (flow_id, base_time_us, flow_desc).
Flow(u32, u64, FlowDesc),
/// End of Log
End,
}
8 changes: 3 additions & 5 deletions rattan-log/src/logger/pcap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl PacketWriter {
let enhanced_packet = EnhancedPacketBlock {
interface_id: 0,
timestamp: Duration::from_micros(
tcp_entry.general_pkt_entry.ts as u64 + flow_desc.base_ts as u64 / 1000,
tcp_entry.general_pkt_entry.ts as u64 + flow_desc.base_ts,
),
original_len: tcp_entry.general_pkt_entry.pkt_length as u32,
data: Cow::from(data),
Expand All @@ -165,13 +165,11 @@ impl PacketWriter {
&mut self,
raw_log_entry: &RawLogEntry,
packet: Vec<u8>,
base_ts: i64,
base_ts: u64,
) -> Result<()> {
let enhanced_packet = EnhancedPacketBlock {
interface_id: 0,
timestamp: Duration::from_micros(
raw_log_entry.general_pkt_entry.ts as u64 + base_ts as u64 / 1000,
),
timestamp: Duration::from_micros(raw_log_entry.general_pkt_entry.ts as u64 + base_ts),
original_len: raw_log_entry.general_pkt_entry.pkt_length as u32,
data: Cow::from(packet),
options: vec![],
Expand Down
9 changes: 8 additions & 1 deletion rattan-log/src/logger/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type FlowIndex = u16;
struct ParseContext {
pub flows: HashMap<FlowId, FlowEntryVariant>,
pub flow_index: HashMap<FlowIndex, FlowId>,
pub base_ts: i64,
pub base_ts: u64,
}

impl ParseContext {
Expand Down Expand Up @@ -221,6 +221,13 @@ pub fn convert_log_to_pcapng(
continue;
};

if raw_header[0] != 0x45 {
dbg!(offset);
dbg!(pointer.get_offset());
dbg!(chunk_offset);
panic!("Invalid IPv4 Header");
}

let (mut packet_header, mock_l3) =
match raw_entry.general_pkt_entry.header.get_type().try_into() {
Ok(GeneralPacketType::RawIP) => {
Expand Down
4 changes: 2 additions & 2 deletions rattan-log/src/logger/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ where
}
}

fn build_chunk_prologue(data_length: usize, time_offset: &Option<u64>) -> Vec<u8> {
fn build_chunk_prologue(data_length: usize, chunk_offset: &Option<u64>) -> Vec<u8> {
let header = new_log_entry_chunk_prologue(
time_offset.unwrap_or_default(),
chunk_offset.unwrap_or_default(),
LOGICAL_CHUNK_SIZE_1M,
data_length,
);
Expand Down
Loading