Skip to content

Commit 5323fd6

Browse files
Simplify communication Message type (#596)
* Remove MessageContents::Arc * Remove MessageContents * Remove RefOrMut::Ref * Remove RefOrMut * Remove accessory methods * mdbook fix-up
1 parent ba17477 commit 5323fd6

39 files changed

+100
-355
lines changed

communication/src/message.rs

+8-150
Original file line numberDiff line numberDiff line change
@@ -1,184 +1,42 @@
11
//! Types wrapping typed data.
22
3-
use std::sync::Arc;
43
use bytes::arc::Bytes;
54
use crate::Data;
65

7-
/// Either an immutable or mutable reference.
8-
pub enum RefOrMut<'a, T> where T: 'a {
9-
/// An immutable reference.
10-
Ref(&'a T),
11-
/// A mutable reference.
12-
Mut(&'a mut T),
13-
}
14-
15-
impl<'a, T: 'a> ::std::ops::Deref for RefOrMut<'a, T> {
16-
type Target = T;
17-
fn deref(&self) -> &Self::Target {
18-
match self {
19-
RefOrMut::Ref(reference) => reference,
20-
RefOrMut::Mut(reference) => reference,
21-
}
22-
}
23-
}
24-
25-
impl<'a, T: 'a> ::std::borrow::Borrow<T> for RefOrMut<'a, T> {
26-
fn borrow(&self) -> &T {
27-
match self {
28-
RefOrMut::Ref(reference) => reference,
29-
RefOrMut::Mut(reference) => reference,
30-
}
31-
}
32-
}
33-
34-
impl<'a, T: Clone+'a> RefOrMut<'a, T> {
35-
/// Extracts the contents of `self`, either by cloning or swapping.
36-
///
37-
/// This consumes `self` because its contents are now in an unknown state.
38-
pub fn swap<'b>(self, element: &'b mut T) {
39-
match self {
40-
RefOrMut::Ref(reference) => element.clone_from(reference),
41-
RefOrMut::Mut(reference) => ::std::mem::swap(reference, element),
42-
};
43-
}
44-
/// Extracts the contents of `self`, either by cloning or swapping.
45-
///
46-
/// This consumes `self` because its contents are now in an unknown state.
47-
pub fn replace(self, mut element: T) -> T {
48-
self.swap(&mut element);
49-
element
50-
}
51-
52-
/// Extracts the contents of `self`, either by cloning, or swapping and leaving a default
53-
/// element in place.
54-
///
55-
/// This consumes `self` because its contents are now in an unknown state.
56-
pub fn take(self) -> T where T: Default {
57-
let mut element = Default::default();
58-
self.swap(&mut element);
59-
element
60-
}
61-
}
62-
63-
/// A wrapped message which may be either typed or binary data.
6+
/// A wrapped message which supports serialization and deserialization.
647
pub struct Message<T> {
65-
payload: MessageContents<T>,
66-
}
67-
68-
/// Possible returned representations from a channel.
69-
enum MessageContents<T> {
70-
/// Rust typed instance. Available for ownership.
71-
Owned(T),
72-
/// Atomic reference counted. Only available as a reference.
73-
Arc(Arc<T>),
8+
/// Message contents.
9+
pub payload: T,
7410
}
7511

