Skip to content

Sort by #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions src/partitioner.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use crate::rdd::Rdd;
use crate::serializable_traits::Data;
use crate::utils;

use downcast_rs::Downcast;
use fasthash::MetroHasher;
use itertools::partition;
use rand::{Rng, SeedableRng};
use serde_derive::{Deserialize, Serialize};
use serde_traitobject::{Deserialize, Serialize};
use std::any::Any;
use std::cmp::{max, min};
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::sync::Arc;

/// Partitioner trait for creating Rdd partitions
pub trait Partitioner:
Expand Down Expand Up @@ -56,6 +63,118 @@ impl<K: Data + Hash + Eq> Partitioner for HashPartitioner<K> {
}
}

/// Partitions sortable keys into contiguous ranges delimited by sampled
/// boundary keys, so that partition `i` holds keys below `range_bounds[i]`.
#[derive(Clone, Serialize, Deserialize)]
pub struct RangePartitioner<K: Data + Eq + PartialEq> {
    // Requested sort direction. NOTE(review): not consulted by
    // `get_partition` below — confirm whether descending should reverse
    // the partition index.
    ascending: bool,
    // Total number of partitions; `range_bounds` holds `partitions - 1`
    // boundary keys (or none when `partitions <= 1`).
    partitions: usize,
    // Sorted boundary keys, shared cheaply across clones via `Arc`.
    // NOTE(review): serialized through `serde_traitobject` — presumably
    // because `K: Data` is only trait-object-serializable; verify.
    #[serde(with = "serde_traitobject")]
    range_bounds: Arc<Vec<K>>,
    // `K` already appears in `range_bounds`, so this marker is redundant;
    // kept because the constructor populates it.
    _marker: PhantomData<K>,
}

impl<K: Data + Eq + PartialEq + Ord + PartialOrd> RangePartitioner<K> {
pub fn new<T>(
partitions: usize,
rdd: Arc<T>,
ascending: bool,
sample_point_per_partition_hint: usize,
) -> Self
where
T: Rdd<Item = (K)> + Sized,
{
let mut range_bounds = Vec::new();
if partitions <= 1 {
} else {
let sample_size = min(1000000, sample_point_per_partition_hint * partitions);
let sample_size_per_partition = sample_size * 3 / partitions;

let mut samples = Vec::<K>::new();

samples = rdd
.map_partitions(Box::new(Fn!(
move |iter: Box<dyn Iterator<Item = K>>| -> Box<dyn Iterator<Item = K>> {
let mut res = Vec::<K>::new();

let mut rand = utils::random::get_rng_with_random_seed();
for (idx, item) in iter.enumerate() {
if idx < sample_size_per_partition {
res.push(item);
} else {
let i = rand.gen_range(0, idx);
if i < idx {
res[i] = item
}
}
}
Box::new(res.into_iter())
}
)))
.collect()
.unwrap();

samples.sort();

let step: f64 = samples.len() as f64 / (partitions - 1) as f64;
let mut i: f64 = 0.0;

for idx in 0..(partitions - 1) {
range_bounds.push(samples[min((i + step) as usize, samples.len() - 1)].clone());

i += step;
}
}

RangePartitioner {
ascending,
partitions,
range_bounds: Arc::new(range_bounds),
_marker: PhantomData,
}
}
}

impl<K: Data + Eq + PartialEq + Ord + PartialOrd> Partitioner for RangePartitioner<K> {
fn equals(&self, other: &dyn Any) -> bool {
if let Some(rp) = other.downcast_ref::<RangePartitioner<K>>() {
if self.partitions == rp.partitions && self.ascending == rp.ascending {
if self.range_bounds.len() == rp.range_bounds.len() {
for idx in (0..self.range_bounds.len()) {
if self.range_bounds[idx] != rp.range_bounds[idx] {
return false;
}
}
}
return true;
} else {
return false;
}
} else {
return false;
}
}

fn get_num_of_partitions(&self) -> usize {
self.partitions
}

fn get_partition(&self, key: &dyn Any) -> usize {
let key = key.downcast_ref::<K>().unwrap();

if self.partitions <= 1 {
return 0;
}

let r_b = &(self.range_bounds);
let len = r_b.len();
for idx in (0..len) {
if key < &r_b[idx] {
return idx;
}
}
return len;
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
40 changes: 39 additions & 1 deletion src/rdd/rdd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::{atomic::AtomicBool, atomic::Ordering::SeqCst, Arc};
use crate::context::Context;
use crate::dependency::{Dependency, OneToOneDependency};
use crate::error::{Error, Result};
use crate::partitioner::{HashPartitioner, Partitioner};
use crate::partitioner::{HashPartitioner, Partitioner, RangePartitioner};
use crate::serializable_traits::{AnyData, Data, Func, SerFunc};
use crate::split::Split;
use crate::task::TaskContext;
Expand Down Expand Up @@ -758,6 +758,44 @@ pub trait Rdd: RddBase + 'static {
)
)
}

