Skip to content

Commit 055a87d

Browse files
committed
Wait for change works well if combined with tiny wait
1 parent 709b602 commit 055a87d

File tree

1 file changed

+39
-37
lines changed

1 file changed

+39
-37
lines changed

src/stream_writer.rs

+39-37
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,21 @@ pub struct StreamWriter {
77
pending: Arc<RwLock<Vec<u8>>>,
88
done: Arc<RwLock<bool>>,
99
// A way for the write side to signal new data to the stream side
10-
// ETA: WHICH DOESN'T WORK AND I DON'T KNOW WHY
11-
// write_index: Arc<RwLock<i64>>,
12-
// write_index_sender: Arc<tokio::sync::watch::Sender<i64>>,
13-
// write_index_receiver: tokio::sync::watch::Receiver<i64>,
10+
write_index: Arc<RwLock<i64>>,
11+
write_index_sender: Arc<tokio::sync::watch::Sender<i64>>,
12+
write_index_receiver: tokio::sync::watch::Receiver<i64>,
1413
}
1514

1615
impl StreamWriter {
1716
pub fn new() -> Self {
18-
// let write_index = 0;
19-
// let (tx, rx) = tokio::sync::watch::channel(write_index);
17+
let write_index = 0;
18+
let (tx, rx) = tokio::sync::watch::channel(write_index);
2019
Self {
2120
pending: Arc::new(RwLock::new(vec![])),
2221
done: Arc::new(RwLock::new(false)),
23-
// write_index: Arc::new(RwLock::new(write_index)),
24-
// write_index_sender: Arc::new(tx),
25-
// write_index_receiver: rx,
22+
write_index: Arc::new(RwLock::new(write_index)),
23+
write_index_sender: Arc::new(tx),
24+
write_index_receiver: rx,
2625
}
2726
}
2827

@@ -35,14 +34,12 @@ impl StreamWriter {
3534
Err(e) =>
3635
Err(anyhow::anyhow!("Internal error: StreamWriter::append can't take lock: {}", e))
3736
};
38-
// This was meant to wake up listener threads when there was new data but it ended up
39-
// just stalling until input was complete. TODO: investigate so we can get rid of the
40-
// duration-based polling.
41-
// {
42-
// let mut write_index = self.write_index.write().unwrap();
43-
// *write_index = *write_index + 1;
44-
// self.write_index_sender.send(*write_index).unwrap();
45-
// }
37+
{
38+
let mut write_index = self.write_index.write().unwrap();
39+
*write_index = *write_index + 1;
40+
self.write_index_sender.send(*write_index).unwrap();
41+
drop(write_index);
42+
}
4643
result
4744
}
4845

@@ -89,28 +86,33 @@ impl StreamWriter {
8986
if self.is_done() {
9087
return;
9188
} else {
92-
// Not sure how to do this better. I tried using a signal that data
93-
// had changed (via tokio::sync::watch::channel()), but that effectively
94-
// blocked - we got the first chunk quickly but then it stalled waiting
95-
// for the change notification. Polling is awful (and this interval is
96-
// probably too aggressive) but I don't know how to get signalling
97-
// to work!
98-
tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
99-
100-
// For the record: this is what I tried:
101-
// match self.write_index_receiver.changed().await {
102-
// Ok(_) => continue,
103-
// Err(e) => {
104-
// // If this ever happens (which it, cough, shouldn't), it means all senders have
105-
// // closed, which _should_ mean we are done. Log the error
106-
// // but don't return it to the stream: the response as streamed so far
107-
// // _should_ be okay!
108-
// tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e);
109-
// return;
110-
// }
111-
// }
89+
// This tiny wait seems to help the write-stream pipeline to flow more smmoothly.
90+
// If we go straight to the 'changed().await' then the pipeline seems to stall after
91+
// a few dozen writes, and everything else gets held up until the entire output
92+
// has been written. There may be better ways of doing this; I haven't found them
93+
// yet.
94+
//
95+
// (By the way, having the timer but not the change notification also worked. But if
96+
// writes came slowly, that would result in very aggressive polling. So hopefully this
97+
// gives us the best of both worlds.)
98+
tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await;
99+
100+
match self.write_index_receiver.changed().await {
101+
Ok(_) => continue,
102+
Err(e) => {
103+
// If this ever happens (which it, cough, shouldn't), it means all senders have
104+
// closed, which _should_ mean we are done. Log the error
105+
// but don't return it to the stream: the response as streamed so far
106+
// _should_ be okay!
107+
tracing::error!("StreamWriter::as_stream: error receiving write updates: {}", e);
108+
return;
109+
}
110+
}
112111
}
113112
} else {
113+
// This tiny wait seems to help the write-stream pipeline to flow more smmoothly.
114+
// See the comment on the 'empty buffer' case.
115+
tokio::time::sleep(tokio::time::Duration::from_nanos(10)).await;
114116
yield Ok(v);
115117
}
116118
},

0 commit comments

Comments
 (0)