Skip to content

Commit 13b7397

Browse files
authored
Pre-allocate vectors (#370)
* Pre-allocate vectors
* Update partition.rs
* Update process.rs
* Update partition.rs
1 parent c436a53 commit 13b7397

File tree

10 files changed

+97
-101
lines changed

10 files changed

+97
-101
lines changed

communication/src/allocator/generic.rs

+28-28
Original file line number | Diff line number | Diff line change
@@ -32,54 +32,54 @@ impl Generic {
3232
/// The index of the worker out of `(0..self.peers())`.
3333
pub fn index(&self) -> usize {
3434
match self {
35-
&Generic::Thread(ref t) => t.index(),
36-
&Generic::Process(ref p) => p.index(),
37-
&Generic::ProcessBinary(ref pb) => pb.index(),
38-
&Generic::ZeroCopy(ref z) => z.index(),
35+
Generic::Thread(t) => t.index(),
36+
Generic::Process(p) => p.index(),
37+
Generic::ProcessBinary(pb) => pb.index(),
38+
Generic::ZeroCopy(z) => z.index(),
3939
}
4040
}
4141
/// The number of workers.
4242
pub fn peers(&self) -> usize {
4343
match self {
44-
&Generic::Thread(ref t) => t.peers(),
45-
&Generic::Process(ref p) => p.peers(),
46-
&Generic::ProcessBinary(ref pb) => pb.peers(),
47-
&Generic::ZeroCopy(ref z) => z.peers(),
44+
Generic::Thread(t) => t.peers(),
45+
Generic::Process(p) => p.peers(),
46+
Generic::ProcessBinary(pb) => pb.peers(),
47+
Generic::ZeroCopy(z) => z.peers(),
4848
}
4949
}
5050
/// Constructs several send endpoints and one receive endpoint.
5151
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
5252
match self {
53-
&mut Generic::Thread(ref mut t) => t.allocate(identifier),
54-
&mut Generic::Process(ref mut p) => p.allocate(identifier),
55-
&mut Generic::ProcessBinary(ref mut pb) => pb.allocate(identifier),
56-
&mut Generic::ZeroCopy(ref mut z) => z.allocate(identifier),
53+
Generic::Thread(t) => t.allocate(identifier),
54+
Generic::Process(p) => p.allocate(identifier),
55+
Generic::ProcessBinary(pb) => pb.allocate(identifier),
56+
Generic::ZeroCopy(z) => z.allocate(identifier),
5757
}
5858
}
5959
/// Perform work before scheduling operators.
6060
fn receive(&mut self) {
6161
match self {
62-
&mut Generic::Thread(ref mut t) => t.receive(),
63-
&mut Generic::Process(ref mut p) => p.receive(),
64-
&mut Generic::ProcessBinary(ref mut pb) => pb.receive(),
65-
&mut Generic::ZeroCopy(ref mut z) => z.receive(),
62+
Generic::Thread(t) => t.receive(),
63+
Generic::Process(p) => p.receive(),
64+
Generic::ProcessBinary(pb) => pb.receive(),
65+
Generic::ZeroCopy(z) => z.receive(),
6666
}
6767
}
6868
/// Perform work after scheduling operators.
6969
pub fn release(&mut self) {
7070
match self {
71-
&mut Generic::Thread(ref mut t) => t.release(),
72-
&mut Generic::Process(ref mut p) => p.release(),
73-
&mut Generic::ProcessBinary(ref mut pb) => pb.release(),
74-
&mut Generic::ZeroCopy(ref mut z) => z.release(),
71+
Generic::Thread(t) => t.release(),
72+
Generic::Process(p) => p.release(),
73+
Generic::ProcessBinary(pb) => pb.release(),
74+
Generic::ZeroCopy(z) => z.release(),
7575
}
7676
}
7777
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
7878
match self {
79-
&Generic::Thread(ref t) => t.events(),
80-
&Generic::Process(ref p) => p.events(),
81-
&Generic::ProcessBinary(ref pb) => pb.events(),
82-
&Generic::ZeroCopy(ref z) => z.events(),
79+
Generic::Thread(ref t) => t.events(),
80+
Generic::Process(ref p) => p.events(),
81+
Generic::ProcessBinary(ref pb) => pb.events(),
82+
Generic::ZeroCopy(ref z) => z.events(),
8383
}
8484
}
8585
}
@@ -96,10 +96,10 @@ impl Allocate for Generic {
9696
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
9797
fn await_events(&self, _duration: Option<std::time::Duration>) {
9898
match self {
99-
&Generic::Thread(ref t) => t.await_events(_duration),
100-
&Generic::Process(ref p) => p.await_events(_duration),
101-
&Generic::ProcessBinary(ref pb) => pb.await_events(_duration),
102-
&Generic::ZeroCopy(ref z) => z.await_events(_duration),
99+
Generic::Thread(t) => t.await_events(_duration),
100+
Generic::Process(p) => p.await_events(_duration),
101+
Generic::ProcessBinary(pb) => pb.await_events(_duration),
102+
Generic::ZeroCopy(z) => z.await_events(_duration),
103103
}
104104
}
105105
}

