use std::{collections::HashMap, path::PathBuf, sync::Arc};

use datafusion::{common::Column, prelude::Expr};
use itertools::Itertools;
use relative_path::RelativePathBuf;

use crate::query::stream_schema_provider::extract_primary_filter;
use crate::{
    catalog::{
        manifest::{File, Manifest},
        snapshot, Snapshot,
    },
    event,
    parseable::PARSEABLE,
    query::{stream_schema_provider::ManifestExt, PartialTimeFilter},
    storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
    utils::time::TimeRange,
};

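/// Builds the pair of datafusion filter expressions (start inclusive, end
/// exclusive) that bound a query on `table_name` to `time_range`, targeting
/// the stream's custom time partition column when one is configured and the
/// default timestamp column otherwise.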
pub fn create_time_filter(
    time_range: &TimeRange,
    time_partition: Option<String>,
    table_name: &str,
) -> Vec<Expr> {
    let start_time = time_range.start.naive_utc();
    let end_time = time_range.end.naive_utc();

    // Filter on the stream's custom time partition column when one is
    // configured, otherwise on the default timestamp column.
    let column_name = time_partition.unwrap_or_else(|| event::DEFAULT_TIMESTAMP_KEY.to_owned());

    let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
        .binary_expr(Expr::Column(Column::new(
            Some(table_name.to_owned()),
            column_name.clone(),
        )));
    let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
        .binary_expr(Expr::Column(Column::new(
            Some(table_name.to_owned()),
            column_name,
        )));

    vec![start_time_filter, end_time_filter]
}

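/// Lists the parquet files of `stream` that overlap `time_range`, grouped by
/// their date/hour/minute prefix in object storage.
///
/// A minimal usage sketch; the stream name and the way the `TimeRange` is
/// obtained are illustrative, not part of this change:
///
/// ```ignore
/// let range: TimeRange = some_time_range(); // e.g. parsed from a query
/// let by_prefix = fetch_parquet_file_paths("my_stream", &range).await?;
/// for (prefix, files) in &by_prefix {
///     println!("{prefix}: {} parquet files", files.len());
/// }
/// ```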
pub async fn fetch_parquet_file_paths(
    stream: &str,
    time_range: &TimeRange,
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
    let glob_storage = PARSEABLE.storage.get_object_store();

    let object_store_format = glob_storage.get_object_store_format(stream).await?;

    let time_partition = object_store_format.time_partition;

    let time_filter_expr = create_time_filter(time_range, time_partition.clone(), stream);

    let time_filters = extract_primary_filter(&time_filter_expr, &time_partition);

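    // Merge the snapshots from every stream.json found under the stream's
    // root directory into a single manifest list.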
    let mut merged_snapshot = snapshot::Snapshot::default();

    let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
    let obs = glob_storage
        .get_objects(
            Some(&path),
            Box::new(|file_name| file_name.ends_with("stream.json")),
        )
        .await;
    if let Ok(obs) = obs {
        for ob in obs {
            if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {
                merged_snapshot
                    .manifest_list
                    .extend(object_store_format.snapshot.manifest_list);
            }
        }
    }

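    // Fetch only the manifests whose time bounds overlap the query range,
    // ordered by their lower time bound.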
    let manifest_files = collect_manifest_files(
        glob_storage,
        merged_snapshot
            .manifests(&time_filters)
            .into_iter()
            .sorted_by_key(|file| file.time_lower_bound)
            .map(|item| item.manifest_path)
            .collect(),
    )
    .await?;

    let mut parquet_files: HashMap<RelativePathBuf, Vec<File>> = HashMap::new();

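    // Flatten every manifest's file list into one candidate list.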
    let mut selected_files = manifest_files
        .into_iter()
        .flat_map(|file| file.files)
        .rev()
        .collect_vec();

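    // Prune any file whose manifest statistics show it cannot match the
    // time filters.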
    for filter in time_filter_expr {
        selected_files.retain(|file| !file.can_be_pruned(&filter));
    }

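    // Group the surviving files by their date/hour/minute path prefix.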
    for file in selected_files {
        let path_components = file.file_path.split('/').collect_vec();

        // Components 1..4 are expected to be the date=, hour= and minute=
        // directories of the stream's object layout; shorter paths would
        // panic here.
        let date = path_components[1..4].iter().map(|s| s.to_string());

        parquet_files
            .entry(RelativePathBuf::from_iter(date))
            .or_default()
            .push(file);
    }

    Ok(parquet_files)
}

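/// Downloads the given manifest paths from object storage concurrently and
/// deserializes each one into a [`Manifest`]. Panics on an invalid path or a
/// manifest that fails to deserialize.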
async fn collect_manifest_files(
    storage: Arc<dyn ObjectStorage>,
    manifest_urls: Vec<String>,
) -> Result<Vec<Manifest>, ObjectStorageError> {
    // Spawn one task per manifest so the downloads run concurrently; each
    // task gets its own handle to the object store.
    let mut tasks = Vec::new();
    for path in manifest_urls {
        let path = RelativePathBuf::from_path(PathBuf::from(path)).expect("Invalid path");
        let storage = Arc::clone(&storage);
        tasks.push(tokio::task::spawn(
            async move { storage.get_object(&path).await },
        ));
    }

    let mut op = Vec::new();
    for task in tasks {
        let file = task.await??;
        op.push(file);
    }

    Ok(op
        .into_iter()
        .map(|res| serde_json::from_slice(&res).expect("Data is invalid for Manifest"))
        .collect())
}