Skip to content

Commit 70cd0ac

Browse files
committed
finalized tokio update in io
1 parent 9b82edd commit 70cd0ac

File tree

2 files changed

+40
-75
lines changed

2 files changed

+40
-75
lines changed

crates/runtime/io/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ log = "0.4"
1616
slab = "0.4"
1717
timer = "0.2"
1818
time = "0.1"
19-
tokio = "0.1"
19+
tokio = "1.46"
2020
futures = "0.1"

crates/runtime/io/src/worker.rs

Lines changed: 39 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,17 @@ use crate::{
1919
service_mio::{HandlerId, IoChannel, IoContext},
2020
};
2121
use deque;
22-
use futures::{future::{self, FutureResult, Loop}, Future};
22+
use futures::{
23+
Future,
24+
future::{self, FutureResult, Loop},
25+
};
2326
use std::{
24-
io::{Error, ErrorKind}, sync::{
25-
atomic::{AtomicBool, Ordering as AtomicOrdering}, Arc
26-
}, thread::{self, JoinHandle}
27+
io::{Error, ErrorKind},
28+
sync::{
29+
Arc,
30+
atomic::{AtomicBool, Ordering as AtomicOrdering},
31+
},
32+
thread::{self, JoinHandle},
2733
};
2834
use tokio::{self};
2935

@@ -55,24 +61,21 @@ pub struct Worker {
5561
wait_mutex: Arc<Mutex<()>>,
5662
}
5763

58-
59-
struct WorkerContext<Message>
64+
struct WorkerContext<Message>
6065
where
61-
Message: Send + Sync + 'static,
66+
Message: Send + Sync + 'static,
6267
{
63-
stealer: deque::Stealer<Work<Message>>,
64-
channel: IoChannel<Message>,
65-
wait: Arc<Condvar>,
66-
wait_mutex: Arc<Mutex<()>>,
67-
deleting: Arc<AtomicBool>,
68+
stealer: deque::Stealer<Work<Message>>,
69+
channel: IoChannel<Message>,
70+
wait: Arc<Condvar>,
71+
wait_mutex: Arc<Mutex<()>>,
72+
deleting: Arc<AtomicBool>,
6873
}
6974

70-
impl<Message> WorkerContext<Message>
75+
impl<Message> WorkerContext<Message>
7176
where
72-
Message: Send + Sync + 'static,
77+
Message: Send + Sync + 'static,
7378
{
74-
75-
7679
pub fn new(
7780
stealer: deque::Stealer<Work<Message>>,
7881
channel: IoChannel<Message>,
@@ -89,33 +92,28 @@ Message: Send + Sync + 'static,
8992
}
9093
}
9194

92-
93-
fn execute(self) -> Loop<(), Self> {
94-
95+
fn execute(self) -> future::FutureResult<(Self, bool), ()> {
9596
{
96-
let mut lock = self.wait_mutex.lock();
9797
if self.deleting.load(AtomicOrdering::SeqCst) {
98-
return Loop::Break(());// futures::future::err(Error::new(ErrorKind::Other, "shutting down worker")); // self; // Ok(Loop::Break(self));
98+
return future::ok((self, false)); // Loop::Break(());// futures::future::err(Error::new(ErrorKind::Other, "shutting down worker")); // self; // Ok(Loop::Break(self));
9999
//return Ok() //Ok(Loop::Break(()));
100100
}
101+
let mut lock = self.wait_mutex.lock();
101102
self.wait.wait(&mut lock);
102103
}
103104

104105
while !self.deleting.load(AtomicOrdering::SeqCst) {
105106
match self.stealer.steal() {
106-
deque::Steal::Success(work) => {
107-
WorkerContext::do_work(work, self.channel.clone())
108-
}
107+
deque::Steal::Success(work) => WorkerContext::do_work(work, self.channel.clone()),
109108
deque::Steal::Retry => {}
110109
deque::Steal::Empty => break,
111110
}
112111
}
113112

114-
return Loop::Continue(self);
113+
return future::ok((self, true)); // Loop::Continue(self);
115114
}
116115

117-
fn do_work(work: Work<Message>, channel: IoChannel<Message>)
118-
{
116+
fn do_work(work: Work<Message>, channel: IoChannel<Message>) {
119117
match work.work_type {
120118
WorkType::Readable => {
121119
work.handler
@@ -139,15 +137,9 @@ Message: Send + Sync + 'static,
139137
}
140138
}
141139
}
142-
143-
144140
}
145141

146142
impl Worker {
147-
148-
149-
150-
151143
/// Creates a new worker instance.
152144
pub fn new<Message>(
153145
name: &str,
@@ -167,61 +159,34 @@ impl Worker {
167159
wait_mutex: wait_mutex.clone(),
168160
};
169161

170-
171-
172-
173-
let mut context = WorkerContext::new(stealer, channel, wait, wait_mutex, deleting);
174-
175-
176-
let f = |c: WorkerContext<Message>| {
177-
c.execute()
178-
179-
};
180-
181-
let l = future::loop_fn(context, f);
162+
let context = WorkerContext::new(stealer, channel, wait, wait_mutex, deleting);
182163

183164
worker.thread = Some(
184165
thread::Builder::new()
185166
.stack_size(STACK_SIZE)
186167
.name(format!("Worker {}", name))
187168
.spawn(move || {
188169
LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
189-
let ini = (stealer, channel.clone(), wait, wait_mutex.clone(), deleting);
190-
let future = future::loop_fn(ini, |ini| {
191-
return Ok(Loop::Continue((stealer, channel, wait, wait_mutex, deleting)));
192-
});
193-
// future::loop_fn(ini, |(stealer, channel, wait, wait_mutex, deleting)| {
194-
// {
195-
// let mut lock = wait_mutex.lock();
196-
// if deleting.load(AtomicOrdering::SeqCst) {
197-
// return Ok(Loop::Break((stealer, channel, wait, wait_mutex, deleting)));
198-
// //return Ok() //Ok(Loop::Break(()));
199-
// }
200-
// wait.wait(&mut lock);
201-
// }
202-
203-
// while !deleting.load(AtomicOrdering::SeqCst) {
204-
// match stealer.steal() {
205-
// deque::Steal::Success(work) => {
206-
// Worker::do_work(work, channel.clone())
207-
// }
208-
// deque::Steal::Retry => {}
209-
// deque::Steal::Empty => break,
210-
// }
211-
// }
212-
// Ok(Loop::Continue((
213-
// stealer, channel, wait, wait_mutex, deleting,
214-
// )))
215-
// });
170+
171+
let f = |c: WorkerContext<Message>| {
172+
c.execute().and_then(|r| {
173+
let (context, continue_loop) = r;
174+
if continue_loop {
175+
Ok(Loop::Continue(context))
176+
} else {
177+
Ok(Loop::Break(()))
178+
}
179+
})
180+
};
181+
182+
future::loop_fn(context, f);
216183
})
217184
.expect("Error creating worker thread"),
218185
);
219186
worker
220187
}
221-
222188
}
223189

224-
225190
impl Drop for Worker {
226191
fn drop(&mut self) {
227192
trace!(target: "shutdown", "[IoWorker] Closing...");

0 commit comments

Comments
 (0)