fn sort_by<K, F>(
&self,
ascending: bool,
num_partitions: usize,
func: F,
) -> SerArc<dyn Rdd<Item = Self::Item>>
where
K: Data + Eq + Hash + PartialEq + Ord + PartialOrd,
F: SerFunc(&Self::Item) -> K,
Self::Item: Data + Eq + Hash,
Self: Sized + Clone,
{
let f_clone = func.clone();
let sample_rdd = self
.clone()
.map(Fn!(move |x: Self::Item| -> K { (f_clone)(&x) }));

let part = RangePartitioner::<K>::new(num_partitions, Arc::new(sample_rdd), ascending, 20);

/// func is called multiple time during sorting. perhaps change it later
let f_clone = func.clone();
let rdd = self.map(Box::new(Fn!(move |x: Self::Item| -> (K, Self::Item) {
((f_clone)(&x), x)
})));

let f_clone = func.clone();
let sort = Fn!(
move|iter: Box<dyn Iterator<Item = Self::Item>>| -> Box<dyn Iterator<Item = Self::Item>> {
let mut res: Vec<Self::Item> = iter.collect();
/// sort_by_key expect a FnMut parameter, but f_clone only implement Fn
/// so a wrapper which implement FnMut needed here.
res.sort_by_key(|x| (f_clone)(&x));
Box::new(res.into_iter())
});

rdd.partition_by_key(Box::new(part)).map_partitions(sort)
}
}

pub trait Reduce<T> {
Expand Down
2 changes: 1 addition & 1 deletion src/utils/random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub(crate) fn get_default_rng_from_seed(seed: u64) -> Pcg64 {
}

/// Get a new rng with random thread local random seed
fn get_rng_with_random_seed() -> Pcg64 {
pub(crate) fn get_rng_with_random_seed() -> Pcg64 {
Pcg64::seed_from_u64(rand::random::<u64>())
}

Expand Down
106 changes: 50 additions & 56 deletions tests/test_rdd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,91 +418,85 @@ fn test_zip() {
(5, "1a".to_string()),
];
assert_eq!(res, expected);

}

#[test]
fn test_intersection_with_num_partitions() {
    let sc = CONTEXT.clone();

    let col1 = vec![1, 2, 3, 4, 5, 10, 12, 13, 19, 0];
    let col2 = vec![3, 4, 5, 6, 7, 8, 11, 13];

    // Intersect RDDs with different partition counts, forcing a repartition
    // to 3 partitions.
    let first = sc.parallelize(col1, 2);
    let second = sc.parallelize(col2, 4);
    let mut res = first
        .intersection_with_num_partitions(Arc::new(second), 3)
        .collect()
        .unwrap();

    // Intersection order is nondeterministic across partitions; sort before
    // comparing.
    res.sort();

    let expected = vec![3, 4, 5, 13];
    assert_eq!(res, expected);
}

#[test]
fn test_intersection() {
    let sc = CONTEXT.clone();

    let col1 = vec![1, 2, 3, 4, 5, 10, 12, 13, 19, 0];
    let col2 = vec![3, 4, 5, 6, 7, 8, 11, 13];

    let first = sc.parallelize(col1, 2);
    let second = sc.parallelize(col2, 4);
    let mut res = first.intersection(Arc::new(second)).collect().unwrap();

    // Intersection order is nondeterministic across partitions; sort before
    // comparing.
    res.sort();

    let expected = vec![3, 4, 5, 13];
    assert_eq!(res, expected);
}

#[test]
fn test_sort_by() {
    let sc = CONTEXT.clone();
    // Strings sorted by their numeric value, not lexicographically
    // ("10" must come after "9").
    let col = vec![
        "1".to_string(),
        "3".to_string(),
        "2".to_string(),
        "4".to_string(),
        "4".to_string(),
        "10".to_string(),
        "9".to_string(),
        "13".to_string(),
        "19".to_string(),
        "0".to_string(),
    ];

    let rdd = sc.parallelize(col, 2);
    let res = rdd
        .sort_by(
            true,
            5,
            Fn!(|x: &String| -> u8 { x.parse::<u8>().unwrap() }),
        )
        .collect()
        .unwrap();

    let expected = vec![
        "0".to_string(),
        "1".to_string(),
        "2".to_string(),
        "3".to_string(),
        "4".to_string(),
        "4".to_string(),
        "9".to_string(),
        "10".to_string(),
        "13".to_string(),
        "19".to_string(),
    ];

    assert_eq!(res, expected);
}