-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcomposite_pk_fetcher.rs
More file actions
87 lines (76 loc) · 2.34 KB
/
composite_pk_fetcher.rs
File metadata and controls
87 lines (76 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use std::fmt;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::arrow::array::{Array, Int32Array, StringArray, UInt64Array};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion_index_provider::physical_plan::fetcher::RecordFetcher;
/// A [`RecordFetcher`] that serves rows out of a single in-memory [`RecordBatch`],
/// matching requests on a composite `(tenant, entity id)` key.
pub struct CompositePkFetcher {
    /// The rows this fetcher serves; its schema is what `schema()` reports.
    data: RecordBatch,
}

impl CompositePkFetcher {
    /// Creates a fetcher that serves rows from `data`.
    pub fn new(data: RecordBatch) -> Self {
        CompositePkFetcher { data }
    }
}

/// Prints only the type name, not the contents of the wrapped batch.
impl fmt::Debug for CompositePkFetcher {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("CompositePkFetcher")
    }
}
#[async_trait]
impl RecordFetcher for CompositePkFetcher {
fn schema(&self) -> SchemaRef {
self.data.schema()
}
async fn fetch(&self, index_batch: RecordBatch) -> Result<RecordBatch> {
let req_tenants = index_batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let req_eids = index_batch
.column(1)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let data_tenants = self
.data
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let data_eids = self
.data
.column(1)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
let mut indices = Vec::new();
for i in 0..req_tenants.len() {
let req_t = req_tenants.value(i);
let req_e = req_eids.value(i);
for j in 0..data_tenants.len() {
if data_tenants.value(j) == req_t && data_eids.value(j) == req_e {
indices.push(j as i32);
break;
}
}
}
let indices_array = Int32Array::from(indices);
let new_columns: Result<Vec<Arc<dyn Array>>> = self
.data
.columns()
.iter()
.map(|col| {
Ok(Arc::new(datafusion::arrow::compute::take(
col.as_ref(),
&indices_array,
None,
)?) as Arc<dyn Array>)
})
.collect();
Ok(RecordBatch::try_new(self.data.schema(), new_columns?)?)
}
}