7612
impl<T> Message<T> {
7713
/// Wrap a typed item as a message.
7814
pub fn from_typed(typed: T) -> Self {
79-
Message { payload: MessageContents::Owned(typed) }
80-
}
81-
/// Wrap a shared typed item as a message.
82-
pub fn from_arc(typed: Arc<T>) -> Self {
83-
Message { payload: MessageContents::Arc(typed) }
84-
}
85-
/// Destructures and returns any typed data.
86-
pub fn if_typed(self) -> Option<T> {
87-
match self.payload {
88-
MessageContents::Owned(typed) => Some(typed),
89-
MessageContents::Arc(_) => None,
90-
}
91-
}
92-
/// Returns a mutable reference, if typed.
93-
pub fn if_mut(&mut self) -> Option<&mut T> {
94-
match &mut self.payload {
95-
MessageContents::Owned(typed) => Some(typed),
96-
MessageContents::Arc(_) => None,
97-
}
98-
}
99-
/// Returns an immutable or mutable typed reference.
100-
///
101-
/// This method returns a mutable reference if the underlying data are typed Rust
102-
/// instances, which admit mutation, and it returns an immutable reference if the
103-
/// data are serialized binary data.
104-
pub fn as_ref_or_mut(&mut self) -> RefOrMut<T> {
105-
match &mut self.payload {
106-
MessageContents::Owned(typed) => { RefOrMut::Mut(typed) },
107-
MessageContents::Arc(typed) => { RefOrMut::Ref(typed) },
108-
}
15+
Message { payload: typed }
10916
}
11017
}
11118

11219
impl<T: Data> Message<T> {
11320
/// Wrap bytes as a message.
11421
pub fn from_bytes(bytes: Bytes) -> Self {
11522
let typed = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed");
116-
Message { payload: MessageContents::Owned(typed) }
23+
Message { payload: typed }
11724
}
11825

11926
/// The number of bytes required to serialize the data.
12027
pub fn length_in_bytes(&self) -> usize {
121-
match &self.payload {
122-
MessageContents::Owned(typed) => {
123-
::bincode::serialized_size(&typed).expect("bincode::serialized_size() failed") as usize
124-
},
125-
MessageContents::Arc(typed) => {
126-
::bincode::serialized_size(&**typed).expect("bincode::serialized_size() failed") as usize
127-
},
128-
}
28+
::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize
12929
}
13030

13131
/// Writes the binary representation into `writer`.
13232
pub fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
133-
match &self.payload {
134-
MessageContents::Owned(typed) => {
135-
::bincode::serialize_into(writer, &typed).expect("bincode::serialize_into() failed");
136-
},
137-
MessageContents::Arc(typed) => {
138-
::bincode::serialize_into(writer, &**typed).expect("bincode::serialize_into() failed");
139-
},
140-
}
33+
::bincode::serialize_into(writer, &self.payload).expect("bincode::serialize_into() failed");
14134
}
14235
}
14336

14437
impl<T> ::std::ops::Deref for Message<T> {
14538
type Target = T;
14639
fn deref(&self) -> &Self::Target {
147-
// TODO: In principle we have already decoded, but let's go again
148-
match &self.payload {
149-
MessageContents::Owned(typed) => { typed },
150-
MessageContents::Arc(typed) => { typed },
151-
}
40+
&self.payload
15241
}
15342
}
154-
155-
impl<T: Clone> Message<T> {
156-
/// Produces a typed instance of the wrapped element.
157-
pub fn into_typed(self) -> T {
158-
match self.payload {
159-
MessageContents::Owned(instance) => instance,
160-
// TODO: Could attempt `Arc::try_unwrap()` here.
161-
MessageContents::Arc(instance) => (*instance).clone(),
162-
}
163-
}
164-
/// Ensures the message is typed data and returns a mutable reference to it.
165-
pub fn as_mut(&mut self) -> &mut T {
166-
167-
let cloned: Option<T> = match &self.payload {
168-
MessageContents::Owned(_) => None,
169-
// TODO: Could attempt `Arc::try_unwrap()` here.
170-
MessageContents::Arc(typed) => Some((**typed).clone()),
171-
};
172-
173-
if let Some(cloned) = cloned {
174-
self.payload = MessageContents::Owned(cloned);
175-
}
176-
177-
if let MessageContents::Owned(typed) = &mut self.payload {
178-
typed
179-
}
180-
else {
181-
unreachable!()
182-
}
183-
}
184-
}

mdbook/src/chapter_2/chapter_2_4.md

