17 | 17 |
18 | 18 | use rvnic::sys::RattanDesc; |
19 | 19 | use rvnic::{CompRing, FillRing, RATTAN_RING_SIZE, Rings, RvnicDevice, RxRing, TxRing, UmemBuilder}; |
| 20 | +use std::os::unix::io::AsRawFd; |
20 | 21 | use std::process::{Child, Command, Stdio}; |
21 | 22 | use std::sync::Arc; |
22 | 23 | use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; |
@@ -284,65 +285,105 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { |
284 | 285 | let packets_forwarded = Arc::new(AtomicU64::new(0)); |
285 | 286 | let packets_forwarded_clone = packets_forwarded.clone(); |
286 | 287 |
287 | | - // Two forwarding threads - one per direction |
288 | | - let running_clone2 = running.clone(); |
289 | | - let packets_forwarded_clone2 = packets_forwarded.clone(); |
| 288 | + // Get file descriptors for epoll |
| 289 | + let dev0_fd = dev0.as_raw_fd(); |
| 290 | + let dev1_fd = dev1.as_raw_fd(); |
290 | 291 |
291 | | - // Thread 1: rx0 → tx1, recycle comp1 → fill0 |
292 | | - let thread1 = std::thread::spawn(move || { |
| 292 | + // Single-threaded forwarder using epoll |
| 293 | + let forward_thread = std::thread::spawn(move || { |
293 | 294 | let mut descs = [RattanDesc::default(); BATCH_SIZE]; |
294 | 295 | let mut addrs = [0u64; BATCH_SIZE]; |
295 | 296 |
| 297 | + // Create epoll instance |
| 298 | + let epfd = unsafe { libc::epoll_create1(0) }; |
| 299 | + if epfd < 0 { |
| 300 | + eprintln!("epoll_create1 failed"); |
| 301 | + return (dev0, dev1); |
| 302 | + } |
| 303 | + |
| 304 | + // Add dev0 to epoll (data.u64 = 0) |
| 305 | + let mut ev = libc::epoll_event { |
| 306 | + events: libc::EPOLLIN as u32, |
| 307 | + u64: 0, |
| 308 | + }; |
| 309 | + if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, dev0_fd, &mut ev) } < 0 { |
| 310 | + eprintln!("epoll_ctl add dev0 failed"); |
| 311 | + unsafe { libc::close(epfd) }; |
| 312 | + return (dev0, dev1); |
| 313 | + } |
| 314 | + |
| 315 | + // Add dev1 to epoll (data.u64 = 1) |
| 316 | + ev.u64 = 1; |
| 317 | + if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, dev1_fd, &mut ev) } < 0 { |
| 318 | + eprintln!("epoll_ctl add dev1 failed"); |
| 319 | + unsafe { libc::close(epfd) }; |
| 320 | + return (dev0, dev1); |
| 321 | + } |
| 322 | + |
| 323 | + let mut events = [libc::epoll_event { events: 0, u64: 0 }; 2]; |
| 324 | + |
296 | 325 | while running_clone.load(Ordering::Relaxed) { |
297 | | - // Forward a batch |
298 | | - let fwd = forward_batch(&mut rx0, &mut tx1, &mut descs); |
| 326 | + // Wait for events on either device (10ms timeout) |
| 327 | + let nfds = unsafe { libc::epoll_wait(epfd, events.as_mut_ptr(), 2, 10) }; |
299 | 328 |
300 | | - // Aggressively recycle all available chunks |
301 | | - loop { |
302 | | - let recycled = recycle_chunks(&mut comp1, &mut fill0, &mut addrs); |
303 | | - if recycled == 0 { |
| 329 | + if nfds < 0 { |
| 330 | + let err = std::io::Error::last_os_error(); |
| 331 | + if err.kind() != std::io::ErrorKind::Interrupted { |
| 332 | + eprintln!("epoll_wait failed: {}", err); |
304 | 333 | break; |
305 | 334 | } |
| 335 | + continue; |
306 | 336 | } |
307 | 337 |
308 | | - if fwd > 0 { |
309 | | - packets_forwarded_clone.fetch_add(fwd as u64, Ordering::Relaxed); |
310 | | - let _ = dev1.kick_rx(); |
311 | | - } else { |
312 | | - std::hint::spin_loop(); |
| 338 | + // Process events - drain RX rings completely |
| 339 | + for i in 0..nfds as usize { |
| 340 | + let dev_idx = events[i].u64; |
| 341 | + |
| 342 | + if dev_idx == 0 { |
| 343 | + // dev0 has RX data: forward rx0 → tx1 |
| 344 | + // Drain all available packets (not just one batch) |
| 345 | + loop { |
| 346 | + let fwd = forward_batch(&mut rx0, &mut tx1, &mut descs); |
| 347 | + if fwd > 0 { |
| 348 | + packets_forwarded_clone.fetch_add(fwd as u64, Ordering::Relaxed); |
| 349 | + let _ = dev1.kick_rx(); |
| 350 | + } |
| 351 | + if fwd < BATCH_SIZE { |
| 352 | + break; // Ring drained |
| 353 | + } |
| 354 | + } |
| 355 | + } else { |
| 356 | + // dev1 has RX data: forward rx1 → tx0 |
| 357 | + // Drain all available packets (not just one batch) |
| 358 | + loop { |
| 359 | + let fwd = forward_batch(&mut rx1, &mut tx0, &mut descs); |
| 360 | + if fwd > 0 { |
| 361 | + packets_forwarded_clone.fetch_add(fwd as u64, Ordering::Relaxed); |
| 362 | + let _ = dev0.kick_rx(); |
| 363 | + } |
| 364 | + if fwd < BATCH_SIZE { |
| 365 | + break; // Ring drained |
| 366 | + } |
| 367 | + } |
| 368 | + } |
313 | 369 | } |
314 | | - } |
315 | | - dev1 |
316 | | - }); |
317 | | - |
318 | | - // Thread 2: rx1 → tx0, recycle comp0 → fill1 |
319 | | - let thread2 = std::thread::spawn(move || { |
320 | | - let mut descs = [RattanDesc::default(); BATCH_SIZE]; |
321 | | - let mut addrs = [0u64; BATCH_SIZE]; |
322 | 370 |
323 | | - while running_clone2.load(Ordering::Relaxed) { |
324 | | - // Forward a batch |
325 | | - let fwd = forward_batch(&mut rx1, &mut tx0, &mut descs); |
326 | | - |
327 | | - // Aggressively recycle all available chunks |
| 371 | + // Always recycle chunks from both directions |
| 372 | + // This ensures chunks are returned promptly regardless of which device triggered the wakeup |
328 | 373 | loop { |
329 | | - let recycled = recycle_chunks(&mut comp0, &mut fill1, &mut addrs); |
330 | | - if recycled == 0 { |
| 374 | + let r0 = recycle_chunks(&mut comp1, &mut fill0, &mut addrs); |
| 375 | + let r1 = recycle_chunks(&mut comp0, &mut fill1, &mut addrs); |
| 376 | + if r0 == 0 && r1 == 0 { |
331 | 377 | break; |
332 | 378 | } |
333 | 379 | } |
334 | | - |
335 | | - if fwd > 0 { |
336 | | - packets_forwarded_clone2.fetch_add(fwd as u64, Ordering::Relaxed); |
337 | | - let _ = dev0.kick_rx(); |
338 | | - } else { |
339 | | - std::hint::spin_loop(); |
340 | | - } |
341 | 380 | } |
342 | | - dev0 |
| 381 | + |
| 382 | + unsafe { libc::close(epfd) }; |
| 383 | + (dev0, dev1) |
343 | 384 | }); |
344 | 385 |
345 | | - println!(" Forwarding loop started"); |
| 386 | + println!(" Forwarding loop started (single thread, epoll)"); |
346 | 387 |
347 | 388 | // Step 5: Run iperf test |
348 | 389 | println!("\n5. Running iperf3 performance test..."); |
@@ -377,8 +418,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { |
377 | 418 | println!("\n7. Cleaning up..."); |
378 | 419 | running.store(false, Ordering::Relaxed); |
379 | 420 |
380 | | - let _dev1 = thread1.join().unwrap(); |
381 | | - let _dev0 = thread2.join().unwrap(); |
| 421 | + let (_dev0, _dev1) = forward_thread.join().unwrap(); |
382 | 422 |
383 | 423 | // netns_guard will clean up namespaces on drop |
384 | 424 | drop(netns_guard); |
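The core of the change is the fd-tagging idiom: each device's fd is registered with `epoll_ctl`, carrying a small integer in the event's `u64` data field so the wait loop can dispatch without a lookup table. Below is a minimal, self-contained sketch of that pattern under the same assumptions as the commit (the `libc` crate, level-triggered `EPOLLIN`, a 10 ms timeout); `poll_two`, `fd_a`, and `fd_b` are illustrative names, not identifiers from this codebase:

```rust
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};

/// Wait on two fds with epoll, tagging each via the event's u64 data field.
fn poll_two(fd_a: i32, fd_b: i32, running: &AtomicBool) -> io::Result<()> {
    // Create the epoll instance.
    let epfd = unsafe { libc::epoll_create1(0) };
    if epfd < 0 {
        return Err(io::Error::last_os_error());
    }

    // Register each fd; the tag comes back in epoll_event.u64 on wakeup.
    for (fd, tag) in [(fd_a, 0u64), (fd_b, 1u64)] {
        let mut ev = libc::epoll_event {
            events: libc::EPOLLIN as u32,
            u64: tag,
        };
        if unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut ev) } < 0 {
            let err = io::Error::last_os_error();
            unsafe { libc::close(epfd) };
            return Err(err);
        }
    }

    let mut events = [libc::epoll_event { events: 0, u64: 0 }; 2];
    while running.load(Ordering::Relaxed) {
        // Block for up to 10 ms so housekeeping still runs when idle.
        let nfds = unsafe { libc::epoll_wait(epfd, events.as_mut_ptr(), 2, 10) };
        if nfds < 0 {
            let err = io::Error::last_os_error();
            if err.kind() == io::ErrorKind::Interrupted {
                continue; // interrupted by a signal: just retry
            }
            unsafe { libc::close(epfd) };
            return Err(err);
        }
        for ev in &events[..nfds as usize] {
            match ev.u64 {
                0 => { /* fd_a readable: drain until a short batch */ }
                _ => { /* fd_b readable: drain until a short batch */ }
            }
        }
        // nfds == 0 (timeout) falls through here too, which is what lets
        // the commit's recycle pass run even with no RX traffic.
    }

    unsafe { libc::close(epfd) };
    Ok(())
}
```

Two design notes on the commit itself: since the fds are registered with plain `EPOLLIN` (level-triggered, no `EPOLLET`), an undrained RX ring simply re-arms on the next `epoll_wait`, so the drain-until-`fwd < BATCH_SIZE` loops are a wakeup-reduction optimization rather than a correctness requirement; and the 10 ms timeout guarantees the shared recycle loop still returns chunks to the fill rings during idle periods.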