-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathdepartment_index.rs
More file actions
116 lines (99 loc) · 3.43 KB
/
department_index.rs
File metadata and controls
116 lines (99 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use datafusion::arrow::array::{
Array, ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array,
};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::datatypes::Field;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::{Expr, Operator};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::Statistics;
use datafusion::scalar::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_index_provider::physical_plan::create_index_schema;
use datafusion_index_provider::physical_plan::Index;
#[derive(Debug)]
pub struct DepartmentIndex {
index: BTreeMap<String, Vec<i32>>,
}
impl DepartmentIndex {
pub fn new(departments: &StringArray, ids: &Int32Array) -> Self {
let mut index: BTreeMap<String, Vec<i32>> = BTreeMap::new();
for i in 0..departments.len() {
let department = departments.value(i).to_string();
let row_id = ids.value(i);
index.entry(department).or_default().push(row_id);
}
DepartmentIndex { index }
}
pub fn create_data_from_filters(
&self,
filters: &[Expr],
limit: Option<usize>,
) -> Vec<RecordBatch> {
let mut row_ids = BTreeSet::new();
for filter in filters {
if let Expr::BinaryExpr(be) = filter {
if let Expr::Column(c) = be.left.as_ref() {
if c.name == "department" {
if let Expr::Literal(ScalarValue::Utf8(Some(v)), _) = be.right.as_ref() {
if be.op == Operator::Eq {
if let Some(ids) = self.index.get(v) {
for id in ids {
row_ids.insert(*id);
}
}
}
}
}
}
}
}
let mut final_row_ids: Vec<u64> = row_ids.into_iter().map(|id| id as u64).collect();
if let Some(l) = limit {
final_row_ids.truncate(l);
}
if final_row_ids.is_empty() {
return vec![];
}
let schema = self.index_schema();
let column = Arc::new(UInt64Array::from(final_row_ids)) as ArrayRef;
let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
vec![batch]
}
}
/// Registers [`DepartmentIndex`] with the index-provider framework under the name
/// `department_index`, serving the `department` column of the `employees` table.
impl Index for DepartmentIndex {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        "department_index"
    }

    /// Schema of the batches produced by `scan`. It is built by `create_index_schema`
    /// from a single non-null `UInt64` field named `id`.
    fn index_schema(&self) -> SchemaRef {
        let id_field = Field::new("id", DataType::UInt64, false);
        create_index_schema([id_field])
    }

    fn table_name(&self) -> &str {
        "employees"
    }

    fn column_name(&self) -> &str {
        "department"
    }

    /// Computes the row ids selected by `filters` (capped by `limit`) and returns them as
    /// an in-memory stream. See `DepartmentIndex::create_data_from_filters` for which
    /// filters are honoured.
    fn scan(
        &self,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<SendableRecordBatchStream, DataFusionError> {
        let data = self.create_data_from_filters(filters, limit);
        log::debug!("Department index data: {data:?}");
        let schema = self.index_schema();
        let stream = MemoryStream::try_new(data, schema, None)?;
        Ok(Box::pin(stream))
    }

    /// No statistics are tracked; everything is reported as unknown.
    fn statistics(&self) -> Statistics {
        let schema = self.index_schema();
        Statistics::new_unknown(&schema)
    }
}