+6-10
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@ fn main() {
1919
.to_stream(scope)
2020
.unary(Pipeline, "increment", |capability, info| {
2121

22-
let mut vector = Vec::new();
2322
move |input, output| {
2423
while let Some((time, data)) = input.next() {
25-
data.swap(&mut vector);
2624
let mut session = output.session(&time);
27-
for datum in vector.drain(..) {
25+
for datum in data.drain(..) {
2826
session.give(datum + 1);
2927
}
3028
}
@@ -136,13 +134,11 @@ fn main() {
136134
.unary(Pipeline, "increment", |capability, info| {
137135

138136
let mut maximum = 0; // define this here; use in the closure
139-
let mut vector = Vec::new();
140137

141138
move |input, output| {
142139
while let Some((time, data)) = input.next() {
143-
data.swap(&mut vector);
144140
let mut session = output.session(&time);
145-
for datum in vector.drain(..) {
141+
for datum in data.drain(..) {
146142
if datum > maximum {
147143
session.give(datum + 1);
148144
maximum = datum;
@@ -195,13 +191,13 @@ fn main() {
195191
while let Some((time, data)) = input1.next() {
196192
stash.entry(time.time().clone())
197193
.or_insert(Vec::new())
198-
.push(data.replace(Vec::new()));
194+
.push(std::mem::take(data));
199195
notificator.notify_at(time.retain());
200196
}
201197
while let Some((time, data)) = input2.next() {
202198
stash.entry(time.time().clone())
203199
.or_insert(Vec::new())
204-
.push(data.replace(Vec::new()));
200+
.push(std::mem::take(data));
205201
notificator.notify_at(time.retain());
206202
}
207203

@@ -246,12 +242,12 @@ fn main() {
246242
while let Some((time, data)) = input1.next() {
247243
stash.entry(time.retain())
248244
.or_insert(Vec::new())
249-
.push(data.replace(Vec::new()));
245+
.push(std::mem::take(data));
250246
}
251247
while let Some((time, data)) = input2.next() {
252248
stash.entry(time.retain())
253249
.or_insert(Vec::new())
254-
.push(data.replace(Vec::new()));
250+
.push(std::mem::take(data));
255251
}
256252

257253
// consider sending everything in `stash`.

mdbook/src/chapter_2/chapter_2_5.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ As before, I'm just going to show you the new code, which now lives just after `
212212
while let Some((time, data)) = input.next() {
213213
queues.entry(time.retain())
214214
.or_insert(Vec::new())
215-
.extend(data.replace(Vec::new()));
215+
.extend(std::mem::take(data));
216216
}
217217

218218
// enable each stashed time if ready.
@@ -297,7 +297,7 @@ Inside the closure, we do two things: (i) read inputs and (ii) update counts and
297297
while let Some((time, data)) = input.next() {
298298
queues.entry(time.retain())
299299
.or_insert(Vec::new())
300-
.extend(data.replace(Vec::new()));
300+
.extend(std::mem::take(data));
301301
}
302302
```
303303

mdbook/src/chapter_4/chapter_4_3.md

+1-4
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,13 @@ fn main() {
7878
// Buffer records until all prior timestamps have completed.
7979
.binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {
8080
81-
let mut vector = Vec::new();
82-
8381
move |input1, input2, output| {
8482
8583
// Stash received data.
8684
input1.for_each(|time, data| {
87-
data.swap(&mut vector);
8885
stash.entry(time.retain())
8986
.or_insert(Vec::new())
90-
.extend(vector.drain(..));
87+
.extend(data.drain(..));
9188
});
9289
9390
// Consider sending stashed data.

timely/examples/bfs.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ fn main() {
5858
// receive edges, start to sort them
5959
input1.for_each(|time, data| {
6060
notify.notify_at(time.retain());
61-
edge_list.push(data.replace(Vec::new()));
61+
edge_list.push(std::mem::take(data));
6262
});
6363

6464
// receive (node, worker) pairs, note any new ones.
@@ -68,7 +68,7 @@ fn main() {
6868
notify.notify_at(time.retain());
6969
Vec::new()
7070
})
71-
.push(data.replace(Vec::new()));
71+
.push(std::mem::take(data));
7272
});
7373

7474
notify.for_each(|time, _num, _notify| {

timely/examples/flatcontainer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ fn main() {
5252
queues
5353
.entry(time.retain())
5454
.or_insert(Vec::new())
55-
.push(data.take());
55+
.push(std::mem::take(data));
5656
}
5757

5858
for (key, val) in queues.iter_mut() {

timely/examples/hashjoin.rs

+2-7
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,12 @@ fn main() {
4040
let mut map1 = HashMap::<u64, Vec<u64>>::new();
4141
let mut map2 = HashMap::<u64, Vec<u64>>::new();
4242

43-
let mut vector1 = Vec::new();
44-
let mut vector2 = Vec::new();
45-
4643
move |input1, input2, output| {
4744

4845
// Drain first input, check second map, update first map.
4946
input1.for_each(|time, data| {
50-
data.swap(&mut vector1);
5147
let mut session = output.session(&time);
52-
for (key, val1) in vector1.drain(..) {
48+
for (key, val1) in data.drain(..) {
5349
if let Some(values) = map2.get(&key) {
5450
for val2 in values.iter() {
5551
session.give((val1.clone(), val2.clone()));
@@ -62,9 +58,8 @@ fn main() {
6258

6359
// Drain second input, check first map, update second map.
6460
input2.for_each(|time, data| {
65-
data.swap(&mut vector2);
6661
let mut session = output.session(&time);
67-
for (key, val2) in vector2.drain(..) {
62+
for (key, val2) in data.drain(..) {
6863
if let Some(values) = map1.get(&key) {
6964
for val1 in values.iter() {
7065
session.give((val1.clone(), val2.clone()));

timely/examples/pagerank.rs

+2-7
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,18 @@ fn main() {
4242
let mut diffs = Vec::new(); // for received but un-acted upon deltas.
4343
let mut delta = Vec::new();
4444

45-
let mut edge_vec = Vec::new();
46-
let mut rank_vec = Vec::new();
47-
4845
let timer = ::std::time::Instant::now();
4946

5047
move |input1, input2, output| {
5148

5249
// hold on to edge changes until it is time.
5350
input1.for_each(|time, data| {
54-
data.swap(&mut edge_vec);
55-
edge_stash.entry(time.retain()).or_insert(Vec::new()).extend(edge_vec.drain(..));
51+
edge_stash.entry(time.retain()).or_insert(Vec::new()).extend(data.drain(..));
5652
});
5753

5854
// hold on to rank changes until it is time.
5955
input2.for_each(|time, data| {
60-
data.swap(&mut rank_vec);
61-
rank_stash.entry(time.retain()).or_insert(Vec::new()).extend(rank_vec.drain(..));
56+
rank_stash.entry(time.retain()).or_insert(Vec::new()).extend(data.drain(..));
6257
});
6358

6459
let frontiers = &[input1.frontier(), input2.frontier()];

timely/examples/wordcount.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ fn main() {
3535
while let Some((time, data)) = input.next() {
3636
queues.entry(time.retain())
3737
.or_insert(Vec::new())
38-
.push(data.replace(Vec::new()));
38+
.push(std::mem::take(data));
3939
}
4040

4141
for (key, val) in queues.iter_mut() {

timely/src/dataflow/channels/mod.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,8 @@ impl<T, C: Container> Message<T, C> {
5353
pusher.push(&mut bundle);
5454

5555
if let Some(message) = bundle {
56-
if let Some(message) = message.if_typed() {
57-
*buffer = message.data;
58-
buffer.clear();
59-
}
56+
*buffer = message.payload.data;
57+
buffer.clear();
6058
}
6159
}
6260
}

0 commit comments

Comments
 (0)