communication/src/allocator/process.rs

+13-13
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ impl AllocateBuilder for ProcessBuilder {
3838
let buzzer = Buzzer::new();
3939
worker.send(buzzer).expect("Failed to send buzzer");
4040
}
41-
let mut buzzers = Vec::new();
41+
let mut buzzers = Vec::with_capacity(self.buzzers_recv.len());
4242
for worker in self.buzzers_recv.iter() {
4343
buzzers.push(worker.recv().expect("Failed to recv buzzer"));
4444
}
@@ -69,19 +69,19 @@ pub struct Process {
6969

7070
impl Process {
7171
/// Access the wrapped inner allocator.
72-
pub fn inner<'a>(&'a mut self) -> &'a mut Thread { &mut self.inner }
72+
pub fn inner(&mut self) -> &mut Thread { &mut self.inner }
7373
/// Allocate a list of connected intra-process allocators.
7474
pub fn new_vector(peers: usize) -> Vec<ProcessBuilder> {
7575

76-
let mut counters_send = Vec::new();
77-
let mut counters_recv = Vec::new();
76+
let mut counters_send = Vec::with_capacity(peers);
77+
let mut counters_recv = Vec::with_capacity(peers);
7878
for _ in 0 .. peers {
7979
let (send, recv) = crossbeam_channel::unbounded();
8080
counters_send.push(send);
8181
counters_recv.push(recv);
8282
}
8383

84-
let channels = Arc::new(Mutex::new(HashMap::new()));
84+
let channels = Arc::new(Mutex::new(HashMap::with_capacity(peers)));
8585

8686
// Allocate matrix of buzzer send and recv endpoints.
8787
let (buzzers_send, buzzers_recv) = crate::promise_futures(peers, peers);
@@ -116,23 +116,23 @@ impl Allocate for Process {
116116
// first worker that enters this critical section
117117

118118
// ensure exclusive access to shared list of channels
119-
let mut channels = self.channels.lock().ok().expect("mutex error?");
119+
let mut channels = self.channels.lock().expect("mutex error?");
120120

121121
let (sends, recv, empty) = {
122122

123123
// we may need to alloc a new channel ...
124124
let entry = channels.entry(identifier).or_insert_with(|| {
125125

126-
let mut pushers = Vec::new();
127-
let mut pullers = Vec::new();
128-
for index in 0 .. self.peers {
126+
let mut pushers = Vec::with_capacity(self.peers);
127+
let mut pullers = Vec::with_capacity(self.peers);
128+
for buzzer in self.buzzers.iter() {
129129
let (s, r): (Sender<Message<T>>, Receiver<Message<T>>) = crossbeam_channel::unbounded();
130130
// TODO: the buzzer in the pusher may be redundant, because we need to buzz post-counter.
131-
pushers.push((Pusher { target: s }, self.buzzers[index].clone()));
131+
pushers.push((Pusher { target: s }, buzzer.clone()));
132132
pullers.push(Puller { source: r, current: None });
133133
}
134134

135-
let mut to_box = Vec::new();
135+
let mut to_box = Vec::with_capacity(pullers.len());
136136
for recv in pullers.into_iter() {
137137
to_box.push(Some((pushers.clone(), recv)));
138138
}
@@ -164,8 +164,8 @@ impl Allocate for Process {
164164

165165
let sends =
166166
sends.into_iter()
167-
.enumerate()
168-
.map(|(i,(s,b))| CountPusher::new(s, identifier, self.counters_send[i].clone(), b))
167+
.zip(self.counters_send.iter())
168+
.map(|((s,b), sender)| CountPusher::new(s, identifier, sender.clone(), b))
169169
.map(|s| Box::new(s) as Box<dyn Push<super::Message<T>>>)
170170
.collect::<Vec<_>>();
171171

communication/src/allocator/thread.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,7 @@ impl Thread {
6868
let pusher = Pusher { target: shared.clone() };
6969
let pusher = CountPusher::new(pusher, identifier, events.clone());
7070
let puller = Puller { source: shared, current: None };
71-
let puller = CountPuller::new(puller, identifier, events.clone());
71+
let puller = CountPuller::new(puller, identifier, events);
7272
(pusher, puller)
7373
}
7474
}

communication/src/allocator/zero_copy/allocator_process.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -127,7 +127,7 @@ impl Allocate for ProcessAllocator {
127127
}
128128
self.channel_id_bound = Some(identifier);
129129

130-
let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();
130+
let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::with_capacity(self.peers());
131131

132132
for target_index in 0 .. self.peers() {
133133

communication/src/allocator/zero_copy/initialize.rs

+43-49
Original file line number | Diff line number | Diff line change
@@ -75,58 +75,52 @@ pub fn initialize_networking_from_sockets(
7575
let mut promises_iter = promises.into_iter();
7676
let mut futures_iter = futures.into_iter();
7777

78-
let mut send_guards = Vec::new();
79-
let mut recv_guards = Vec::new();
78+
let mut send_guards = Vec::with_capacity(sockets.len());
79+
let mut recv_guards = Vec::with_capacity(sockets.len());
8080

8181
// for each process, if a stream exists (i.e. not local) ...
82-
for index in 0..sockets.len() {
83-
84-
if let Some(stream) = sockets[index].take() {
85-
// remote process
86-
87-
let remote_recv = promises_iter.next().unwrap();
88-
89-
{
90-
let log_sender = log_sender.clone();
91-
let stream = stream.try_clone()?;
92-
let join_guard =
93-
::std::thread::Builder::new()
94-
.name(format!("timely:send-{}", index))
95-
.spawn(move || {
96-
97-
let logger = log_sender(CommunicationSetup {
98-
process: my_index,
99-
sender: true,
100-
remote: Some(index),
101-
});
102-
103-
send_loop(stream, remote_recv, my_index, index, logger);
104-
})?;
105-
106-
send_guards.push(join_guard);
107-
}
108-
109-
let remote_send = futures_iter.next().unwrap();
110-
111-
{
112-
// let remote_sends = remote_sends.clone();
113-
let log_sender = log_sender.clone();
114-
let stream = stream.try_clone()?;
115-
let join_guard =
116-
::std::thread::Builder::new()
117-
.name(format!("timely:recv-{}", index))
118-
.spawn(move || {
119-
let logger = log_sender(CommunicationSetup {
120-
process: my_index,
121-
sender: false,
122-
remote: Some(index),
123-
});
124-
recv_loop(stream, remote_send, threads * my_index, my_index, index, logger);
125-
})?;
126-
127-
recv_guards.push(join_guard);
128-
}
82+
for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
83+
let remote_recv = promises_iter.next().unwrap();
84+
85+
{
86+
let log_sender = log_sender.clone();
87+
let stream = stream.try_clone()?;
88+
let join_guard =
89+
::std::thread::Builder::new()
90+
.name(format!("timely:send-{}", index))
91+
.spawn(move || {
92+
93+
let logger = log_sender(CommunicationSetup {
94+
process: my_index,
95+
sender: true,
96+
remote: Some(index),
97+
});
98+
99+
send_loop(stream, remote_recv, my_index, index, logger);
100+
})?;
101+
102+
send_guards.push(join_guard);
103+
}
129104

105+
let remote_send = futures_iter.next().unwrap();
106+
107+
{
108+
// let remote_sends = remote_sends.clone();
109+
let log_sender = log_sender.clone();
110+
let stream = stream.try_clone()?;
111+
let join_guard =
112+
::std::thread::Builder::new()
113+
.name(format!("timely:recv-{}", index))
114+
.spawn(move || {
115+
let logger = log_sender(CommunicationSetup {
116+
process: my_index,
117+
sender: false,
118+
remote: Some(index),
119+
});
120+
recv_loop(stream, remote_send, threads * my_index, my_index, index, logger);
121+
})?;
122+
123+
recv_guards.push(join_guard);
130124
}
131125
}
132126

communication/src/allocator/zero_copy/push_pull.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,8 @@ impl<T, P: BytesPush> Pusher<T, P> {
2828
/// Creates a new `Pusher` from a header and shared byte buffer.
2929
pub fn new(header: MessageHeader, sender: Rc<RefCell<SendEndpoint<P>>>) -> Pusher<T, P> {
3030
Pusher {
31-
header: header,
32-
sender: sender,
31+
header,
32+
sender,
3333
phantom: ::std::marker::PhantomData,
3434
}
3535
}

timely/src/dataflow/operators/generic/builder_rc.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl<G: Scope> OperatorBuilder<G> {
136136
L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
137137
{
138138
// create capabilities, discard references to their creation.
139-
let mut capabilities = Vec::new();
139+
let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
140140
for output_index in 0 .. self.internal.borrow().len() {
141141
let borrow = &self.internal.borrow()[output_index];
142142
capabilities.push(mint_capability(G::Timestamp::minimum(), borrow.clone()));

timely/src/dataflow/operators/partition.rs

+5-5
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@ use crate::dataflow::{Scope, Stream};
66
use crate::Data;
77

88
/// Partition a stream of records into multiple streams.
9-
pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)> {
9+
pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
1010
/// Produces `parts` output streams, containing records produced and assigned by `route`.
1111
///
1212
/// # Examples
@@ -27,12 +27,11 @@ pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)> {
2727

2828
impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> {
2929
fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> {
30-
3130
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
3231

3332
let mut input = builder.new_input(self, Pipeline);
34-
let mut outputs = Vec::new();
35-
let mut streams = Vec::new();
33+
let mut outputs = Vec::with_capacity(parts as usize);
34+
let mut streams = Vec::with_capacity(parts as usize);
3635

3736
for _ in 0 .. parts {
3837
let (output, stream) = builder.new_output();
@@ -47,6 +46,7 @@ impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D
4746
input.for_each(|time, data| {
4847
data.swap(&mut vector);
4948
let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::<Vec<_>>();
49+
5050
for datum in vector.drain(..) {
5151
let (part, datum2) = route(datum);
5252
sessions[part as usize].give(datum2);
@@ -57,4 +57,4 @@ impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D
5757

5858
streams
5959
}
60-
}
60+
}

timely/src/progress/reachability.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -298,7 +298,7 @@ impl<T: Timestamp> Builder<T> {
298298
// Initially this list contains observed locations with no incoming
299299
// edges, but as the algorithm develops we add to it any locations
300300
// that can only be reached by nodes that have been on this list.
301-
let mut worklist = Vec::new();
301+
let mut worklist = Vec::with_capacity(in_degree.len());
302302
for (key, val) in in_degree.iter() {
303303
if *val == 0 {
304304
worklist.push(*key);

timely/src/synchronization/sequence.rs

+2
Original file line number | Diff line number | Diff line change
@@ -179,6 +179,8 @@ impl<T: ExchangeData> Sequencer<T> {
179179
// grab each command and queue it up
180180
input.for_each(|time, data| {
181181
data.swap(&mut vector);
182+
183+
recvd.reserve(vector.len());
182184
for (worker, counter, element) in vector.drain(..) {
183185
recvd.push(((time.time().clone(), worker, counter), element));
184186
}

0 commit comments

Comments
 (0)