diff --git a/.cargo/config b/.cargo/config
new file mode 100644
index 00000000..09f02a19
--- /dev/null
+++ b/.cargo/config
@@ -0,0 +1,2 @@
+[build]
+rustflags = ["-Ctarget-cpu=sandybridge", "-Ctarget-feature=+aes,+sse2,+sse3,+sse4.1,+ssse3"]
diff --git a/Cargo.toml b/Cargo.toml
index 8e72638b..4977a900 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,11 +17,11 @@ async-trait = "0.1.30"
 crossbeam = "0.7.3"
 dashmap = "3.11.1"
 envy = "^0.4.1"
-fasthash = "0.4.0"
 futures = { version = "0.3.4" }
 hyper = "0.13.4"
 http = "0.2.1"
 itertools = "0.9.0"
+metrohash = "1.0.6"
 num_cpus = "1.13.0"
 log = "0.4.8"
 once_cell = "1.3.1"
@@ -66,5 +66,5 @@ capnpc = "0.12.1"
 [dev-dependencies]
 async-std = { version = "1.5.0", features = ["attributes"] }
 chrono = "0.4.11"
-parquet = "0.15.1"
+parquet = "5.0.0"
 tempfile = "3"
diff --git a/examples/dijkstra.rs b/examples/dijkstra.rs
new file mode 100644
index 00000000..7bfba49f
--- /dev/null
+++ b/examples/dijkstra.rs
@@ -0,0 +1,106 @@
+use std::collections::{HashMap, BTreeMap};
+use std::path::PathBuf;
+use std::time::Instant;
+use vega::*;
+
+fn custom_split_nodes_text_file(node: (usize, usize, Option<String>)) -> (usize, (usize, Option<Vec<String>>, String)) {
+    let neighbors = node.2.map(|s| {
+        let mut v = s.split(":")
+            .map(|x| x.to_string())
+            .collect::<Vec<String>>();
+        v.split_off(v.len() - 1);
+        v
+    });
+    let path = node.0.to_string();
+    (node.0, (node.1, neighbors, path))
+}
+
+fn custom_split_neighbor(parent_path: &String, parent_distance: usize, neighbor: &String) -> (usize, (usize, Option<Vec<String>>, String)) {
+    let neighbor = neighbor.split(',').collect::<Vec<_>>();
+    let (nid, mut distance): (usize, usize) = (neighbor[0].parse().unwrap(), neighbor[1].parse().unwrap());
+    distance += parent_distance;
+    let mut path = parent_path.to_string();
+    path.push_str("->");
+    path.push_str(&nid.to_string());
+    (nid, (distance, None, path))
+}
+
+fn custom_split_nodes_iterative(node: (usize, (usize, Option<Vec<String>>, String))) -> (usize, (usize, Option<Vec<String>>, String)) {
+    let (nid, (distance, neighbors, mut path)) = node;
+    let elements = path.split("->");
+    if elements.last().unwrap().parse::<usize>().unwrap() != nid {
+        path.push_str("->");
+        path.push_str(&nid.to_string());
+    }
+    (nid, (distance, neighbors, path))
+}
+
+fn min_distance(node_value0: (usize, Option<Vec<String>>, String), node_value1: (usize, Option<Vec<String>>, String)) -> (usize, Option<Vec<String>>, String) {
+    let mut neighbors = None;
+    let mut distance = 0;
+    let mut path = String::new();
+    let (dist0, neighbors0, path0) = node_value0;
+    let (dist1, neighbors1, path1) = node_value1;
+    if neighbors0.is_some() {
+        neighbors = neighbors0;
+    } else {
+        neighbors = neighbors1;
+    }
+    if dist0 <= dist1 {
+        distance = dist0;
+        path = path0;
+    } else {
+        distance = dist1;
+        path = path1;
+    }
+    (distance, neighbors, path)
+}
+
+fn main() -> Result<()> {
+    let sc = Context::new()?;
+    let now = Instant::now();
+
+    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
+        bincode::deserialize::<Vec<(usize, usize, Option<String>)>>(&file).unwrap()  //Item = (usize, usize, Option<String>)
+    }));
+
+    let dir = PathBuf::from("/opt/data/pt_dij_1");
+    let mut nodes = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer)
+        .flat_map(Fn!(|nodes: Vec<(usize, usize, Option<String>)>| Box::new(nodes.into_iter().map(|node| custom_split_nodes_text_file(node))) as Box<dyn Iterator<Item = _>>));
+    let mut old = nodes.aggregate(
+        0,
+        Fn!(|local_sum: usize, node: (usize, (usize, Option<Vec<String>>, String))| local_sum + node.1.0),
+        Fn!(|local_sum0, local_sum1| local_sum0 + local_sum1),
+    ).unwrap();
+    let mut new = old.clone();
+    let mut iterations = 0;
+    //let mut result = Text::<_, _>::new(Vec::new(), None, None);
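+    // Each pass of the loop below performs one relaxation round: every node
+    // re-emits itself plus its neighbors with updated tentative distances, and
+    // reduce_by_key keeps the minimum per node id. `old`/`new` compare the sum
+    // of all distances across rounds, so iteration stops once nothing changed
+    // (or after 5 rounds).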
+    while (iterations == 0 || old != new) && iterations < 5 {
+        iterations += 1;
+        old = new;
+        let mapper = nodes.flat_map(Fn!(|node: (usize, (usize, Option<Vec<String>>, String))| {
+            let (nid, (data0, data1, data2)) = node.clone();
+            let mut res = Vec::new();
+            res.push(node);
+            if let Some(d) = data1 {
+                res.append(&mut d.into_iter()
+                    .map(|neighbor: String| custom_split_neighbor(&data2, data0, &neighbor))
+                    .collect::<Vec<_>>());
+            }
+            Box::new(res.into_iter()) as Box<dyn Iterator<Item = _>>
+        }));
+        let reducer = mapper.reduce_by_key(Fn!(|(x, y)| min_distance(x, y)), 1);
+        nodes = reducer.map(Fn!(|node| custom_split_nodes_iterative(node)));
+        //nodes.cache();
+        new = nodes.aggregate(
+            0,
+            Fn!(|local_sum: usize, node: (usize, (usize, Option<Vec<String>>, String))| local_sum + node.1.0),
+            Fn!(|local_sum0, local_sum1| local_sum0 + local_sum1),
+        ).unwrap();
+        let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+        println!("new = {:?}, elapsed time = {:?}", new, dur);
+    }
+    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+    println!("Finished after {:?} iterations, total time {:?}s", iterations, dur);
+    Ok(())
+}
\ No newline at end of file
diff --git a/examples/kmeans.rs b/examples/kmeans.rs
new file mode 100644
index 00000000..6af5285b
--- /dev/null
+++ b/examples/kmeans.rs
@@ -0,0 +1,102 @@
+use std::{collections::HashMap, env::temp_dir};
+use std::path::PathBuf;
+use std::time::Instant;
+use vega::*;
+use rand::Rng;
+use serde_derive::{Deserialize, Serialize};
+
+fn parse_vector(line: String) -> Vec<f64> {
+    line.split(' ')
+        .map(|number| number.parse::<f64>().unwrap())
+        .collect()
+}
+
+fn squared_distance(p: &Vec<f64>, center: &Vec<f64>) -> f64 {
+    assert_eq!(p.len(), center.len());
+    let sum = p.iter().zip(center.iter()).fold(0 as f64, |acc, (x, y)| {
+        let delta = *y - *x;
+        acc + delta * delta
+    });
+    sum.sqrt()
+}
+
+fn closest_point(p: &Vec<f64>, centers: &Vec<Vec<f64>>) -> usize {
+    let mut best_index = 0;
+    let mut closest = f64::MAX;
+
+    for (index, center) in centers.iter().enumerate() {
+        let temp_dist = squared_distance(p, center);
+        if temp_dist < closest {
+            closest = temp_dist;
+            best_index = index
+        }
+    }
+
+    best_index
+}
+
+fn merge_results(a: (Vec<f64>, i32), b: (Vec<f64>, i32)) -> (Vec<f64>, i32) {
+    (
+        a.0.iter().zip(b.0.iter()).map(|(x, y)| x + y).collect(),
+        a.1 + b.1,
+    )
+}
+
+fn main() -> Result<()> {
+    let sc = Context::new()?;
+    let now = Instant::now();
+
+    // TODO: need to change dir
+    let dir = PathBuf::from("/opt/data/pt_km_50000_5");
+    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
+        String::from_utf8(file)
+            .unwrap()
+            .lines()
+            .map(|s| s.to_string())
+            .collect::<Vec<_>>()
+    }));
+
+    let lines = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer);
+    let data_rdd = lines.flat_map(Fn!(|lines: Vec<String>| {
+        Box::new(lines.into_iter().map(|line| {
+            parse_vector(line)
+        })) as Box<dyn Iterator<Item = _>>
+    }));
+    //data_rdd.cache();
+
+    let k = 10;
+    let converge_dist = 0.3;
+    let mut iter = 0;
+    let mut k_points = data_rdd.take(k).unwrap();
+    let mut temp_dist = 100.0;
+    while temp_dist > converge_dist && iter < 5 {
+        let k_points_c = k_points.clone();
+        let closest = data_rdd.map(
+            Fn!(move |p| {
+                (closest_point(&p, &k_points_c), (p, 1))
+            }));
+        let point_stats = closest.reduce_by_key(Fn!(|(a, b)| merge_results(a, b)), 1);
+        let new_points = point_stats.map(
+            Fn!(|pair: (usize, (Vec<f64>, i32))|
+                (pair.0, pair.1.0.iter().map(|x| x * 1.0 / pair.1.1 as f64).collect::<Vec<f64>>())
+            ),
+        ).collect().unwrap();
+        println!("new_points = {:?}", new_points);
+        let new_points = new_points.into_iter().collect::<HashMap<usize, Vec<f64>>>();
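+        // Convergence is measured as the summed distance between each old
+        // center and its replacement; the loop ends once that total shift
+        // drops below `converge_dist` (or after 5 iterations).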
+        temp_dist = 0.0;
+        for i in 0..k as usize {
+            temp_dist += squared_distance(&k_points[i], &new_points[&i]);
+        }
+
+        for (idx, point) in new_points {
+            k_points[idx] = point;
+        }
+        let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+        println!("temp_dist = {:?}, time at iter {:?}: {:?}", temp_dist, iter, dur);
+
+        iter += 1;
+    }
+    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+    println!("total time {:?} s, k_points = {:?}", dur, k_points);
+    Ok(())
+}
\ No newline at end of file
diff --git a/examples/lr.rs b/examples/lr.rs
new file mode 100644
index 00000000..d026b735
--- /dev/null
+++ b/examples/lr.rs
@@ -0,0 +1,53 @@
+use std::time::Instant;
+use std::path::PathBuf;
+use vega::*;
+use rand::Rng;
+use rand_distr::{Normal, Distribution};
+use serde_derive::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Default, Clone, Debug)]
+pub struct Point {
+    x: Vec<f32>,
+    y: f32,
+}
+
+fn main() -> Result<()> {
+    let sc = Context::new()?;
+
+    let mut rng = rand::thread_rng();
+    let dim = 5;
+    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
+        bincode::deserialize::<Vec<Point>>(&file).unwrap()  //Item = Point
+    }));
+
+    let dir = PathBuf::from("/opt/data/pt_lr_1");
+    let mut points_rdd = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer)
+        .flat_map(Fn!(|v: Vec<Point>| Box::new(v.into_iter()) as Box<dyn Iterator<Item = _>>));
+    let mut w = (0..dim).map(|_| rng.gen::<f32>()).collect::<Vec<f32>>();
+    let iter_num = 3;
+    let now = Instant::now();
+    for i in 0..iter_num {
+        let w_c = w.clone();
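+        // One gradient step of logistic regression: each point contributes
+        // x * (1/(1 + exp(-y*(w*x))) - 1) * y. Note that `w * x` below is an
+        // element-wise product per dimension, not a dot product, and reduce()
+        // sums the per-point gradients before `w` is updated.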
+        let g = points_rdd.map(Fn!(move |p: Point| {
+                let y = p.y;
+                p.x.iter().zip(w.iter())
+                    .map(|(&x, &w): (&f32, &f32)| x * (1f32/(1f32+(-y * (w * x)).exp())-1f32) * y)
+                    .collect::<Vec<f32>>()
+            }))
+            .reduce(Fn!(|x: Vec<f32>, y: Vec<f32>| x.into_iter()
+                .zip(y.into_iter())
+                .map(|(x, y)| x + y)
+                .collect::<Vec<f32>>()))
+            .unwrap();
+        w = w_c.into_iter()
+            .zip(g.unwrap().into_iter())
+            .map(|(x, y)| x-y)
+            .collect::<Vec<f32>>();
+        println!("{:?}: w = {:?}", i, w);
+    }
+    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+    println!("total time {:?} s", dur);
+    println!("w = {:?}", w);
+
+    Ok(())
+}
diff --git a/examples/mm.rs b/examples/mm.rs
new file mode 100644
index 00000000..d146da34
--- /dev/null
+++ b/examples/mm.rs
@@ -0,0 +1,39 @@
+use std::collections::{HashMap, BTreeMap};
+use std::path::PathBuf;
+use std::time::Instant;
+use vega::*;
+
+// insecure mode
+fn main() -> Result<()> {
+    let sc = Context::new()?;
+    let now = Instant::now();
+
+    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
+        bincode::deserialize::<Vec<((u32, u32), f64)>>(&file).unwrap()  //Item = ((u32, u32), f64)
+    }));
+
+    let dir_a = PathBuf::from("/opt/data/pt_mm_a_100");
+    let dir_b = PathBuf::from("/opt/data/pt_mm_b_100");
+    let ma = sc.read_source(LocalFsReaderConfig::new(dir_a).num_partitions_per_executor(1), deserializer.clone())
+        .flat_map(Fn!(|va: Vec<((u32, u32), f64)>| {
+            Box::new(va.into_iter().map(|a|
+                (a.0.1, (a.0.0, a.1))
+            )) as Box<dyn Iterator<Item = _>>
+        }));
+    let mb = sc.read_source(LocalFsReaderConfig::new(dir_b).num_partitions_per_executor(1), deserializer)
+        .flat_map(Fn!(|vb: Vec<((u32, u32), f64)>| {
+            Box::new(vb.into_iter().map(|b|
+                (b.0.0, (b.0.1, b.1))
+            )) as Box<dyn Iterator<Item = _>>
+        }));
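+    // `ma` is keyed by A's column index and `mb` by B's row index, so the join
+    // below pairs every a[i][k] with every b[k][j]; the per-pair products are
+    // then summed into output cell (i, j) by reduce_by_key.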
+    let temp = ma.join(mb, 1)
+        .map(Fn!(|n: (u32, ((u32, f64), (u32, f64)))| ((n.1.0.0, n.1.1.0), n.1.0.1 * n.1.1.1)));
+
+    let mc = temp.reduce_by_key(Fn!(|(x, y)| x + y), 1);
+
+    let output = mc.count().unwrap();
+    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+    println!("count = {:?}, total time = {:?}", output, dur);
+    Ok(())
+}
\ No newline at end of file
diff --git a/examples/pagerank.rs b/examples/pagerank.rs
new file mode 100644
index 00000000..ede6d44a
--- /dev/null
+++ b/examples/pagerank.rs
@@ -0,0 +1,57 @@
+use std::collections::{HashMap, BTreeMap};
+use std::path::PathBuf;
+use std::time::Instant;
+use vega::*;
+
+fn main() -> Result<()> {
+    let sc = Context::new()?;
+    let now = Instant::now();
+
+    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
+        String::from_utf8(file)
+            .unwrap()
+            .lines()
+            .map(|s| s.to_string())
+            .collect::<Vec<_>>()
+    }));
+
+    let iters = 1;  //7 causes a core dump, why? some hints: it converges at 6
+    let dir = PathBuf::from("/opt/data/pt_pr");
+    let lines = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer);
+    let links = lines.flat_map(Fn!(|lines: Vec<String>| {
+        Box::new(lines.into_iter().map(|line| {
+            let parts = line.split(" ")
+                .collect::<Vec<_>>();
+            (parts[0].to_string(), parts[1].to_string())
+        })) as Box<dyn Iterator<Item = _>>
+    })).distinct_with_num_partitions(1)
+        .group_by_key(1);
+    //links.cache();
+    let mut ranks = links.map_values(Fn!(|_| 1.0));
+
+    for _ in 0..iters {
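+        // Standard PageRank step: each page splits its current rank evenly
+        // across its outgoing links, and the damping formula 0.15 + 0.85 * sum
+        // folds the received contributions into the new rank.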
+        let contribs = links.join(ranks, 1)
+            .map(Fn!(|(k, v)| v))
+            .flat_map(Fn!(|(urls, rank): (Vec<String>, f64)| {
+                let size = urls.len() as f64;
+                Box::new(urls.into_iter().map(move |url| (url, rank / size)))
+                    as Box<dyn Iterator<Item = _>>
+            })
+        );
+        ranks = contribs.reduce_by_key(Fn!(|(x, y)| x + y), 1)
+            .map_values(Fn!(|v| 0.15 + 0.85 * v));
+    }
+
+    let output = ranks.reduce(Fn!(|x: (String, f64), y: (String, f64)| {
+        if x.1 > y.1 {
+            x
+        } else {
+            y
+        }
+    })).unwrap();
+    let output = output.unwrap();
+    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+    println!("{:?} rank first with {:?}, total time = {:?}", output.0, output.1, dur);
+
+    Ok(())
+}
diff --git a/examples/parquet_column_read.rs b/examples/parquet_column_read.rs
index b10890e4..609a70d5 100644
--- a/examples/parquet_column_read.rs
+++ b/examples/parquet_column_read.rs
@@ -28,7 +28,7 @@ fn main() -> Result<()> {
 fn read(file: PathBuf) -> Vec<((i32, String, i64), (i64, f64))> {
     let file = File::open(file).unwrap();
     let reader = SerializedFileReader::new(file).unwrap();
-    let metadata = reader.metadata();
+    let metadata = reader.metadata().clone();
     let batch_size = 500_000 as usize;
     let iter = (0..metadata.num_row_groups()).flat_map(move |i| {
         let row_group_reader = reader.get_row_group(i).unwrap();
diff --git a/examples/pearson.rs b/examples/pearson.rs
new file mode 100644
index 00000000..0d8cabdb
--- /dev/null
+++ b/examples/pearson.rs
@@ -0,0 +1,36 @@
+use std::path::PathBuf;
+use std::time::Instant;
+use vega::*;
+use rand::Rng;
+
+fn main() -> Result<()> {
+    let sc = Context::new()?;
+    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
+        bincode::deserialize::<Vec<f64>>(&file).unwrap()
+    }));
+    let now = Instant::now();
+    let dir0 = PathBuf::from("/opt/data/pt_pe_a_105");
+    let dir1 = PathBuf::from("/opt/data/pt_pe_b_105");
+    let x = sc.read_source(LocalFsReaderConfig::new(dir0).num_partitions_per_executor(1), deserializer.clone())
+        .flat_map(Fn!(|v: Vec<f64>| Box::new(v.into_iter()) as Box<dyn Iterator<Item = _>>));
+    let y = sc.read_source(LocalFsReaderConfig::new(dir1).num_partitions_per_executor(1), deserializer)
+        .flat_map(Fn!(|v: Vec<f64>| Box::new(v.into_iter()) as Box<dyn Iterator<Item = _>>));
+
+    let mx = x.reduce(Fn!(|a, b| a+b)).unwrap().unwrap()/
+        x.count().unwrap() as f64;
+    let my = y.reduce(Fn!(|a, b| a+b)).unwrap().unwrap()/
+        y.count().unwrap() as f64;
+
+    let (upper, lowerx, lowery) = x.zip(y.into())
+        .map(Fn!(move |pair: (f64, f64)| {
+            let up = (pair.0 - mx) * (pair.1 - my);
+            let lowx = (pair.0 - mx) * (pair.0 - mx);
+            let lowy = (pair.1 - my) * (pair.1 - my);
+            (up, lowx, lowy)
+        }))
+        .reduce(Fn!(|a: (f64, f64, f64), b: (f64, f64, f64)| (a.0+b.0, a.1+b.1, a.2+b.2))).unwrap().unwrap();
+    let r = upper / (f64::sqrt(lowerx) * f64::sqrt(lowery));
+    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+    println!("total time {:?} s, r = {:?}", dur, r);
+    Ok(())
+}
diff --git a/examples/tc.rs b/examples/tc.rs
new file mode 100644
index 00000000..9b706f4b
--- /dev/null
+++ b/examples/tc.rs
@@ -0,0 +1,40 @@
+use std::collections::HashSet;
+use std::path::PathBuf;
+use std::time::Instant;
+use vega::*;
+use rand::Rng;
+
+fn main() -> Result<()> {
+    let sc = Context::new()?;
+    let now = Instant::now();
+
+    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
+        bincode::deserialize::<Vec<(u32, u32)>>(&file).unwrap()  //Item = (u32, u32)
+    }));
+
+    let dir = PathBuf::from("/opt/data/pt_tc_2");
+    let mut tc = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer)
+        .flat_map(Fn!(|v: Vec<(u32, u32)>| Box::new(v.into_iter()) as Box<dyn Iterator<Item = _>>));
+    //tc.cache();
+    let edges = tc.map(Fn!(|x: (u32, u32)| (x.1, x.0)));
+
+    // This join is iterated until a fixed point is reached.
+    let mut old_count = 0;
+    let mut next_count = tc.count().unwrap();
+    let mut iter = 0;
+    while next_count != old_count && iter < 5 {
+        old_count = next_count;
+        tc = tc.union(
+            tc.join(edges.clone(), 1)
+                .map(Fn!(|x: (u32, (u32, u32))| (x.1.1, x.1.0)))
+                .into()
+        ).unwrap().distinct_with_num_partitions(1);
+        //tc.cache();
+        next_count = tc.count().unwrap();
+        iter += 1;
+        println!("next_count = {:?}", next_count);
+    }
+    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+    println!("total time {:?}s", dur);
+    Ok(())
+}
\ No newline at end of file
diff --git a/examples/tri.rs b/examples/tri.rs
new file mode 100644
index 00000000..da0eda17
--- /dev/null
+++ b/examples/tri.rs
@@ -0,0 +1,33 @@
+use std::collections::HashSet;
+use std::path::PathBuf;
+use std::time::Instant;
+use vega::*;
+use rand::Rng;
+
+fn main() -> Result<()> {
+    let sc = Context::new()?;
+    let now = Instant::now();
+
+    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
+        bincode::deserialize::<Vec<(u32, u32)>>(&file).unwrap()  //Item = (u32, u32)
+    }));
+
+    let dir = PathBuf::from("/opt/data/pt_tc_7");
+    let graph = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer)
+        .flat_map(Fn!(|v: Vec<(u32, u32)>| Box::new(v.into_iter()) as Box<dyn Iterator<Item = _>>))
+        .filter(Fn!(|edge: &(u32, u32)| edge.0 != edge.1))
+        .map(Fn!(|edge: (u32, u32)| match edge.0 < edge.1 {
+            true => edge,
+            false => (edge.1, edge.0)
+        })).distinct();
+    //graph.cache();
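+    // The self-join on the lower endpoint enumerates wedges (a,b)-(a,c);
+    // re-keying each wedge by (b,c) and joining against the edge set keeps
+    // only wedges whose closing edge exists, i.e. triangles.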
+    let count = graph.join(graph.clone(), 1)
+        .key_by(Fn!(|item: &(u32, (u32, u32))| item.1))
+        .join(graph.clone()
+            .map(Fn!(|edge| (edge, 1 as i32))),
+            1
+        ).count().unwrap();
+    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
+    println!("total time {:?}s, count = {:?}", dur, count);
+    Ok(())
+}
\ No newline at end of file
diff --git a/rust-toolchain b/rust-toolchain
index 915f80ee..73c438fb 100644
--- a/rust-toolchain
+++ b/rust-toolchain
@@ -1 +1 @@
-nightly-2020-05-31
\ No newline at end of file
+nightly-2021-06-29
diff --git a/src/io/local_file_reader.rs b/src/io/local_file_reader.rs
index 94a5e249..9dc9cd30 100644
--- a/src/io/local_file_reader.rs
+++ b/src/io/local_file_reader.rs
@@ -3,7 +3,7 @@ use std::io::{BufReader, Read};
 use std::marker::PhantomData;
 use std::net::{Ipv4Addr, SocketAddrV4};
 use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 
 use crate::context::Context;
 use crate::dependency::Dependency;
@@ -110,7 +110,7 @@ pub struct LocalFsReader<T> {
     expect_dir: bool,
     executor_partitions: Option<u64>,
     #[serde(skip_serializing, skip_deserializing)]
-    context: Arc<Context>,
+    context: Weak<Context>,
     // explicitly copy the address map as the map under context is not
     // deserialized in tasks and this is required:
     splits: Vec<SocketAddrV4>,
@@ -139,7 +139,7 @@ impl<T: Data> LocalFsReader<T> {
             expect_dir,
             executor_partitions,
             splits: context.address_map.clone(),
-            context,
+            context: Arc::downgrade(&context),
             _marker_reader_data: PhantomData,
         }
    }
@@ -310,7 +310,7 @@ macro_rules! impl_common_lfs_rddb_funcs {
        }

        fn get_context(&self) -> Arc<Context> {
-            self.context.clone()
+            self.context.upgrade().unwrap()
        }

        fn get_dependencies(&self) -> Vec<Dependency> {
diff --git a/src/partitioner.rs b/src/partitioner.rs
index 27384d62..21224add 100644
--- a/src/partitioner.rs
+++ b/src/partitioner.rs
@@ -1,6 +1,6 @@
 use crate::serializable_traits::Data;
 use downcast_rs::Downcast;
-use fasthash::MetroHasher;
+use metrohash::MetroHash;
 use serde_derive::{Deserialize, Serialize};
 use serde_traitobject::{Deserialize, Serialize};
 use std::any::Any;
@@ -19,7 +19,7 @@ pub trait Partitioner:
 dyn_clone::clone_trait_object!(Partitioner);
 
 fn hash<T: Hash>(t: &T) -> u64 {
-    let mut s: MetroHasher = Default::default();
+    let mut s: MetroHash = Default::default();
     t.hash(&mut s);
     s.finish()
 }
diff --git a/src/rdd/cartesian_rdd.rs b/src/rdd/cartesian_rdd.rs
index 829c2289..2018c3bd 100644
--- a/src/rdd/cartesian_rdd.rs
+++ b/src/rdd/cartesian_rdd.rs
@@ -80,7 +80,7 @@ impl<T: Data, U: Data> RddBase for CartesianRdd<T, U> {
     }
 
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        Vec::new()
     }
 
     fn splits(&self) -> Vec<Box<dyn Split>> {
diff --git a/src/rdd/co_grouped_rdd.rs b/src/rdd/co_grouped_rdd.rs
index a824d7f0..b8e0f628 100644
--- a/src/rdd/co_grouped_rdd.rs
+++ b/src/rdd/co_grouped_rdd.rs
@@ -75,6 +75,35 @@ impl<K: Data + Eq + Hash> CoGroupedRdd<K> {
     pub fn new(rdds: Vec<SerArc<dyn RddBase>>, part: Box<dyn Partitioner>) -> Self {
         let context = rdds[0].get_context();
         let mut vals = RddVals::new(context.clone());
+        for (_index, rdd) in rdds.iter().enumerate() {
+            if !rdd
+                .partitioner()
+                .map_or(false, |p| p.equals(&part as &dyn Any))
+            {
+                vals.shuffle_ids.push(context.new_shuffle_id());
+            }
+        }
+        let vals = Arc::new(vals);
+        CoGroupedRdd {
+            vals,
+            rdds,
+            part,
+            _marker: PhantomData,
+        }
+    }
+}
+
+impl<K: Data + Eq + Hash> RddBase for CoGroupedRdd<K> {
+    fn get_rdd_id(&self) -> usize {
+        self.vals.id
+    }
+
+    fn get_context(&self) -> Arc<Context> {
+        self.vals.context.upgrade().unwrap()
+    }
+
+    fn get_dependencies(&self) -> Vec<Dependency> {
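+        // Dependencies are no longer cached in RddVals but rebuilt on every
+        // call; the shuffle ids were allocated once in new(), so repeated
+        // calls hand out the same id for the same shuffle dependency.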
+        let mut shuffle_ids = self.vals.shuffle_ids.clone();
         let create_combiner = Box::new(Fn!(|v: Box<dyn AnyData>| vec![v]));
         fn merge_value(
             mut buf: Vec<Box<dyn AnyData>>,
             ),
         );
         let mut deps = Vec::new();
-        for (_index, rdd) in rdds.iter().enumerate() {
+        let part = self.part.clone();
+        for (_index, rdd) in self.rdds.iter().enumerate() {
             let part = part.clone();
             if rdd
                 .partitioner()
                 .map_or(false, |p| p.equals(&part as &dyn Any))
             {
                 deps.push(Dependency::NarrowDependency(
                     Arc::new(OneToOneDependency::new(rdd.get_rdd_base()))
                 ))
             } else {
                 let rdd_base = rdd.clone().into();
-                log::debug!("creating aggregator inside cogrouprdd");
                 deps.push(Dependency::ShuffleDependency(
                     Arc::new(ShuffleDependency::new(
-                        context.new_shuffle_id(),
+                        shuffle_ids.remove(0),
                         true,
                         rdd_base,
                         aggr.clone(),
                         part.clone(),
                     )),
                 ))
             }
         }
-        vals.dependencies = deps;
-        let vals = Arc::new(vals);
-        CoGroupedRdd {
-            vals,
-            rdds,
-            part,
-            _marker: PhantomData,
-        }
-    }
-}
-
-impl<K: Data + Eq + Hash> RddBase for CoGroupedRdd<K> {
-    fn get_rdd_id(&self) -> usize {
-        self.vals.id
-    }
-
-    fn get_context(&self) -> Arc<Context> {
-        self.vals.context.upgrade().unwrap()
-    }
-
-    fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        deps
     }
 
     fn splits(&self) -> Vec<Box<dyn Split>> {
diff --git a/src/rdd/flatmapper_rdd.rs b/src/rdd/flatmapper_rdd.rs
index 90e1f161..f1fc0bae 100644
--- a/src/rdd/flatmapper_rdd.rs
+++ b/src/rdd/flatmapper_rdd.rs
@@ -40,11 +40,7 @@ where
     F: SerFunc(T) -> Box<dyn Iterator<Item = U>>,
 {
     pub(crate) fn new(prev: Arc<dyn Rdd<Item = T>>, f: F) -> Self {
-        let mut vals = RddVals::new(prev.get_context());
-        vals.dependencies
-            .push(Dependency::NarrowDependency(Arc::new(
-                OneToOneDependency::new(prev.get_rdd_base()),
-            )));
+        let vals = RddVals::new(prev.get_context());
         let vals = Arc::new(vals);
         FlatMapperRdd {
             prev,
@@ -68,7 +64,9 @@ where
     }
 
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        vec![Dependency::NarrowDependency(Arc::new(
+            OneToOneDependency::new(self.prev.get_rdd_base()),
+        ))]
     }
 
     fn splits(&self) -> Vec<Box<dyn Split>> {
diff --git a/src/rdd/map_partitions_rdd.rs b/src/rdd/map_partitions_rdd.rs
index f476d5fa..dad62034 100644
--- a/src/rdd/map_partitions_rdd.rs
+++ b/src/rdd/map_partitions_rdd.rs
@@ -48,11 +48,7 @@ where
     F: SerFunc(usize, Box<dyn Iterator<Item = T>>) -> Box<dyn Iterator<Item = U>>,
 {
     pub(crate) fn new(prev: Arc<dyn Rdd<Item = T>>, f: F) -> Self {
-        let mut vals = RddVals::new(prev.get_context());
-        vals.dependencies
-            .push(Dependency::NarrowDependency(Arc::new(
-                OneToOneDependency::new(prev.get_rdd_base()),
-            )));
+        let vals = RddVals::new(prev.get_context());
         let vals = Arc::new(vals);
         MapPartitionsRdd {
             name: Mutex::new("map_partitions".to_owned()),
@@ -92,7 +88,9 @@ where
     }
 
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        vec![Dependency::NarrowDependency(Arc::new(
+            OneToOneDependency::new(self.prev.get_rdd_base()),
+        ))]
     }
 
     fn preferred_locations(&self, split: Box<dyn Split>) -> Vec<Ipv4Addr> {
diff --git a/src/rdd/mapper_rdd.rs b/src/rdd/mapper_rdd.rs
index 11fa367c..dcd60629 100644
--- a/src/rdd/mapper_rdd.rs
+++ b/src/rdd/mapper_rdd.rs
@@ -48,11 +48,7 @@ where
     F: SerFunc(T) -> U,
 {
     pub(crate) fn new(prev: Arc<dyn Rdd<Item = T>>, f: F) -> Self {
-        let mut vals = RddVals::new(prev.get_context());
-        vals.dependencies
-            .push(Dependency::NarrowDependency(Arc::new(
-                OneToOneDependency::new(prev.get_rdd_base()),
-            )));
+        let vals = RddVals::new(prev.get_context());
         let vals = Arc::new(vals);
         MapperRdd {
             name: Mutex::new("map".to_owned()),
@@ -92,7 +88,9 @@ where
     }
 
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        vec![Dependency::NarrowDependency(Arc::new(
+            OneToOneDependency::new(self.prev.get_rdd_base()),
+        ))]
     }
 
     fn preferred_locations(&self, split: Box<dyn Split>) -> Vec<Ipv4Addr> {
diff --git a/src/rdd/pair_rdd.rs b/src/rdd/pair_rdd.rs
index c4b93579..6192f0ce 100644
--- a/src/rdd/pair_rdd.rs
+++ b/src/rdd/pair_rdd.rs
@@ -210,11 +210,7 @@ where
     F: Func(V) -> U + Clone,
 {
     fn new(prev: Arc<dyn Rdd<Item = (K, V)>>, f: F) -> Self {
-        let mut vals = RddVals::new(prev.get_context());
-        vals.dependencies
-            .push(Dependency::NarrowDependency(Arc::new(
-                OneToOneDependency::new(prev.get_rdd_base()),
-            )));
+        let vals = RddVals::new(prev.get_context());
         let vals = Arc::new(vals);
         MappedValuesRdd {
             prev,
@@ -238,7 +234,9 @@ where
         self.vals.context.upgrade().unwrap()
     }
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        vec![Dependency::NarrowDependency(Arc::new(
+            OneToOneDependency::new(self.prev.get_rdd_base()),
+        ))]
     }
     fn splits(&self) -> Vec<Box<dyn Split>> {
         self.prev.splits()
@@ -322,11 +320,7 @@ where
     F: Func(V) -> Box<dyn Iterator<Item = U>> + Clone,
 {
     fn new(prev: Arc<dyn Rdd<Item = (K, V)>>, f: F) -> Self {
-        let mut vals = RddVals::new(prev.get_context());
-        vals.dependencies
-            .push(Dependency::NarrowDependency(Arc::new(
-                OneToOneDependency::new(prev.get_rdd_base()),
-            )));
+        let vals = RddVals::new(prev.get_context());
         let vals = Arc::new(vals);
         FlatMappedValuesRdd {
             prev,
@@ -350,7 +344,9 @@ where
         self.vals.context.upgrade().unwrap()
     }
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        vec![Dependency::NarrowDependency(Arc::new(
+            OneToOneDependency::new(self.prev.get_rdd_base()),
+        ))]
     }
     fn splits(&self) -> Vec<Box<dyn Split>> {
         self.prev.splits()
diff --git a/src/rdd/parallel_collection_rdd.rs b/src/rdd/parallel_collection_rdd.rs
index 3047e818..c3e1ba68 100644
--- a/src/rdd/parallel_collection_rdd.rs
+++ b/src/rdd/parallel_collection_rdd.rs
@@ -176,7 +176,7 @@ impl<T: Data> RddBase for ParallelCollection<T> {
     }
 
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.rdd_vals.vals.dependencies.clone()
+        Vec::new()
     }
 
     fn splits(&self) -> Vec<Box<dyn Split>> {
diff --git a/src/rdd/partitionwise_sampled_rdd.rs b/src/rdd/partitionwise_sampled_rdd.rs
index fd38446a..415add41 100644
--- a/src/rdd/partitionwise_sampled_rdd.rs
+++ b/src/rdd/partitionwise_sampled_rdd.rs
@@ -27,11 +27,7 @@ impl<T: Data> PartitionwiseSampledRdd<T> {
         sampler: Arc<dyn RandomSampler<T>>,
         preserves_partitioning: bool,
     ) -> Self {
-        let mut vals = RddVals::new(prev.get_context());
-        vals.dependencies
-            .push(Dependency::NarrowDependency(Arc::new(
-                OneToOneDependency::new(prev.get_rdd_base()),
-            )));
+        let vals = RddVals::new(prev.get_context());
         let vals = Arc::new(vals);
 
         PartitionwiseSampledRdd {
@@ -66,7 +62,9 @@ impl<T: Data> RddBase for PartitionwiseSampledRdd<T> {
     }
 
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        vec![Dependency::NarrowDependency(Arc::new(
+            OneToOneDependency::new(self.prev.get_rdd_base()),
+        ))]
     }
 
     fn splits(&self) -> Vec<Box<dyn Split>> {
diff --git a/src/rdd/rdd.rs b/src/rdd/rdd.rs
index d7ee4668..8f26d9a6 100644
--- a/src/rdd/rdd.rs
+++ b/src/rdd/rdd.rs
@@ -20,7 +20,7 @@ use crate::split::Split;
 use crate::utils::bounded_priority_queue::BoundedPriorityQueue;
 use crate::utils::random::{BernoulliCellSampler, BernoulliSampler, PoissonSampler, RandomSampler};
 use crate::{utils, Fn, SerArc, SerBox};
-use fasthash::MetroHasher;
+use metrohash::MetroHash;
 use rand::{Rng, SeedableRng};
 use serde_derive::{Deserialize, Serialize};
 use serde_traitobject::{Deserialize, Serialize};
@@ -53,7 +53,7 @@ pub use union_rdd::*;
 #[derive(Serialize, Deserialize)]
 pub(crate) struct RddVals {
     pub id: usize,
-    pub dependencies: Vec<Dependency>,
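+    // Only the pre-allocated shuffle ids are stored; the Dependency values
+    // themselves are now rebuilt on demand in each RDD's get_dependencies(),
+    // which keeps RddVals small and serializable.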
+    pub shuffle_ids: Vec<usize>,
     should_cache: bool,
     #[serde(skip_serializing, skip_deserializing)]
     pub context: Weak<Context>,
 }
@@ -63,7 +63,7 @@ impl RddVals {
     pub fn new(sc: Arc<Context>) -> Self {
         RddVals {
             id: sc.new_rdd_id(),
-            dependencies: Vec::new(),
+            shuffle_ids: Vec::new(),
             should_cache: false,
             context: Arc::downgrade(&sc),
         }
     }
@@ -392,7 +392,7 @@ pub trait Rdd: RddBase + 'static {
         use std::hash::Hasher;
         let distributed_partition = Fn!(
             move |index: usize, items: Box<dyn Iterator<Item = Self::Item>>| {
-                let mut hasher = MetroHasher::default();
+                let mut hasher = MetroHash::default();
                 index.hash(&mut hasher);
                 let mut rand = utils::random::get_default_rng_from_seed(hasher.finish());
                 let mut position = rand.gen_range(0, num_partitions);
@@ -1056,15 +1056,15 @@ pub trait Rdd: RddBase + 'static {
     }
 
     /// Creates tuples of the elements in this RDD by applying `f`.
-    fn key_by<T, F>(&self, func: F) -> SerArc<dyn Rdd<Item = (Self::Item, T)>>
+    fn key_by<T, F>(&self, func: F) -> SerArc<dyn Rdd<Item = (T, Self::Item)>>
     where
         Self: Sized,
         T: Data,
         F: SerFunc(&Self::Item) -> T,
     {
-        self.map(Fn!(move |k: Self::Item| -> (Self::Item, T) {
+        self.map(Fn!(move |k: Self::Item| -> (T, Self::Item) {
             let t = (func)(&k);
-            (k, t)
+            (t, k)
         }))
     }
 
diff --git a/src/rdd/shuffled_rdd.rs b/src/rdd/shuffled_rdd.rs
index e22ed1b1..90a532fe 100644
--- a/src/rdd/shuffled_rdd.rs
+++ b/src/rdd/shuffled_rdd.rs
@@ -64,17 +64,7 @@ impl<K: Data + Eq + Hash, V: Data, C: Data> ShuffledRdd<K, V, C> {
         let ctx = parent.get_context();
         let shuffle_id = ctx.new_shuffle_id();
         let mut vals = RddVals::new(ctx);
-
-        vals.dependencies
-            .push(Dependency::ShuffleDependency(Arc::new(
-                ShuffleDependency::new(
-                    shuffle_id,
-                    false,
-                    parent.get_rdd_base(),
-                    aggregator.clone(),
-                    part.clone(),
-                ),
-            )));
+        vals.shuffle_ids.push(shuffle_id);
         let vals = Arc::new(vals);
         ShuffledRdd {
             parent,
@@ -96,7 +86,15 @@ impl<K: Data + Eq + Hash, V: Data, C: Data> RddBase for ShuffledRdd<K, V, C> {
     }
 
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        vec![Dependency::ShuffleDependency(Arc::new(
+            ShuffleDependency::new(
+                self.shuffle_id,
+                false,
+                self.parent.get_rdd_base(),
+                self.aggregator.clone(),
+                self.part.clone(),
+            ),
+        ))]
     }
 
     fn splits(&self) -> Vec<Box<dyn Split>> {
diff --git a/src/rdd/union_rdd.rs b/src/rdd/union_rdd.rs
index 6fa22db7..48e10695 100644
--- a/src/rdd/union_rdd.rs
+++ b/src/rdd/union_rdd.rs
@@ -107,25 +107,11 @@ impl<T: Data> Clone for UnionVariants<T> {
 impl<T: Data> UnionVariants<T> {
     fn new(rdds: &[Arc<dyn Rdd<Item = T>>]) -> Result<Self> {
         let context = rdds[0].get_context();
-        let mut vals = RddVals::new(context);
+        let vals = RddVals::new(context);
 
-        let mut pos = 0;
         let final_rdds: Vec<_> = rdds.iter().map(|rdd| rdd.clone().into()).collect();
 
         if !UnionVariants::has_unique_partitioner(rdds) {
-            let deps = rdds
-                .iter()
-                .map(|rdd| {
-                    let rdd_base = rdd.get_rdd_base();
-                    let num_parts = rdd_base.number_of_splits();
-                    let dep = Dependency::NarrowDependency(Arc::new(RangeDependency::new(
-                        rdd_base, 0, pos, num_parts,
-                    )));
-                    pos += num_parts;
-                    dep
-                })
-                .collect();
-            vals.dependencies = deps;
             let vals = Arc::new(vals);
             log::debug!("inside unique partitioner constructor");
             Ok(NonUniquePartitioner {
                 rdds: final_rdds,
                 vals,
             })
         } else {
             let part = rdds[0].partitioner().ok_or(Error::LackingPartitioner)?;
-            log::debug!("inside partition aware constructor");
-            let deps = rdds
-                .iter()
-                .map(|x| {
-                    Dependency::NarrowDependency(
-                        Arc::new(OneToOneDependency::new(x.get_rdd_base()))
-                            as Arc<dyn NarrowDependencyTrait>,
-                    )
-                })
-                .collect();
-            vals.dependencies = deps;
             let vals = Arc::new(vals);
+            log::debug!("inside partition aware constructor");
             Ok(PartitionerAware {
                 rdds: final_rdds,
                 vals,
@@ -209,9 +185,39 @@ impl<T: Data> RddBase for UnionRdd<T> {
     }
 
     fn get_dependencies(&self) -> Vec<Dependency> {
-        match &self.0 {
-            NonUniquePartitioner { vals, .. } => vals.dependencies.clone(),
-            PartitionerAware { vals, .. } => vals.dependencies.clone(),
+        let mut pos = 0;
+        let rdds = match &self.0 {
+            NonUniquePartitioner { rdds, .. } =>
+                rdds.iter().map(|rdd| rdd.clone().into()).collect::<Vec<_>>(),
+            PartitionerAware { rdds, .. } =>
+                rdds.iter().map(|rdd| rdd.clone().into()).collect::<Vec<_>>(),
+        };
+
+        if !UnionVariants::has_unique_partitioner(&rdds) {
+            let deps = rdds
+                .iter()
+                .map(|rdd| {
+                    let rdd_base = rdd.get_rdd_base();
+                    let num_parts = rdd_base.number_of_splits();
+                    let dep = Dependency::NarrowDependency(Arc::new(RangeDependency::new(
+                        rdd_base, 0, pos, num_parts,
+                    )));
+                    pos += num_parts;
+                    dep
+                })
+                .collect();
+            deps
+        } else {
+            let deps = rdds
+                .iter()
+                .map(|x| {
+                    Dependency::NarrowDependency(
+                        Arc::new(OneToOneDependency::new(x.get_rdd_base()))
+                            as Arc<dyn NarrowDependencyTrait>,
+                    )
+                })
+                .collect();
+            deps
         }
     }
diff --git a/src/rdd/zip_rdd.rs b/src/rdd/zip_rdd.rs
index ed905397..bdc755db 100644
--- a/src/rdd/zip_rdd.rs
+++ b/src/rdd/zip_rdd.rs
@@ -59,7 +59,9 @@ impl<F: Data, S: Data> RddBase for ZippedPartitionsRdd<F, S> {
     }
 
     fn get_dependencies(&self) -> Vec<Dependency> {
-        self.vals.dependencies.clone()
+        vec![Dependency::NarrowDependency(Arc::new(
+            OneToOneDependency::new(self.first.get_rdd_base()),
+        ))]
     }
 
     fn splits(&self) -> Vec<Box<dyn Split>> {
@@ -134,10 +136,6 @@ impl<F: Data, S: Data> Rdd for ZippedPartitionsRdd<F, S> {
 impl<F: Data, S: Data> ZippedPartitionsRdd<F, S> {
     pub fn new(first: Arc<dyn Rdd<Item = F>>, second: Arc<dyn Rdd<Item = S>>) -> Self {
         let mut vals = RddVals::new(first.get_context());
-        vals.dependencies
-            .push(Dependency::NarrowDependency(Arc::new(
-                OneToOneDependency::new(first.get_rdd_base()),
-            )));
         let vals = Arc::new(vals);
 
         ZippedPartitionsRdd {
diff --git a/src/scheduler/base_scheduler.rs b/src/scheduler/base_scheduler.rs
index 77de5108..b6b4548f 100644
--- a/src/scheduler/base_scheduler.rs
+++ b/src/scheduler/base_scheduler.rs
@@ -426,7 +426,6 @@ pub(crate) trait NativeScheduler: Send + Sync {
                 self.get_next_task_id(),
                 jt.run_id,
                 stage.id,
-                stage.rdd.clone(),
                 stage
                     .shuffle_dependency
                     .clone()
diff --git a/src/shuffle/shuffle_map_task.rs b/src/shuffle/shuffle_map_task.rs
index 7893c5c7..007803dc 100644
--- a/src/shuffle/shuffle_map_task.rs
+++ b/src/shuffle/shuffle_map_task.rs
@@ -15,8 +15,6 @@ pub(crate) struct ShuffleMapTask {
     pub task_id: usize,
     pub run_id: usize,
     pub stage_id: usize,
-    #[serde(with = "serde_traitobject")]
-    pub rdd: Arc<dyn RddBase>,
     pinned: bool,
     #[serde(with = "serde_traitobject")]
     pub dep: Arc<dyn ShuffleDependencyTrait>,
@@ -29,7 +27,6 @@ impl ShuffleMapTask {
         task_id: usize,
         run_id: usize,
         stage_id: usize,
-        rdd: Arc<dyn RddBase>,
         dep: Arc<dyn ShuffleDependencyTrait>,
         partition: usize,
         locs: Vec<Ipv4Addr>,
@@ -38,8 +35,7 @@
             task_id,
             run_id,
             stage_id,
-            pinned: rdd.is_pinned(),
-            rdd,
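+            // The task no longer carries its own RDD handle: the shuffle
+            // dependency already references the RDD base, so pinning (and the
+            // shuffle input in run() below) is derived from the dep instead.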
+            pinned: dep.get_rdd_base().is_pinned(),
             dep,
             partition,
             locs,
@@ -85,7 +81,7 @@ impl TaskBase for ShuffleMapTask {
 
 impl Task for ShuffleMapTask {
     fn run(&self, _id: usize) -> SerBox<dyn Any + Send + Sync> {
-        SerBox::new(self.dep.do_shuffle_task(self.rdd.clone(), self.partition))
+        SerBox::new(self.dep.do_shuffle_task(self.dep.get_rdd_base(), self.partition))
             as SerBox<dyn Any + Send + Sync>
     }
 }
diff --git a/src/utils/test_utils.rs b/src/utils/test_utils.rs
index 36b8068b..310d34ab 100644
--- a/src/utils/test_utils.rs
+++ b/src/utils/test_utils.rs
@@ -10,6 +10,7 @@ where
 {
     let ctxt = Context::with_mode(DeploymentMode::Local).unwrap();
     let rdd_f = Fn!(move |data: u8| -> u8 { data });
-    let rdd = ctxt.parallelize(vec![0, 1, 2], 1).map(rdd_f);
+    let v = vec![0; 100_000];
+    let rdd = ctxt.parallelize(v, 1).map(rdd_f);
     ResultTask::new(2, 0, 0, rdd.into(), Arc::new(func), 0, vec![], 0)
 }