Skip to content

Commit 2b1ad21

Browse files
committed
clippy edits
1 parent 0ee326e commit 2b1ad21

21 files changed

+50
-92
lines changed

benches/barrier.rs

-43
This file was deleted.

examples/distinct.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ fn main() {
1414
.to_stream(scope)
1515
.unary_stream(Pipeline, "Distinct", move |input, output| {
1616
input.for_each(|time, data| {
17-
let mut counts = counts_by_time.entry(time.time().clone())
18-
.or_insert(HashMap::new());
17+
let counts = counts_by_time.entry(time.time().clone())
18+
.or_insert(HashMap::new());
1919
let mut session = output.session(&time);
2020
for &datum in data.iter() {
21-
let mut count = counts.entry(datum)
22-
.or_insert(0);
21+
let count = counts.entry(datum)
22+
.or_insert(0);
2323
if *count == 0 {
2424
session.give(datum);
2525
}

examples/pagerank.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,20 @@ fn main() {
5050

5151
// hold on to edge changes until it is time.
5252
input1.for_each(|time, data| {
53-
edge_stash.entry(time).or_insert(vec![]).extend(data.drain(..));
53+
edge_stash.entry(time).or_insert(Vec::new()).extend(data.drain(..));
5454
});
5555

5656
// hold on to rank changes until it is time.
5757
input2.for_each(|time, data| {
58-
rank_stash.entry(time).or_insert(vec![]).extend(data.drain(..));
58+
rank_stash.entry(time).or_insert(Vec::new()).extend(data.drain(..));
5959
});
6060

6161
let frontiers = &[input1.frontier(), input2.frontier()];
6262

6363
for (time, edge_changes) in edge_stash.iter_mut() {
6464
if frontiers.iter().all(|f| !f.less_equal(time)) {
6565

66-
let mut session = output.session(&time);
66+
let mut session = output.session(time);
6767

6868
compact(edge_changes);
6969

@@ -93,12 +93,12 @@ fn main() {
9393
}
9494
}
9595

96-
edge_stash.retain(|_key, val| val.len() > 0);
96+
edge_stash.retain(|_key, val| !val.is_empty());
9797

9898
for (time, rank_changes) in rank_stash.iter_mut() {
9999
if frontiers.iter().all(|f| !f.less_equal(time)) {
100100

101-
let mut session = output.session(&time);
101+
let mut session = output.session(time);
102102

103103
compact(rank_changes);
104104

@@ -137,7 +137,7 @@ fn main() {
137137
}
138138
}
139139

140-
rank_stash.retain(|_key, val| val.len() > 0);
140+
rank_stash.retain(|_key, val| !val.is_empty());
141141

142142
}
143143
}
@@ -179,7 +179,7 @@ fn main() {
179179
}
180180

181181
fn compact<T: Ord>(list: &mut Vec<(T, i64)>) {
182-
if list.len() > 0 {
182+
if !list.is_empty() {
183183
list.sort_by(|x,y| x.0.cmp(&y.0));
184184
for i in 0 .. list.len() - 1 {
185185
if list[i].0 == list[i+1].0 {
@@ -193,7 +193,7 @@ fn compact<T: Ord>(list: &mut Vec<(T, i64)>) {
193193

194194
// this method allocates some rank between elements of `edges`.
195195
fn allocate(rank: i64, edges: &[(usize, i64)], send: &mut Vec<(usize, i64)>) {
196-
if edges.len() > 0 {
196+
if !edges.is_empty() {
197197
assert!(rank >= 0);
198198
assert!(edges.iter().all(|x| x.1 > 0));
199199

examples/wordcount.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ fn main() {
4949
}
5050
}
5151

52-
queues.retain(|_key, val| val.len() > 0);
52+
queues.retain(|_key, val| !val.is_empty());
5353
}})
5454
.inspect(|x| println!("seen: {:?}", x))
5555
.probe_with(&mut probe);

src/dataflow/channels/message.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl<D> Content<D> {
126126
}
127127

128128
/// Pushes `buffer` into `pusher`, ensuring that `buffer` remains valid once returned.
129-
#[inline]
129+
#[inline(always)]
130130
pub fn push_at<T, P: Push<(T, Content<D>)>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {
131131

132132
let data = Content::from_typed(buffer);
@@ -139,13 +139,15 @@ impl<D> Content<D> {
139139
*buffer = typed;
140140
}
141141
else {
142+
// println!("re-allocating (nothing returned)");
142143
*buffer = Vec::with_capacity(Content::<D>::default_length());
143144
}
144145

145146
// TODO : Assert failing, but not sure if is bug when deser can make arbitrary lengths
146147
// TODO : in clone. Revisit!
147148
// assert!(buffer.capacity() == Content::<D>::default_length());
148149
if buffer.capacity() != Content::<D>::default_length() {
150+
// println!("re-allocating (wrong size)");
149151
*buffer = Vec::with_capacity(Content::<D>::default_length());
150152
}
151153
}

src/dataflow/channels/pact.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use dataflow::channels::{Message, Content};
1919

2020
use abomonation::Abomonation;
2121

22-
/// A ParallelizationContract allocates paired `Push` and `Pull` implementors.
22+
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
2323
pub trait ParallelizationContract<T: 'static, D: 'static> {
2424
/// Type implementing `Push` produced by this pact.
2525
type Pusher: Push<(T, Content<D>)>+'static;
@@ -36,7 +36,7 @@ impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
3636
type Pusher = Pusher<T, D, ThreadPusher<Message<T, D>>>;
3737
type Puller = Puller<T, D, ThreadPuller<Message<T, D>>>;
3838
fn connect<A: Allocate>(self, allocator: &mut A, identifier: usize) -> (Self::Pusher, Self::Puller) {
39-
// ignore &mut A and use thread allocator
39+
// ignore `&mut A` and use thread allocator
4040
let (pusher, puller) = Thread::new::<Message<T, D>>();
4141

4242
(Pusher::new(pusher, allocator.index(), allocator.index(), identifier),
@@ -56,9 +56,7 @@ impl<D, F: Fn(&D)->u64> Exchange<D, F> {
5656
}
5757
}
5858

59-
// Exchange uses a Box<Pushable> because it cannot know what type of pushable will return from the allocator.
60-
// The PactObserver will do some buffering for Exchange, cutting down on the virtual calls, but we still
61-
// would like to get the vectors it sends back, so that they can be re-used if possible.
59+
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
6260
impl<T: Eq+Data+Abomonation, D: Data+Abomonation, F: Fn(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
6361
// TODO: The closure in the type prevents us from naming it.
6462
// Could specialize `ExchangePusher` to a time-free version.
@@ -170,7 +168,7 @@ impl<T, D, P: Pull<Message<T, D>>> Pull<(T, Content<D>)> for Puller<T, D, P> {
170168

171169
::std::mem::swap(&mut previous, self.puller.pull());
172170

173-
if let Some(ref message) = previous.as_ref() {
171+
if let Some(message) = previous.as_ref() {
174172

175173
::logging::log(&::logging::MESSAGES, ::logging::MessagesEvent {
176174
is_send: false,

src/dataflow/channels/pushers/counter.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub struct Counter<T: Ord, D, P: Push<(T, Content<D>)>> {
1515
}
1616

1717
impl<T: Ord, D, P: Push<(T, Content<D>)>> Push<(T, Content<D>)> for Counter<T, D, P> where T : Eq+Clone+'static {
18-
#[inline]
18+
#[inline(always)]
1919
fn push(&mut self, message: &mut Option<(T, Content<D>)>) {
2020
if let Some((ref time, ref data)) = *message {
2121
self.produced.borrow_mut().update(time.clone(), data.len() as i64);
@@ -38,6 +38,7 @@ impl<T, D, P: Push<(T, Content<D>)>> Counter<T, D, P> where T : Ord+Clone+'stati
3838
}
3939
}
4040
/// A references to shared changes in counts, for cloning or draining.
41+
#[inline(always)]
4142
pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
4243
&self.produced
4344
}

src/dataflow/channels/pushers/exchange.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl<T: Eq+Clone+'static, D: Data+Abomonation, P: Push<(T, Content<D>)>, H: Fn(&
5959
if (self.pushers.len() & (self.pushers.len() - 1)) == 0 {
6060
let mask = (self.pushers.len() - 1) as u64;
6161
for datum in data.drain(..) {
62-
let index = (((self.hash_func)(&time, &datum)) & mask) as usize;
62+
let index = (((self.hash_func)(time, &datum)) & mask) as usize;
6363

6464
self.buffers[index].push(datum);
6565
if self.buffers[index].len() == self.buffers[index].capacity() {
@@ -78,7 +78,7 @@ impl<T: Eq+Clone+'static, D: Data+Abomonation, P: Push<(T, Content<D>)>, H: Fn(&
7878
// as a last resort, use mod (%)
7979
else {
8080
for datum in data.drain(..) {
81-
let index = (((self.hash_func)(&time, &datum)) % self.pushers.len() as u64) as usize;
81+
let index = (((self.hash_func)(time, &datum)) % self.pushers.len() as u64) as usize;
8282
self.buffers[index].push(datum);
8383
if self.buffers[index].len() == self.buffers[index].capacity() {
8484
self.flush(index);

src/dataflow/operators/aggregation/aggregate.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for
9595

9696
// pop completed aggregates, send along whatever
9797
notificator.for_each(|time,_,_| {
98-
if let Some(aggs) = aggregates.remove(&time.time()) {
98+
if let Some(aggs) = aggregates.remove(time.time()) {
9999
let mut session = output.session(&time);
100100
for (key, agg) in aggs {
101101
session.give(emit(key, agg));

src/dataflow/operators/aggregation/state_machine.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> f
7272
// stash each input and request a notification when ready
7373
input.for_each(|time, data| {
7474
// stash if not time yet
75-
if notificator.frontier(0).iter().any(|x| x.less_than(&time.time())) {
75+
if notificator.frontier(0).iter().any(|x| x.less_than(time.time())) {
7676
pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
7777
notificator.notify_at(time);
7878
}

src/dataflow/operators/broadcast.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D> {
4040

4141
let channel_id = scope.new_identifier();
4242

43-
assert!(pushers.len() == scope.peers());
43+
assert_eq!(pushers.len(), scope.peers());
4444

4545
let receiver = Puller::new(puller, scope.index(), channel_id);
4646

src/dataflow/operators/count.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl<G: Scope, D: Data> Accumulate<G, D> for Stream<G, D> {
5858
let mut accums = HashMap::new();
5959
self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| {
6060
input.for_each(|time, data| {
61-
logic(&mut accums.entry(time.time().clone()).or_insert(default.clone()), data);
61+
logic(&mut accums.entry(time.time().clone()).or_insert_with(|| default.clone()), data);
6262
notificator.notify_at(time);
6363
});
6464

src/dataflow/operators/generic/handles.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ impl<'a, T: Timestamp, D, P: Pull<(T, Content<D>)>> InputHandle<T, D, P> {
3636
/// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
3737
/// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
3838
/// Returns `None` when there's no more data available.
39-
#[inline]
39+
#[inline(always)]
4040
pub fn next(&mut self) -> Option<(Capability<T>, &mut Content<D>)> {
4141
let internal = &mut self.internal;
4242
self.pull_counter.next().map(|(time, content)| {
@@ -77,7 +77,7 @@ impl<'a, T: Timestamp, D, P: Pull<(T, Content<D>)>+'a> FrontieredInputHandle<'a,
7777
/// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
7878
/// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
7979
/// Returns `None` when there's no more data available.
80-
#[inline]
80+
#[inline(always)]
8181
pub fn next(&mut self) -> Option<(Capability<T>, &mut Content<D>)> {
8282
self.handle.next()
8383
}
@@ -106,7 +106,7 @@ impl<'a, T: Timestamp, D, P: Pull<(T, Content<D>)>+'a> FrontieredInputHandle<'a,
106106
}
107107

108108
/// Inspect the frontier associated with this input.
109-
#[inline]
109+
#[inline(always)]
110110
pub fn frontier(&self) -> &'a MutableAntichain<T> {
111111
self.frontier
112112
}

src/dataflow/operators/generic/notificator.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use dataflow::operators::Capability;
66

77
/// Tracks requests for notification and delivers available notifications.
88
///
9-
/// Notificator is meant to manage the delivery of requested notifications in the presence of
9+
/// `Notificator` is meant to manage the delivery of requested notifications in the presence of
1010
/// inputs that may have outstanding messages to deliver. The notificator tracks the frontiers,
1111
/// as presented from the outside, for each input. Requested notifications can be served only
1212
/// once there are no frontier elements less-or-equal to them, and there are no other pending
@@ -240,7 +240,7 @@ fn notificator_delivers_notifications_in_topo_order() {
240240

241241
/// Tracks requests for notification and delivers available notifications.
242242
///
243-
/// FrontierNotificator is meant to manage the delivery of requested notifications in the
243+
/// `FrontierNotificator` is meant to manage the delivery of requested notifications in the
244244
/// presence of inputs that may have outstanding messages to deliver.
245245
/// The notificator inspects the frontiers, as presented from the outside, for each input.
246246
/// Requested notifications can be served only once there are no frontier elements less-or-equal
@@ -359,7 +359,7 @@ impl<T: Timestamp> FrontierNotificator<T> {
359359
// It should be safe to append any ordered subset of self.pending to self.available,
360360
// in that the sequence of capabilities in self.available will remain non-decreasing.
361361

362-
if self.pending.len() > 0 {
362+
if !self.pending.is_empty() {
363363

364364
self.pending.sort_by(|x,y| x.0.time().cmp(y.0.time()));
365365
for i in 0 .. self.pending.len() - 1 {

src/dataflow/operators/partition.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ impl<T:Timestamp, D: Data, D2: Data, F: Fn(D)->(u64, D2)> Operate<T> for Operato
9292
let outputs = self.outputs.iter_mut();
9393

9494
// TODO : This results in small sends for many parts, as sessions does the buffering
95-
let mut sessions: Vec<_> = outputs.map(|x| x.session(&time)).collect();
95+
let mut sessions: Vec<_> = outputs.map(|x| x.session(time)).collect();
9696

9797
for (part, datum) in data.drain(..).map(&self.route) {
9898
sessions[part as usize].give(datum);

src/dataflow/operators/reclock.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ impl<S: Scope, D: Data> Reclock<S, D> for Stream<S, D> {
7070
notificator.for_each(|cap,_,_| {
7171
let mut session = output.session(&cap);
7272
for &mut (ref t, ref mut data) in &mut stash {
73-
if t.less_equal(&cap.time()) {
73+
if t.less_equal(cap.time()) {
7474
session.give_content(data);
7575
}
7676
}
77-
stash.retain(|x| !x.0.less_equal(&cap.time()));
77+
stash.retain(|x| !x.0.less_equal(cap.time()));
7878
});
7979
})
8080
}

src/dataflow/operators/unordered_input.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl<G: Scope> UnorderedInput<G> for G {
9696
peers: peers,
9797
});
9898

99-
return ((helper, cap), Stream::new(Source { index: index, port: 0 }, registrar, self.clone()));
99+
((helper, cap), Stream::new(Source { index: index, port: 0 }, registrar, self.clone()))
100100
}
101101
}
102102

@@ -130,7 +130,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
130130
{
131131
self.produced.borrow_mut().drain_into(&mut produced[0]);
132132
self.internal.borrow_mut().drain_into(&mut internal[0]);
133-
return false;
133+
false
134134
}
135135

136136
fn notify_me(&self) -> bool { false }

src/progress/change_batch.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl<T:Ord> ChangeBatch<T> {
212212
/// assert!(!batch2.is_empty());
213213
///```
214214
pub fn drain_into(&mut self, other: &mut ChangeBatch<T>) where T: Clone {
215-
if other.updates.len() == 0 {
215+
if other.updates.is_empty() {
216216
::std::mem::swap(self, other);
217217
}
218218
else {

0 commit comments

Comments
 (0)