Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased] - ReleaseDate

### Added

- APIs on `Portal` to support cursor operations. [#433]

### Changed

- Changed default PID generate use sequential number [#431]

### Fixed

- `Sync` message will no longer clear all portals but unnamed portal. [#433]

## [0.39.0] - 2026-04-20

### Added
Expand Down
14 changes: 3 additions & 11 deletions examples/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,9 @@ async fn handle_fetch(
}

let fetch_result = portal.fetch(count).await?;
println!(
" -> Fetched {} rows, has_more: {}",
fetch_result.rows.len(),
fetch_result.suspended
);

let schema = fetch_result.row_schema;
let row_stream = stream::iter(fetch_result.rows.into_iter().map(Ok));
Ok(vec![Response::Query(QueryResponse::new(
schema, row_stream,
))])
println!(" -> Fetched rows, has_more: {}", fetch_result.suspended);

Ok(vec![Response::Query(fetch_result.response)])
}

fn handle_close(
Expand Down
38 changes: 16 additions & 22 deletions src/api/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use postgres_types::FromSqlOwned;
use tokio::sync::Mutex;

use crate::api::Type;
use crate::api::results::{FieldInfo, QueryResponse};
use crate::api::results::QueryResponse;
use crate::error::{PgWireError, PgWireResult};
use crate::messages::data::{DataRow, FORMAT_CODE_BINARY};
use crate::messages::data::FORMAT_CODE_BINARY;
use crate::messages::extendedquery::Bind;
use crate::types::FromSqlText;
use crate::types::format::FormatOptions;
Expand Down Expand Up @@ -45,9 +45,7 @@ pub enum PortalExecutionState {
/// Result of fetching rows from a portal in `Suspended` state.
#[derive(Debug)]
pub struct FetchResult {
pub command_tag: String,
pub rows: Vec<DataRow>,
pub row_schema: Arc<Vec<FieldInfo>>,
pub response: QueryResponse,
pub suspended: bool,
}

Expand Down Expand Up @@ -211,9 +209,7 @@ impl<S: Clone> Portal<S> {
match state.deref_mut() {
PortalExecutionState::Initial => Err(PgWireError::PortalNotStarted),
PortalExecutionState::Finished => Ok(FetchResult {
command_tag: String::new(),
rows: vec![],
row_schema: Arc::new(vec![]),
response: QueryResponse::new(Arc::new(vec![]), futures::stream::empty()),
suspended: false,
}),
PortalExecutionState::Suspended(response) => {
Expand All @@ -232,22 +228,20 @@ impl<S: Clone> Portal<S> {
}
}

if suspended {
Ok(FetchResult {
command_tag,
rows,
row_schema,
suspended: true,
})
} else {
if !suspended {
*state = PortalExecutionState::Finished;
Ok(FetchResult {
command_tag,
rows,
row_schema,
suspended: false,
})
}

let result_response = QueryResponse {
command_tag,
row_schema,
data_rows: Box::pin(futures::stream::iter(rows.into_iter().map(Ok))),
};

Ok(FetchResult {
response: result_response,
suspended,
})
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,16 +341,19 @@ pub trait ExtendedQueryHandler: Send + Sync {
// Fetch rows from the portal and send to client
if needs_fetch {
let fetch_result = portal.fetch(max_rows).await?;
let row_count = fetch_result.rows.len();
for row in fetch_result.rows {
client.feed(PgWireBackendMessage::DataRow(row)).await?;
let mut response = fetch_result.response;
let command_tag = response.command_tag().to_owned();
let mut row_count = 0;
while let Some(row) = response.data_rows().next().await {
client.feed(PgWireBackendMessage::DataRow(row?)).await?;
row_count += 1;
}
if fetch_result.suspended {
client
.send(PgWireBackendMessage::PortalSuspended(PortalSuspended))
.await?;
} else {
let tag = Tag::new(&fetch_result.command_tag).with_rows(row_count);
let tag = Tag::new(&command_tag).with_rows(row_count);
client
.send(PgWireBackendMessage::CommandComplete(tag.into()))
.await?;
Expand Down
Loading