Skip to content

Commit 2cc7abe

Browse files
committed
WIP: add support for Arc<T> input
1 parent 1ac846b commit 2cc7abe

File tree

14 files changed

+433
-74
lines changed

14 files changed

+433
-74
lines changed

.idea/.gitignore

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/bus-queue.iml

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/async_publisher.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,22 @@ use futures_sink::Sink;
88
use std::pin::Pin;
99
use std::sync::Arc;
1010

11-
pub struct AsyncPublisher<T, S: SwapSlot<T>> {
12-
pub(super) publisher: Publisher<T, S>,
11+
pub struct AsyncPublisher<T, I, S: SwapSlot<T, I>> {
12+
pub(super) publisher: Publisher<T, I, S>,
1313
pub(super) event: Arc<Event>,
1414
}
1515

16-
impl<T, S: SwapSlot<T>> From<(Publisher<T, S>, Arc<Event>)> for AsyncPublisher<T, S> {
17-
fn from(input: (Publisher<T, S>, Arc<Event>)) -> Self {
16+
impl<T, I, S: SwapSlot<T, I>> From<(Publisher<T, I, S>, Arc<Event>)> for AsyncPublisher<T, I, S> {
17+
fn from(input: (Publisher<T, I, S>, Arc<Event>)) -> Self {
1818
Self {
1919
publisher: input.0,
2020
event: input.1,
2121
}
2222
}
2323
}
2424

25-
impl<T, S: SwapSlot<T>> Sink<T> for AsyncPublisher<T, S> {
26-
type Error = SendError<T>;
25+
impl<T, I, S: SwapSlot<T, I>> Sink<I> for AsyncPublisher<T, I, S> {
26+
type Error = SendError<I>;
2727

2828
fn poll_ready(
2929
self: Pin<&mut Self>,
@@ -32,7 +32,7 @@ impl<T, S: SwapSlot<T>> Sink<T> for AsyncPublisher<T, S> {
3232
Poll::Ready(Ok(()))
3333
}
3434

35-
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
35+
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
3636
self.publisher.broadcast(item).and_then(|_| Ok(()))
3737
}
3838

@@ -53,17 +53,17 @@ impl<T, S: SwapSlot<T>> Sink<T> for AsyncPublisher<T, S> {
5353
}
5454
}
5555

56-
impl<T, S: SwapSlot<T>> PartialEq for AsyncPublisher<T, S> {
57-
fn eq(&self, other: &AsyncPublisher<T, S>) -> bool {
56+
impl<T, I, S: SwapSlot<T, I>> PartialEq for AsyncPublisher<T, I, S> {
57+
fn eq(&self, other: &AsyncPublisher<T, I, S>) -> bool {
5858
self.publisher == other.publisher
5959
}
6060
}
6161

62-
impl<T, S: SwapSlot<T>> Drop for AsyncPublisher<T, S> {
62+
impl<T, I, S: SwapSlot<T, I>> Drop for AsyncPublisher<T, I, S> {
6363
fn drop(&mut self) {
6464
self.publisher.close();
6565
self.event.notify_all();
6666
}
6767
}
6868

69-
impl<T, S: SwapSlot<T>> Eq for AsyncPublisher<T, S> {}
69+
impl<T, I, S: SwapSlot<T, I>> Eq for AsyncPublisher<T, I, S> {}

src/async_subscriber.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ use futures_core::{
1111
use std::pin::Pin;
1212
use std::sync::Arc;
1313

14-
pub struct AsyncSubscriber<T, S: SwapSlot<T>> {
15-
pub(super) subscriber: Subscriber<T, S>,
14+
pub struct AsyncSubscriber<T, I, S: SwapSlot<T, I>> {
15+
pub(super) subscriber: Subscriber<T, I, S>,
1616
pub(super) event: Arc<Event>,
1717
pub(super) listener: Option<EventListener>,
1818
}
1919

20-
impl<T, S: SwapSlot<T>> From<(Subscriber<T, S>, Arc<Event>)> for AsyncSubscriber<T, S> {
21-
fn from(input: (Subscriber<T, S>, Arc<Event>)) -> Self {
20+
impl<T, I, S: SwapSlot<T, I>> From<(Subscriber<T, I, S>, Arc<Event>)> for AsyncSubscriber<T, I, S> {
21+
fn from(input: (Subscriber<T, I, S>, Arc<Event>)) -> Self {
2222
Self {
2323
subscriber: input.0,
2424
event: input.1,
@@ -27,13 +27,13 @@ impl<T, S: SwapSlot<T>> From<(Subscriber<T, S>, Arc<Event>)> for AsyncSubscriber
2727
}
2828
}
2929

30-
impl<T, S: SwapSlot<T>> std::fmt::Debug for AsyncSubscriber<T, S> {
30+
impl<T, I, S: SwapSlot<T, I>> std::fmt::Debug for AsyncSubscriber<T, I, S> {
3131
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3232
f.debug_struct("Subscriber").finish()
3333
}
3434
}
3535

36-
impl<T, S: SwapSlot<T>> AsyncSubscriber<T, S> {
36+
impl<T, I, S: SwapSlot<T, I>> AsyncSubscriber<T, I, S> {
3737
#[allow(dead_code)]
3838
pub fn set_skip_items(&mut self, skip_items: usize) {
3939
self.subscriber.set_skip_items(skip_items);
@@ -50,7 +50,7 @@ impl<T, S: SwapSlot<T>> AsyncSubscriber<T, S> {
5050
}
5151
}
5252

53-
impl<T, S: SwapSlot<T>> Stream for AsyncSubscriber<T, S> {
53+
impl<T, I, S: SwapSlot<T, I>> Stream for AsyncSubscriber<T, I, S> {
5454
type Item = Arc<T>;
5555

5656
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
@@ -91,7 +91,7 @@ impl<T, S: SwapSlot<T>> Stream for AsyncSubscriber<T, S> {
9191
}
9292
}
9393

94-
impl<T, S: SwapSlot<T>> Clone for AsyncSubscriber<T, S> {
94+
impl<T, I, S: SwapSlot<T, I>> Clone for AsyncSubscriber<T, I, S> {
9595
fn clone(&self) -> Self {
9696
Self {
9797
subscriber: self.subscriber.clone(),
@@ -101,10 +101,10 @@ impl<T, S: SwapSlot<T>> Clone for AsyncSubscriber<T, S> {
101101
}
102102
}
103103

104-
impl<T, S: SwapSlot<T>> PartialEq for AsyncSubscriber<T, S> {
105-
fn eq(&self, other: &AsyncSubscriber<T, S>) -> bool {
104+
impl<T, I, S: SwapSlot<T, I>> PartialEq for AsyncSubscriber<T, I, S> {
105+
fn eq(&self, other: &AsyncSubscriber<T, I, S>) -> bool {
106106
self.subscriber == other.subscriber
107107
}
108108
}
109109

110-
impl<T, S: SwapSlot<T>> Eq for AsyncSubscriber<T, S> {}
110+
impl<T, I, S: SwapSlot<T, I>> Eq for AsyncSubscriber<T, I, S> {}

src/flavors/arc_swap.rs

Lines changed: 124 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub struct Slot<T> {
77
shared: ArcSwapOption<T>,
88
}
99

10-
impl<T> SwapSlot<T> for Slot<T> {
10+
impl<T> SwapSlot<T, T> for Slot<T> {
1111
fn store(&self, item: T) {
1212
self.shared.store(Some(Arc::new(item)))
1313
}
@@ -23,36 +23,64 @@ impl<T> SwapSlot<T> for Slot<T> {
2323
}
2424
}
2525

26-
pub type Publisher<T> = publisher::Publisher<T, Slot<T>>;
27-
pub type Subscriber<T> = subscriber::Subscriber<T, Slot<T>>;
26+
impl<T> SwapSlot<T, Arc<T>> for Slot<T> {
27+
fn store(&self, item: Arc<T>) {
28+
self.shared.store(Some(item))
29+
}
30+
fn load(&self) -> Option<Arc<T>> {
31+
self.shared.load_full()
32+
}
33+
fn none() -> Self {
34+
Slot {
35+
shared: ArcSwapOption::new(None),
36+
}
37+
}
38+
}
39+
40+
pub type Publisher<T, I> = publisher::Publisher<T, I, Slot<T>>;
41+
pub type Subscriber<T, I> = subscriber::Subscriber<T, I, Slot<T>>;
42+
43+
pub fn bounded<T>(size: usize) -> (Publisher<T, T>, Subscriber<T, T>) {
44+
crate::bounded::<T, T, Slot<T>>(size)
45+
}
2846

29-
pub fn bounded<T>(size: usize) -> (Publisher<T>, Subscriber<T>) {
30-
crate::bounded::<T, Slot<T>>(size)
47+
pub fn bounded_arc<T>(size: usize) -> (Publisher<T, Arc<T>>, Subscriber<T, Arc<T>>) {
48+
crate::bounded::<T, Arc<T>, Slot<T>>(size)
3149
}
3250

33-
pub type AsyncPublisher<T> = async_publisher::AsyncPublisher<T, Slot<T>>;
34-
pub type AsyncSubscriber<T> = async_subscriber::AsyncSubscriber<T, Slot<T>>;
51+
pub type AsyncPublisher<T, I> = async_publisher::AsyncPublisher<T, I, Slot<T>>;
52+
pub type AsyncSubscriber<T, I> = async_subscriber::AsyncSubscriber<T, I, Slot<T>>;
3553

36-
pub fn async_bounded<T>(size: usize) -> (AsyncPublisher<T>, AsyncSubscriber<T>) {
37-
crate::async_bounded::<T, Slot<T>>(size)
54+
pub fn async_bounded<T>(size: usize) -> (AsyncPublisher<T, T>, AsyncSubscriber<T, T>) {
55+
crate::async_bounded::<T,T, Slot<T>>(size)
56+
}
57+
58+
pub fn async_bounded_arc<T>(size: usize) -> (AsyncPublisher<T,Arc<T>>, AsyncSubscriber<T, Arc<T>>) {
59+
crate::async_bounded::<T, Arc<T>, Slot<T>>(size)
3860
}
3961

4062
#[cfg(test)]
4163
mod test {
4264
use crate::flavors::arc_swap::Slot;
4365
use crate::swap_slot::SwapSlot;
4466
use std::sync::Arc;
67+
use std::hint::black_box;
68+
use std::ops::Deref;
69+
use std::sync::{
70+
atomic::{AtomicBool, Ordering}, };
71+
use std::thread;
72+
use std::time::{Duration, Instant};
4573

4674
#[test]
4775
fn test_archswap_none() {
48-
let slot: Slot<()> = Slot::none();
76+
let slot: Slot<usize> = SwapSlot::<usize, usize>::none();
4977

5078
assert_eq!(slot.shared.load_full(), None);
5179
}
5280

5381
#[test]
5482
fn test_archswap_store() {
55-
let slot = Slot::none();
83+
let slot: Slot<usize> = SwapSlot::<usize, usize>::none();
5684

5785
slot.store(5);
5886

@@ -61,12 +89,95 @@ mod test {
6189

6290
#[test]
6391
fn test_archswap_load() {
64-
let slot = Slot::none();
92+
let slot: Slot<i32> = SwapSlot::<i32, i32>::none();
6593
slot.store(10);
6694

67-
let arc = slot.load();
95+
let arc = SwapSlot::<i32, i32>::load(&slot);
6896

6997
assert_eq!(arc, Some(Arc::new(10)));
7098
assert_eq!(Arc::strong_count(&arc.unwrap()), 2)
7199
}
100+
#[test]
101+
fn test_slot_writer_vs_readers_latency() {
102+
// Make sure Slot<T> is accessible here (e.g., `pub struct Slot<T>(...)`).
103+
let slot: Slot<usize> = SwapSlot::<usize, usize>::none();
104+
let arc_slot: Arc<Slot<usize>> = Arc::new(slot);
105+
106+
// Stop flag for readers.
107+
let stop = Arc::new(AtomicBool::new(false));
108+
109+
// Spawn 31 busy readers.
110+
let mut reader_handles = Vec::with_capacity(31);
111+
for _ in 0..31 {
112+
let slot_cloned = Arc::clone(&arc_slot);
113+
let stop_cloned: Arc<AtomicBool> = Arc::clone(&stop);
114+
reader_handles.push(thread::spawn(move || {
115+
// Tight loop: continuously load from the slot.
116+
// We black_box the result to avoid the optimizer removing the load.
117+
let mut spins = 0u32;
118+
while !stop_cloned.load(Ordering::Relaxed) {
119+
let v = SwapSlot::<usize, usize>::load(slot_cloned.deref());
120+
black_box(v);
121+
122+
// Yield occasionally so we don't starve the writer completely.
123+
spins += 1;
124+
if spins % 10_000 == 0 {
125+
thread::yield_now();
126+
}
127+
}
128+
}));
129+
}
130+
131+
// Writer: measure latency of `store()` with many iterations.
132+
// Tune this for speed vs. stability. 200k is usually quick enough locally.
133+
let iterations: usize = 5_000_000;
134+
let mut lat_ns = Vec::with_capacity(iterations);
135+
136+
// Small warmup to populate the slot and let threads settle.
137+
for i in 0..10_000 {
138+
arc_slot.store(i);
139+
}
140+
141+
// Measure only the `store` call.
142+
for i in 0..iterations {
143+
let t0 = Instant::now();
144+
arc_slot.store(i);
145+
let dt = t0.elapsed();
146+
lat_ns.push(dt.as_nanos() as u64);
147+
148+
// Optional: tiny yield to reduce pathological contention plateaus.
149+
if i % 50_000 == 0 {
150+
thread::yield_now();
151+
}
152+
}
153+
154+
// Signal readers to stop and join them.
155+
stop.store(true, Ordering::Relaxed);
156+
for h in reader_handles {
157+
let _ = h.join();
158+
}
159+
160+
// Compute average and p95.
161+
let avg_ns = (lat_ns.iter().sum::<u64>() as f64) / (lat_ns.len() as f64);
162+
let mut sorted = lat_ns.clone();
163+
sorted.sort_unstable();
164+
let p95_ns = {
165+
let n = sorted.len();
166+
let rank = ((0.95_f64 * n as f64).ceil() as usize).clamp(1, n) - 1;
167+
sorted[rank]
168+
};
169+
170+
// Pretty-print in ns and µs.
171+
println!(
172+
"Slot::store latency over {} ops with 31 readers:\n avg = {:.2} ns ({:.3} µs)\n p95 = {} ns ({:.3} µs)",
173+
iterations,
174+
avg_ns,
175+
avg_ns / 1_000.0,
176+
p95_ns,
177+
(p95_ns as f64) / 1_000.0
178+
);
179+
180+
// A trivial assertion so the test is "about" the measurement but still passes.
181+
assert!(sorted.len() == iterations && avg_ns > 0.0);
182+
}
72183
}

0 commit comments

Comments
 (0)