2 changes: 2 additions & 0 deletions .cargo/config
@@ -0,0 +1,2 @@
[build]
rustflags = ["-Ctarget-cpu=sandybridge", "-Ctarget-feature=+aes,+sse2,+sse3,+sse4.1,+ssse3"]
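Note: the [build] rustflags pin code generation to Sandy Bridge-class CPUs and enable AES-NI plus the SSE2/SSE3/SSSE3/SSE4.1 vector extensions for every crate built in this workspace. As a quick sanity check (a sketch, not part of this PR), a scratch binary built inside the workspace can report whether those features took effect:

// cfg!(target_feature = "...") is evaluated at compile time against -Ctarget-feature,
// so this reflects whatever .cargo/config passed to rustc for this build.
fn main() {
    println!("aes    enabled: {}", cfg!(target_feature = "aes"));
    println!("ssse3  enabled: {}", cfg!(target_feature = "ssse3"));
    println!("sse4.1 enabled: {}", cfg!(target_feature = "sse4.1"));
}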
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -17,11 +17,11 @@ async-trait = "0.1.30"
crossbeam = "0.7.3"
dashmap = "3.11.1"
envy = "^0.4.1"
-fasthash = "0.4.0"
futures = { version = "0.3.4" }
hyper = "0.13.4"
http = "0.2.1"
itertools = "0.9.0"
+metrohash = "1.0.6"
num_cpus = "1.13.0"
log = "0.4.8"
once_cell = "1.3.1"
@@ -66,5 +66,5 @@ capnpc = "0.12.1"
[dev-dependencies]
async-std = { version = "1.5.0", features = ["attributes"] }
chrono = "0.4.11"
-parquet = "0.15.1"
+parquet = "5.0.0"
tempfile = "3"
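Note on the dependency changes: fasthash is dropped in favor of metrohash, and parquet is bumped from 0.15.1 to 5.0.0 (the examples/parquet_column_read.rs change at the bottom of this diff follows from that bump). If metrohash is meant as a drop-in HashMap hasher replacement, usage would look roughly like the sketch below; the MetroHashMap alias and the assumption that the crate's MetroHash64 implements std::hash::Hasher and Default are illustrative, not taken from this PR.

use std::collections::HashMap;
use std::hash::BuildHasherDefault;
use metrohash::MetroHash64;

// Hypothetical alias for illustration only.
type MetroHashMap<K, V> = HashMap<K, V, BuildHasherDefault<MetroHash64>>;

fn main() {
    let mut counts: MetroHashMap<String, u64> = MetroHashMap::default();
    *counts.entry("key".to_string()).or_insert(0) += 1;
    println!("{:?}", counts);
}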
106 changes: 106 additions & 0 deletions examples/dijkstra.rs
@@ -0,0 +1,106 @@
use std::collections::{HashMap, BTreeMap};
use std::path::PathBuf;
use std::time::Instant;
use vega::*;

fn custom_split_nodes_text_file(node: (usize, usize, Option<String>)) -> (usize, (usize, Option<Vec<String>>, String)) {
    let neighbors = node.2.map(|s| {
        let mut v = s.split(":")
            .map(|x| x.to_string())
            .collect::<Vec<_>>();
        v.split_off(v.len() - 1);
        v
    });
    let path = node.0.to_string();
    (node.0, (node.1, neighbors, path))
}

fn custom_split_neighbor(parent_path: &String, parent_distance: usize, neighbor: &String) -> (usize, (usize, Option<Vec<String>>, String)) {
    let neighbor = neighbor.split(',').collect::<Vec<_>>();
    let (nid, mut distance): (usize, usize) = (neighbor[0].parse().unwrap(), neighbor[1].parse().unwrap());
    distance += parent_distance;
    let mut path = parent_path.to_string();
    path.push_str("->");
    path.push_str(&nid.to_string());
    (nid, (distance, None, path))
}

fn custom_split_nodes_iterative(node: (usize, (usize, Option<Vec<String>>, String))) -> (usize, (usize, Option<Vec<String>>, String)) {
    let (nid, (distance, neighbors, mut path)) = node;
    let elements = path.split("->");
    if elements.last().unwrap().parse::<usize>().unwrap() != nid {
        path.push_str("->");
        path.push_str(&nid.to_string());
    }
    (nid, (distance, neighbors, path))
}

fn min_distance(node_value0: (usize, Option<Vec<String>>, String), node_value1: (usize, Option<Vec<String>>, String)) -> (usize, Option<Vec<String>>, String) {
    let mut neighbors = None;
    let mut distance = 0;
    let mut path = String::new();
    let (dist0, neighbors0, path0) = node_value0;
    let (dist1, neighbors1, path1) = node_value1;
    if neighbors0.is_some() {
        neighbors = neighbors0;
    } else {
        neighbors = neighbors1;
    }
    if dist0 <= dist1 {
        distance = dist0;
        path = path0;
    } else {
        distance = dist1;
        path = path1;
    }
    (distance, neighbors, path)
}

fn main() -> Result<()> {
    let sc = Context::new()?;
    let now = Instant::now();

    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
        bincode::deserialize::<Vec<(usize, usize, Option<String>)>>(&file).unwrap() // Item = (usize, usize, Option<String>)
    }));

    let dir = PathBuf::from("/opt/data/pt_dij_1");
    let mut nodes = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer)
        .flat_map(Fn!(|nodes: Vec<(usize, usize, Option<String>)>| Box::new(nodes.into_iter().map(|node| custom_split_nodes_text_file(node))) as Box<dyn Iterator<Item = _>>));
    let mut old = nodes.aggregate(
        0,
        Fn!(|local_sum: usize, node: (usize, (usize, Option<Vec<String>>, String))| local_sum + node.1.0),
        Fn!(|local_sum0, local_sum1| local_sum0 + local_sum1),
    ).unwrap();
    let mut new = old.clone();
    let mut iterations = 0;
    //let mut result = Text::<_, _>::new(Vec::new(), None, None);
    while (iterations == 0 || old != new) && iterations < 5 {
        iterations += 1;
        old = new;
        let mapper = nodes.flat_map(Fn!(|node: (usize, (usize, Option<Vec<String>>, String))| {
            let (nid, (data0, data1, data2)) = node.clone();
            let mut res = Vec::new();
            res.push(node);
            if let Some(d) = data1 {
                res.append(&mut d.into_iter()
                    .map(|neighbor: String| custom_split_neighbor(&data2, data0, &neighbor))
                    .collect::<Vec<_>>());
            }
            Box::new(res.into_iter()) as Box<dyn Iterator<Item = _>>
        }));
        let reducer = mapper.reduce_by_key(Fn!(|(x, y)| min_distance(x, y)), 1);
        nodes = reducer.map(Fn!(|node| custom_split_nodes_iterative(node)));
        //nodes.cache();
        new = nodes.aggregate(
            0,
            Fn!(|local_sum: usize, node: (usize, (usize, Option<Vec<String>>, String))| local_sum + node.1.0),
            Fn!(|local_sum0, local_sum1| local_sum0 + local_sum1),
        ).unwrap();
        let dur = now.elapsed().as_nanos() as f64 * 1e-9;
        println!("new = {:?}, elapsed time = {:?}", new, dur);
    }
    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
    println!("Finished after {:?} iterations, total time {:?}s", iterations, dur);
    Ok(())
}
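Note: dijkstra.rs expects each partition file under /opt/data/pt_dij_1 to be a bincode-encoded Vec<(usize, usize, Option<String>)>: node id, current shortest distance, and an optional neighbor string of the form "id,weight:id,weight:...:" whose trailing empty element custom_split_nodes_text_file trims off. A sketch of writing such a partition; the four-node graph, the large stand-in distance for unreached nodes, and the part file name are illustrative assumptions.

use std::fs;

fn main() {
    // (node id, initial distance, "neighbor_id,edge_weight:..." with a trailing ':')
    let nodes: Vec<(usize, usize, Option<String>)> = vec![
        (0, 0, Some("1,4:2,1:".to_string())),         // source node, distance 0
        (1, 1_000_000, Some("3,1:".to_string())),     // 1_000_000 stands in for "unreached"
        (2, 1_000_000, Some("1,2:3,5:".to_string())),
        (3, 1_000_000, None),                         // sink with no outgoing edges
    ];
    let bytes = bincode::serialize(&nodes).unwrap();
    fs::write("/opt/data/pt_dij_1/part-0", bytes).unwrap(); // hypothetical file name
}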
102 changes: 102 additions & 0 deletions examples/kmeans.rs
@@ -0,0 +1,102 @@
use std::{collections::HashMap, env::temp_dir};
use std::path::PathBuf;
use std::time::Instant;
use vega::*;
use rand::Rng;
use serde_derive::{Deserialize, Serialize};

fn parse_vector(line: String) -> Vec<f64> {
    line.split(' ')
        .map(|number| number.parse::<f64>().unwrap())
        .collect()
}

fn squared_distance(p: &Vec<f64>, center: &Vec<f64>) -> f64 {
    assert_eq!(p.len(), center.len());
    let sum = p.iter().zip(center.iter()).fold(0 as f64, |acc, (x, y)| {
        let delta = *y - *x;
        acc + delta * delta
    });
    sum.sqrt()
}

fn closest_point(p: &Vec<f64>, centers: &Vec<Vec<f64>>) -> usize {
    let mut best_index = 0;
    let mut closest = f64::MAX;

    for (index, center) in centers.iter().enumerate() {
        let temp_dist = squared_distance(p, center);
        if temp_dist < closest {
            closest = temp_dist;
            best_index = index
        }
    }

    best_index
}

fn merge_results(a: (Vec<f64>, i32), b: (Vec<f64>, i32)) -> (Vec<f64>, i32) {
    (
        a.0.iter().zip(b.0.iter()).map(|(x, y)| x + y).collect(),
        a.1 + b.1,
    )
}

fn main() -> Result<()> {
    let sc = Context::new()?;
    let now = Instant::now();

    // TODO: need to change dir
    let dir = PathBuf::from("/opt/data/pt_km_50000_5");
    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
        String::from_utf8(file)
            .unwrap()
            .lines()
            .map(|s| s.to_string())
            .collect::<Vec<_>>()
    }));

    let lines = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer);
    let data_rdd = lines.flat_map(Fn!(|lines: Vec<String>| {
        Box::new(lines.into_iter().map(|line| {
            parse_vector(line)
        })) as Box<dyn Iterator<Item = _>>
    }));
    //data_rdd.cache();

    let k = 10;
    let converge_dist = 0.3;
    let mut iter = 0;
    let mut k_points = data_rdd.take(k).unwrap();
    let mut temp_dist = 100.0;
    while temp_dist > converge_dist && iter < 5 {
        let k_points_c = k_points.clone();
        let closest = data_rdd.map(
            Fn!(move |p| {
                (closest_point(&p, &k_points_c), (p, 1))
            }));
        let point_stats = closest.reduce_by_key(Fn!(|(a, b)| merge_results(a, b)), 1);
        let new_points = point_stats.map(
            Fn!(|pair: (usize, (Vec<f64>, i32))|
                (pair.0, pair.1.0.iter().map(|x| x * 1.0 / pair.1.1 as f64).collect::<Vec<_>>())
            ),
        ).collect().unwrap();
        println!("new_points = {:?}", new_points);
        let new_points = new_points.into_iter().collect::<HashMap<usize, Vec<f64>>>();
        temp_dist = 0.0;
        for i in 0..k as usize {
            temp_dist += squared_distance(&k_points[i], &new_points[&i]);
        }

        for (idx, point) in new_points {
            k_points[idx] = point;
        }
        let dur = now.elapsed().as_nanos() as f64 * 1e-9;
        println!("temp_dist = {:?}, time at iter {:?}: {:?}", temp_dist, iter, dur);

        iter += 1;
    }
    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
    println!("total time {:?} s, k_points = {:?}", dur, k_points);
    Ok(())
}
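Note: kmeans.rs reads plain UTF-8 text, one point per line with single-space-separated f64 coordinates (exactly what parse_vector assumes). A sketch of generating compatible input; the point count, value range, and part file name are illustrative.

use rand::Rng;
use std::fs;

fn main() {
    let mut rng = rand::thread_rng();
    let dims = 5;
    let lines: Vec<String> = (0..50_000)
        .map(|_| {
            (0..dims)
                .map(|_| format!("{}", rng.gen::<f64>() * 100.0))
                .collect::<Vec<_>>()
                .join(" ") // single-space separator, as parse_vector expects
        })
        .collect();
    fs::write("/opt/data/pt_km_50000_5/part-0", lines.join("\n")).unwrap(); // hypothetical file name
}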
53 changes: 53 additions & 0 deletions examples/lr.rs
@@ -0,0 +1,53 @@
use std::time::Instant;
use std::path::PathBuf;
use vega::*;
use rand::Rng;
use rand_distr::{Normal, Distribution};
use serde_derive::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Default, Clone, Debug)]
pub struct Point {
    x: Vec<f32>,
    y: f32,
}

fn main() -> Result<()> {
    let sc = Context::new()?;

    let mut rng = rand::thread_rng();
    let dim = 5;
    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
        bincode::deserialize::<Vec<Point>>(&file).unwrap() // Item = Point
    }));

    let dir = PathBuf::from("/opt/data/pt_lr_1");
    let mut points_rdd = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer)
        .flat_map(Fn!(|v: Vec<Point>| Box::new(v.into_iter()) as Box<dyn Iterator<Item = _>>));
    let mut w = (0..dim).map(|_| rng.gen::<f32>()).collect::<Vec<_>>();
    let iter_num = 3;
    let now = Instant::now();
    for i in 0..iter_num {
        let w_c = w.clone();
        let g = points_rdd.map(Fn!(move |p: Point| {
                let y = p.y;
                p.x.iter().zip(w.iter())
                    .map(|(&x, &w): (&f32, &f32)| x * (1f32 / (1f32 + (-y * (w * x)).exp()) - 1f32) * y)
                    .collect::<Vec<_>>()
            }))
            .reduce(Fn!(|x: Vec<f32>, y: Vec<f32>| x.into_iter()
                .zip(y.into_iter())
                .map(|(x, y)| x + y)
                .collect::<Vec<_>>()))
            .unwrap();
        w = w_c.into_iter()
            .zip(g.unwrap().into_iter())
            .map(|(x, y)| x - y)
            .collect::<Vec<_>>();
        println!("{:?}: w = {:?}", i, w);
    }
    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
    println!("total time {:?} s", dur);
    println!("w = {:?}", w);

    Ok(())
}
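Note: lr.rs deserializes each partition as a bincode-encoded Vec<Point>. A sketch of producing such data; the Gaussian features, the +/-1 labels, the record count, and the part file name are assumptions (the rand_distr import in the example hints that something along these lines was used).

use rand::Rng;
use rand_distr::{Distribution, Normal};
use serde_derive::{Deserialize, Serialize};
use std::fs;

// Same layout as the Point struct in lr.rs.
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
pub struct Point {
    x: Vec<f32>,
    y: f32,
}

fn main() {
    let mut rng = rand::thread_rng();
    let normal = Normal::new(0.0f32, 1.0).unwrap();
    let dim = 5;
    let points: Vec<Point> = (0..10_000)
        .map(|_| {
            let y: f32 = if rng.gen::<bool>() { 1.0 } else { -1.0 };
            // Shift the feature mean by the label so the two classes are separable.
            let x = (0..dim).map(|_| normal.sample(&mut rng) + y).collect();
            Point { x, y }
        })
        .collect();
    let bytes = bincode::serialize(&points).unwrap();
    fs::write("/opt/data/pt_lr_1/part-0", bytes).unwrap(); // hypothetical file name
}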
39 changes: 39 additions & 0 deletions examples/mm.rs
@@ -0,0 +1,39 @@
use std::collections::{HashMap, BTreeMap};
use std::path::PathBuf;
use std::time::Instant;
use vega::*;

// unsecure mode
fn main() -> Result<()> {
    let sc = Context::new()?;
    let now = Instant::now();

    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
        bincode::deserialize::<Vec<((u32, u32), f64)>>(&file).unwrap() // Item = ((u32, u32), f64)
    }));

    let dir_a = PathBuf::from("/opt/data/pt_mm_a_100");
    let dir_b = PathBuf::from("/opt/data/pt_mm_b_100");
    let ma = sc.read_source(LocalFsReaderConfig::new(dir_a).num_partitions_per_executor(1), deserializer.clone())
        .flat_map(Fn!(|va: Vec<((u32, u32), f64)>| {
            Box::new(va.into_iter().map(|a|
                (a.0.1, (a.0.0, a.1))
            )) as Box<dyn Iterator<Item = _>>
        }));
    let mb = sc.read_source(LocalFsReaderConfig::new(dir_b).num_partitions_per_executor(1), deserializer)
        .flat_map(Fn!(|vb: Vec<((u32, u32), f64)>| {
            Box::new(vb.into_iter().map(|b|
                (b.0.0, (b.0.1, b.1))
            )) as Box<dyn Iterator<Item = _>>
        }));

    let temp = ma.join(mb, 1)
        .map(Fn!(|n: (u32, ((u32, f64), (u32, f64)))| ((n.1.0.0, n.1.1.0), n.1.0.1 * n.1.1.1)));

    let mc = temp.reduce_by_key(Fn!(|(x, y)| x + y), 1);

    let output = mc.count().unwrap();
    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
    println!("count = {:?}, total time = {:?}", output, dur);
    Ok(())
}
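Note: mm.rs multiplies two sparse matrices by keying A on its column index and B on its row index, joining on that shared inner index k, multiplying the paired values, and summing per output coordinate (i, j). A local, RDD-free sketch of the same scheme (not part of this PR), handy for checking the keying logic on small inputs:

use std::collections::HashMap;

fn multiply(a: &[((u32, u32), f64)], b: &[((u32, u32), f64)]) -> HashMap<(u32, u32), f64> {
    // Index B by its row index k, mirroring mm.rs keying mb on b.0.0.
    let mut b_by_row: HashMap<u32, Vec<(u32, f64)>> = HashMap::new();
    for &((k, j), v) in b {
        b_by_row.entry(k).or_default().push((j, v));
    }
    // "Join" each A entry ((i, k), va) with B's row k, then reduce by (i, j).
    let mut c: HashMap<(u32, u32), f64> = HashMap::new();
    for &((i, k), va) in a {
        if let Some(row) = b_by_row.get(&k) {
            for &(j, vb) in row {
                *c.entry((i, j)).or_insert(0.0) += va * vb;
            }
        }
    }
    c
}

fn main() {
    let a = vec![((0, 0), 1.0), ((0, 1), 2.0)];
    let b = vec![((0, 0), 3.0), ((1, 0), 4.0)];
    assert_eq!(multiply(&a, &b)[&(0, 0)], 1.0 * 3.0 + 2.0 * 4.0);
    println!("ok");
}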
57 changes: 57 additions & 0 deletions examples/pagerank.rs
@@ -0,0 +1,57 @@
use std::collections::{HashMap, BTreeMap};
use std::path::PathBuf;
use std::time::Instant;
use vega::*;

fn main() -> Result<()> {
    let sc = Context::new()?;
    let now = Instant::now();

    let deserializer = Box::new(Fn!(|file: Vec<u8>| {
        String::from_utf8(file)
            .unwrap()
            .lines()
            .map(|s| s.to_string())
            .collect::<Vec<_>>()
    }));

    let iters = 1; // 7 iterations causes a core dump (why?); the ranks converge at 6
    let dir = PathBuf::from("/opt/data/pt_pr");
    let lines = sc.read_source(LocalFsReaderConfig::new(dir).num_partitions_per_executor(1), deserializer);
    let links = lines.flat_map(Fn!(|lines: Vec<String>| {
        Box::new(lines.into_iter().map(|line| {
            let parts = line.split(" ")
                .collect::<Vec<_>>();
            (parts[0].to_string(), parts[1].to_string())
        })) as Box<dyn Iterator<Item = _>>
    })).distinct_with_num_partitions(1)
        .group_by_key(1);
    //links.cache();
    let mut ranks = links.map_values(Fn!(|_| 1.0));

    for _ in 0..iters {
        let contribs = links.join(ranks, 1)
            .map(Fn!(|(k, v)| v))
            .flat_map(Fn!(|(urls, rank): (Vec<String>, f64)| {
                let size = urls.len() as f64;
                Box::new(urls.into_iter().map(move |url| (url, rank / size)))
                    as Box<dyn Iterator<Item = _>>
            }));
        ranks = contribs.reduce_by_key(Fn!(|(x, y)| x + y), 1)
            .map_values(Fn!(|v| 0.15 + 0.85 * v));
    }

    let output = ranks.reduce(Fn!(|x: (String, f64), y: (String, f64)| {
        if x.1 > y.1 {
            x
        } else {
            y
        }
    })).unwrap();
    let output = output.unwrap();
    let dur = now.elapsed().as_nanos() as f64 * 1e-9;
    println!("{:?} rank first with {:?}, total time = {:?}", output.0, output.1, dur);

    Ok(())
}
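Note: pagerank.rs parses each input line as a "source target" pair separated by a single space, deduplicates the pairs, and groups outgoing links per page. A sketch of writing a tiny edge list in that format; the graph and the part file name are illustrative.

use std::fs;

fn main() {
    // One "source target" pair per line; duplicate pairs are fine, the job deduplicates them.
    let edges = [("a", "b"), ("a", "c"), ("b", "c"), ("c", "a"), ("c", "b")];
    let body = edges
        .iter()
        .map(|(src, dst)| format!("{} {}", src, dst))
        .collect::<Vec<_>>()
        .join("\n");
    fs::write("/opt/data/pt_pr/part-0", body).unwrap(); // hypothetical file name
}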
2 changes: 1 addition & 1 deletion examples/parquet_column_read.rs
@@ -28,7 +28,7 @@ fn main() -> Result<()> {
fn read(file: PathBuf) -> Vec<((i32, String, i64), (i64, f64))> {
    let file = File::open(file).unwrap();
    let reader = SerializedFileReader::new(file).unwrap();
-    let metadata = reader.metadata();
+    let metadata = reader.metadata().clone();
    let batch_size = 500_000 as usize;
    let iter = (0..metadata.num_row_groups()).flat_map(move |i| {
        let row_group_reader = reader.get_row_group(i).unwrap();
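Note on the metadata() change: it is presumably tied to the parquet bump to 5.0.0 above. In the newer API the reader hands out a borrowed &ParquetMetaData, so cloning the metadata ends that borrow and lets reader be moved into the flat_map closure on the following line, while the pre-5.0 code apparently did not hold such a borrow and needed no clone.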