Skip to content

Commit bb8da43

Browse files
authored
refactor: update fetch to return queryresponse (#434)
* chore: housekeeping changelog * update fetch to return response
1 parent d946da4 commit bb8da43

4 files changed

Lines changed: 40 additions & 37 deletions

File tree

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic
77
Versioning](https://semver.org/spec/v2.0.0.html).
88

9+
## [Unreleased] - ReleaseDate
10+
11+
### Added
12+
13+
- APIs on `Portal` to support cursor operations. [#433]
14+
15+
### Changed
16+
17+
- Changed default PID generate use sequential number [#431]
18+
19+
### Fixed
20+
21+
- `Sync` message will no longer clear all portals but unnamed portal. [#433]
22+
923
## [0.39.0] - 2026-04-20
1024

1125
### Added

examples/cursor.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -231,17 +231,9 @@ async fn handle_fetch(
231231
}
232232

233233
let fetch_result = portal.fetch(count).await?;
234-
println!(
235-
" -> Fetched {} rows, has_more: {}",
236-
fetch_result.rows.len(),
237-
fetch_result.suspended
238-
);
239-
240-
let schema = fetch_result.row_schema;
241-
let row_stream = stream::iter(fetch_result.rows.into_iter().map(Ok));
242-
Ok(vec![Response::Query(QueryResponse::new(
243-
schema, row_stream,
244-
))])
234+
println!(" -> Fetched rows, has_more: {}", fetch_result.suspended);
235+
236+
Ok(vec![Response::Query(fetch_result.response)])
245237
}
246238

247239
fn handle_close(

src/api/portal.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use postgres_types::FromSqlOwned;
88
use tokio::sync::Mutex;
99

1010
use crate::api::Type;
11-
use crate::api::results::{FieldInfo, QueryResponse};
11+
use crate::api::results::QueryResponse;
1212
use crate::error::{PgWireError, PgWireResult};
13-
use crate::messages::data::{DataRow, FORMAT_CODE_BINARY};
13+
use crate::messages::data::FORMAT_CODE_BINARY;
1414
use crate::messages::extendedquery::Bind;
1515
use crate::types::FromSqlText;
1616
use crate::types::format::FormatOptions;
@@ -45,9 +45,7 @@ pub enum PortalExecutionState {
4545
/// Result of fetching rows from a portal in `Suspended` state.
4646
#[derive(Debug)]
4747
pub struct FetchResult {
48-
pub command_tag: String,
49-
pub rows: Vec<DataRow>,
50-
pub row_schema: Arc<Vec<FieldInfo>>,
48+
pub response: QueryResponse,
5149
pub suspended: bool,
5250
}
5351

@@ -211,9 +209,7 @@ impl<S: Clone> Portal<S> {
211209
match state.deref_mut() {
212210
PortalExecutionState::Initial => Err(PgWireError::PortalNotStarted),
213211
PortalExecutionState::Finished => Ok(FetchResult {
214-
command_tag: String::new(),
215-
rows: vec![],
216-
row_schema: Arc::new(vec![]),
212+
response: QueryResponse::new(Arc::new(vec![]), futures::stream::empty()),
217213
suspended: false,
218214
}),
219215
PortalExecutionState::Suspended(response) => {
@@ -232,22 +228,20 @@ impl<S: Clone> Portal<S> {
232228
}
233229
}
234230

235-
if suspended {
236-
Ok(FetchResult {
237-
command_tag,
238-
rows,
239-
row_schema,
240-
suspended: true,
241-
})
242-
} else {
231+
if !suspended {
243232
*state = PortalExecutionState::Finished;
244-
Ok(FetchResult {
245-
command_tag,
246-
rows,
247-
row_schema,
248-
suspended: false,
249-
})
250233
}
234+
235+
let result_response = QueryResponse {
236+
command_tag,
237+
row_schema,
238+
data_rows: Box::pin(futures::stream::iter(rows.into_iter().map(Ok))),
239+
};
240+
241+
Ok(FetchResult {
242+
response: result_response,
243+
suspended,
244+
})
251245
}
252246
}
253247
}

src/api/query.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -341,16 +341,19 @@ pub trait ExtendedQueryHandler: Send + Sync {
341341
// Fetch rows from the portal and send to client
342342
if needs_fetch {
343343
let fetch_result = portal.fetch(max_rows).await?;
344-
let row_count = fetch_result.rows.len();
345-
for row in fetch_result.rows {
346-
client.feed(PgWireBackendMessage::DataRow(row)).await?;
344+
let mut response = fetch_result.response;
345+
let command_tag = response.command_tag().to_owned();
346+
let mut row_count = 0;
347+
while let Some(row) = response.data_rows().next().await {
348+
client.feed(PgWireBackendMessage::DataRow(row?)).await?;
349+
row_count += 1;
347350
}
348351
if fetch_result.suspended {
349352
client
350353
.send(PgWireBackendMessage::PortalSuspended(PortalSuspended))
351354
.await?;
352355
} else {
353-
let tag = Tag::new(&fetch_result.command_tag).with_rows(row_count);
356+
let tag = Tag::new(&command_tag).with_rows(row_count);
354357
client
355358
.send(PgWireBackendMessage::CommandComplete(tag.into()))
356359
.await?;

0 commit comments

Comments
 (0)