Skip to content

Commit c431b3a

Browse files
Feat/sql pagination 2 (#157)
(#141) was becoming hard for me to reason about because FlightSQL pagination has to be handled differently from SQL tab pagination: on the SQL tab we can control the batch size directly on the context, but we can't do that with FlightSQL. So I would like to split pagination for each of those tabs into its own PR to keep them more focused and easier to review and reason about. This PR makes pagination work and adds integration tests for pagination. There is still room for improvement in the end-to-end testing (right now we test some of the implementation details), but at least we have some coverage.
1 parent 9ff201c commit c431b3a

File tree

17 files changed

+844
-328
lines changed

17 files changed

+844
-328
lines changed

src/app/app_execution.rs

+86-26
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,42 @@
1818
//! [`AppExecution`]: Handles executing queries for the TUI application.
1919
2020
use crate::app::state::tabs::sql::Query;
21-
use crate::app::AppEvent;
21+
use crate::app::{AppEvent, ExecutionError, ExecutionResultsBatch};
2222
use crate::execution::ExecutionContext;
2323
use color_eyre::eyre::Result;
24+
use datafusion::execution::context::SessionContext;
25+
use datafusion::execution::SendableRecordBatchStream;
26+
use datafusion::physical_plan::execute_stream;
2427
use futures::StreamExt;
2528
use log::{error, info};
2629
use std::sync::Arc;
2730
use std::time::Duration;
2831
use tokio::sync::mpsc::UnboundedSender;
32+
use tokio::sync::Mutex;
2933

3034
/// Handles executing queries for the TUI application, formatting results
3135
/// and sending them to the UI.
32-
pub(crate) struct AppExecution {
36+
pub struct AppExecution {
3337
inner: Arc<ExecutionContext>,
38+
result_stream: Arc<Mutex<Option<SendableRecordBatchStream>>>,
3439
}
3540

3641
impl AppExecution {
3742
/// Create a new instance of [`AppExecution`].
3843
pub fn new(inner: Arc<ExecutionContext>) -> Self {
39-
Self { inner }
44+
Self {
45+
inner,
46+
result_stream: Arc::new(Mutex::new(None)),
47+
}
48+
}
49+
50+
pub fn session_ctx(&self) -> &SessionContext {
51+
self.inner.session_ctx()
52+
}
53+
54+
pub async fn set_result_stream(&self, stream: SendableRecordBatchStream) {
55+
let mut s = self.result_stream.lock().await;
56+
*s = Some(stream)
4057
}
4158

4259
/// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender.
@@ -60,33 +77,53 @@ impl AppExecution {
6077
let start = std::time::Instant::now();
6178
if i == statement_count - 1 {
6279
info!("Executing last query and display results");
63-
match self.inner.execute_sql(sql).await {
64-
Ok(mut stream) => {
65-
let mut batches = Vec::new();
66-
while let Some(maybe_batch) = stream.next().await {
67-
match maybe_batch {
68-
Ok(batch) => {
69-
batches.push(batch);
70-
}
71-
Err(e) => {
72-
let elapsed = start.elapsed();
73-
query.set_error(Some(e.to_string()));
74-
query.set_execution_time(elapsed);
75-
break;
80+
sender.send(AppEvent::NewExecution)?;
81+
match self.inner.create_physical_plan(sql).await {
82+
Ok(plan) => match execute_stream(plan, self.inner.session_ctx().task_ctx()) {
83+
Ok(stream) => {
84+
self.set_result_stream(stream).await;
85+
let mut stream = self.result_stream.lock().await;
86+
if let Some(s) = stream.as_mut() {
87+
if let Some(b) = s.next().await {
88+
match b {
89+
Ok(b) => {
90+
let duration = start.elapsed();
91+
let results = ExecutionResultsBatch {
92+
query: sql.to_string(),
93+
batch: b,
94+
duration,
95+
};
96+
sender.send(AppEvent::ExecutionResultsNextPage(
97+
results,
98+
))?;
99+
}
100+
Err(e) => {
101+
error!("Error getting RecordBatch: {:?}", e);
102+
}
103+
}
76104
}
77105
}
78106
}
107+
Err(stream_err) => {
108+
error!("Error creating physical plan: {:?}", stream_err);
109+
let elapsed = start.elapsed();
110+
let e = ExecutionError {
111+
query: sql.to_string(),
112+
error: stream_err.to_string(),
113+
duration: elapsed,
114+
};
115+
sender.send(AppEvent::ExecutionResultsError(e))?;
116+
}
117+
},
118+
Err(plan_err) => {
119+
error!("Error creating physical plan: {:?}", plan_err);
79120
let elapsed = start.elapsed();
80-
let rows: usize = batches.iter().map(|r| r.num_rows()).sum();
81-
query.set_results(Some(batches));
82-
query.set_num_rows(Some(rows));
83-
query.set_execution_time(elapsed);
84-
}
85-
Err(e) => {
86-
error!("Error creating dataframe: {:?}", e);
87-
let elapsed = start.elapsed();
88-
query.set_error(Some(e.to_string()));
89-
query.set_execution_time(elapsed);
121+
let e = ExecutionError {
122+
query: sql.to_string(),
123+
error: plan_err.to_string(),
124+
duration: elapsed,
125+
};
126+
sender.send(AppEvent::ExecutionResultsError(e))?;
90127
}
91128
}
92129
} else {
@@ -107,4 +144,27 @@ impl AppExecution {
107144
}
108145
Ok(())
109146
}
147+
148+
pub async fn next_batch(&self, sql: String, sender: UnboundedSender<AppEvent>) {
149+
let mut stream = self.result_stream.lock().await;
150+
if let Some(s) = stream.as_mut() {
151+
let start = std::time::Instant::now();
152+
if let Some(b) = s.next().await {
153+
match b {
154+
Ok(b) => {
155+
let duration = start.elapsed();
156+
let results = ExecutionResultsBatch {
157+
query: sql,
158+
batch: b,
159+
duration,
160+
};
161+
let _ = sender.send(AppEvent::ExecutionResultsNextPage(results));
162+
}
163+
Err(e) => {
164+
error!("Error getting RecordBatch: {:?}", e);
165+
}
166+
}
167+
}
168+
}
169+
}
110170
}

src/app/handlers/flightsql.rs

+68-68
Original file line numberDiff line numberDiff line change
@@ -66,74 +66,74 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) {
6666
}
6767
}
6868

69-
KeyCode::Enter => {
70-
info!("Run FS query");
71-
let sql = app.state.flightsql_tab.editor().lines().join("");
72-
info!("SQL: {}", sql);
73-
let execution = Arc::clone(&app.execution);
74-
let _event_tx = app.event_tx();
75-
tokio::spawn(async move {
76-
let client = execution.flightsql_client();
77-
let mut query =
78-
FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None);
79-
let start = Instant::now();
80-
if let Some(ref mut c) = *client.lock().await {
81-
info!("Sending query");
82-
match c.execute(sql, None).await {
83-
Ok(flight_info) => {
84-
for endpoint in flight_info.endpoint {
85-
if let Some(ticket) = endpoint.ticket {
86-
match c.do_get(ticket.into_request()).await {
87-
Ok(mut stream) => {
88-
let mut batches: Vec<RecordBatch> = Vec::new();
89-
// temporarily only show the first batch to avoid
90-
// buffering massive result sets. Eventually there should
91-
// be some sort of paging logic
92-
// see https://github.com/datafusion-contrib/datafusion-tui/pull/133#discussion_r1756680874
93-
// while let Some(maybe_batch) = stream.next().await {
94-
if let Some(maybe_batch) = stream.next().await {
95-
match maybe_batch {
96-
Ok(batch) => {
97-
info!("Batch rows: {}", batch.num_rows());
98-
batches.push(batch);
99-
}
100-
Err(e) => {
101-
error!("Error getting batch: {:?}", e);
102-
let elapsed = start.elapsed();
103-
query.set_error(Some(e.to_string()));
104-
query.set_execution_time(elapsed);
105-
}
106-
}
107-
}
108-
let elapsed = start.elapsed();
109-
let rows: usize =
110-
batches.iter().map(|r| r.num_rows()).sum();
111-
query.set_results(Some(batches));
112-
query.set_num_rows(Some(rows));
113-
query.set_execution_time(elapsed);
114-
}
115-
Err(e) => {
116-
error!("Error getting response: {:?}", e);
117-
let elapsed = start.elapsed();
118-
query.set_error(Some(e.to_string()));
119-
query.set_execution_time(elapsed);
120-
}
121-
}
122-
}
123-
}
124-
}
125-
Err(e) => {
126-
error!("Error getting response: {:?}", e);
127-
let elapsed = start.elapsed();
128-
query.set_error(Some(e.to_string()));
129-
query.set_execution_time(elapsed);
130-
}
131-
}
132-
}
133-
134-
let _ = _event_tx.send(AppEvent::FlightSQLQueryResult(query));
135-
});
136-
}
69+
// KeyCode::Enter => {
70+
// info!("Run FS query");
71+
// let sql = app.state.flightsql_tab.editor().lines().join("");
72+
// info!("SQL: {}", sql);
73+
// let execution = Arc::clone(&app.execution);
74+
// let _event_tx = app.event_tx();
75+
// tokio::spawn(async move {
76+
// let client = execution.flightsql_client();
77+
// let mut query =
78+
// FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None);
79+
// let start = Instant::now();
80+
// if let Some(ref mut c) = *client.lock().await {
81+
// info!("Sending query");
82+
// match c.execute(sql, None).await {
83+
// Ok(flight_info) => {
84+
// for endpoint in flight_info.endpoint {
85+
// if let Some(ticket) = endpoint.ticket {
86+
// match c.do_get(ticket.into_request()).await {
87+
// Ok(mut stream) => {
88+
// let mut batches: Vec<RecordBatch> = Vec::new();
89+
// // temporarily only show the first batch to avoid
90+
// // buffering massive result sets. Eventually there should
91+
// // be some sort of paging logic
92+
// // see https://github.com/datafusion-contrib/datafusion-tui/pull/133#discussion_r1756680874
93+
// // while let Some(maybe_batch) = stream.next().await {
94+
// if let Some(maybe_batch) = stream.next().await {
95+
// match maybe_batch {
96+
// Ok(batch) => {
97+
// info!("Batch rows: {}", batch.num_rows());
98+
// batches.push(batch);
99+
// }
100+
// Err(e) => {
101+
// error!("Error getting batch: {:?}", e);
102+
// let elapsed = start.elapsed();
103+
// query.set_error(Some(e.to_string()));
104+
// query.set_execution_time(elapsed);
105+
// }
106+
// }
107+
// }
108+
// let elapsed = start.elapsed();
109+
// let rows: usize =
110+
// batches.iter().map(|r| r.num_rows()).sum();
111+
// query.set_results(Some(batches));
112+
// query.set_num_rows(Some(rows));
113+
// query.set_execution_time(elapsed);
114+
// }
115+
// Err(e) => {
116+
// error!("Error getting response: {:?}", e);
117+
// let elapsed = start.elapsed();
118+
// query.set_error(Some(e.to_string()));
119+
// query.set_execution_time(elapsed);
120+
// }
121+
// }
122+
// }
123+
// }
124+
// }
125+
// Err(e) => {
126+
// error!("Error getting response: {:?}", e);
127+
// let elapsed = start.elapsed();
128+
// query.set_error(Some(e.to_string()));
129+
// query.set_execution_time(elapsed);
130+
// }
131+
// }
132+
// }
133+
//
134+
// let _ = _event_tx.send(AppEvent::FlightSQLQueryResult(query));
135+
// });
136+
// }
137137
_ => {}
138138
}
139139
}

0 commit comments

Comments
 (0)