Skip to content

Commit ac84eb6

Browse files
committed
#1213 make WorkflowHistory stream-based and retain into_events helper
1 parent a7a857f commit ac84eb6

1 file changed

Lines changed: 29 additions & 114 deletions

File tree

crates/client/src/workflow_handle.rs

Lines changed: 29 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
},
99
grpc::WorkflowService,
1010
};
11-
use futures_util::{stream, stream::Stream};
11+
use futures_util::{TryStreamExt, stream, stream::Stream};
1212
use std::{
1313
collections::VecDeque,
1414
fmt::Debug,
@@ -31,10 +31,7 @@ use temporalio_common::{
3131
common::v1::{Payload, Payloads, WorkflowExecution as ProtoWorkflowExecution},
3232
enums::v1::{HistoryEventFilterType, UpdateWorkflowExecutionLifecycleStage},
3333
failure::v1::Failure,
34-
history::{
35-
self,
36-
v1::{HistoryEvent, history_event::Attributes},
37-
},
34+
history::v1::{HistoryEvent, history_event::Attributes},
3835
query::v1::WorkflowQuery,
3936
sdk::v1::UserMetadata,
4037
update::{self, v1::WaitPolicy},
@@ -282,48 +279,26 @@ impl WorkflowExecutionDescription {
282279
}
283280
}
284281

285-
/// Workflow execution history returned by `WorkflowHandle::fetch_history`.
286-
#[derive(Debug, Clone)]
282+
/// Workflow execution history, lazily fetched as a stream from the server.
283+
/// Use `into_events` to collect all events at once, or consume as a `stream`.
287284
pub struct WorkflowHistory {
288-
events: Vec<HistoryEvent>,
289-
}
290-
impl From<WorkflowHistory> for history::v1::History {
291-
fn from(h: WorkflowHistory) -> Self {
292-
Self { events: h.events }
293-
}
294-
}
295-
296-
impl WorkflowHistory {
297-
fn new(events: Vec<HistoryEvent>) -> Self {
298-
Self { events }
299-
}
300-
301-
/// The history events.
302-
pub fn events(&self) -> &[HistoryEvent] {
303-
&self.events
304-
}
305-
306-
/// Consume the history and return the events.
307-
pub fn into_events(self) -> Vec<HistoryEvent> {
308-
self.events
309-
}
310-
}
311-
312-
/// A stream of history events from a workflow execution.
313-
/// Internally paginates through results from the server.
314-
pub struct WorkflowHistoryStream {
315285
inner: Pin<Box<dyn Stream<Item = Result<HistoryEvent, WorkflowInteractionError>> + Send>>,
316286
}
317287

318-
impl WorkflowHistoryStream {
288+
impl WorkflowHistory {
319289
fn new(
320290
inner: Pin<Box<dyn Stream<Item = Result<HistoryEvent, WorkflowInteractionError>> + Send>>,
321291
) -> Self {
322292
Self { inner }
323293
}
294+
295+
/// Consume the stream and collect all events into a Vec
296+
pub async fn into_events(self) -> Result<Vec<HistoryEvent>, WorkflowInteractionError> {
297+
self.inner.try_collect().await
298+
}
324299
}
325300

326-
impl Stream for WorkflowHistoryStream {
301+
impl Stream for WorkflowHistory {
327302
type Item = Result<HistoryEvent, WorkflowInteractionError>;
328303
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
329304
self.inner.as_mut().poll_next(cx)
@@ -489,7 +464,7 @@ where
489464
opts: WorkflowGetResultOptions,
490465
) -> Result<W::Output, WorkflowGetResultError>
491466
where
492-
CT: WorkflowService + NamespacedClient + Clone,
467+
CT: WorkflowService + NamespacedClient + Clone + 'static,
493468
{
494469
let raw = self.get_result_raw(opts).await?;
495470
match raw {
@@ -514,7 +489,7 @@ where
514489
opts: WorkflowGetResultOptions,
515490
) -> Result<WorkflowExecutionResult<W::Output>, WorkflowInteractionError>
516491
where
517-
CT: WorkflowService + NamespacedClient + Clone,
492+
CT: WorkflowService + NamespacedClient + Clone + 'static,
518493
{
519494
let mut run_id = self.info.run_id.clone().unwrap_or_default();
520495
let fetch_opts = WorkflowFetchHistoryOptions::builder()
@@ -524,8 +499,8 @@ where
524499
.build();
525500

526501
loop {
527-
let history = self.fetch_history_for_run(&run_id, &fetch_opts).await?;
528-
let mut events = history.into_events();
502+
let history = self.fetch_history_for_run(&run_id, fetch_opts.clone());
503+
let mut events = history.into_events().await?;
529504

530505
if events.is_empty() {
531506
continue;
@@ -892,96 +867,37 @@ where
892867
.await
893868
.map_err(WorkflowInteractionError::from)
894869
}
895-
/// Fetch workflow execution history.
896-
pub async fn fetch_history(
897-
&self,
898-
opts: WorkflowFetchHistoryOptions,
899-
) -> Result<WorkflowHistory, WorkflowInteractionError>
900-
where
901-
CT: NamespacedClient,
902-
{
903-
let run_id = self.info.run_id.clone().unwrap_or_default();
904-
self.fetch_history_for_run(&run_id, &opts).await
905-
}
906870

907871
/// Fetch workflow execution history as a lazy stream
908-
pub fn fetch_history_stream(
872+
pub fn fetch_history(
909873
&self,
910874
opts: WorkflowFetchHistoryOptions,
911-
) -> WorkflowHistoryStream
875+
) -> WorkflowHistory
912876
where
913-
CT: WorkflowService + NamespacedClient + 'static,
877+
CT: NamespacedClient + 'static,
914878
{
915879
let run_id = self.info.run_id.clone().unwrap_or_default();
916-
self.fetch_history_stream_for_run(&run_id, &opts)
880+
self.fetch_history_for_run(&run_id, opts)
917881
}
918882

919-
/// Fetch history for a specific run_id, handling pagination.
920-
async fn fetch_history_for_run(
883+
fn fetch_history_for_run(
921884
&self,
922885
run_id: &str,
923-
opts: &WorkflowFetchHistoryOptions,
924-
) -> Result<WorkflowHistory, WorkflowInteractionError>
925-
where
926-
CT: NamespacedClient,
927-
{
928-
let mut all_events = Vec::new();
929-
let mut next_page_token = vec![];
930-
931-
loop {
932-
let response = WorkflowService::get_workflow_execution_history(
933-
&mut self.client.clone(),
934-
GetWorkflowExecutionHistoryRequest {
935-
namespace: self.client.namespace(),
936-
execution: Some(ProtoWorkflowExecution {
937-
workflow_id: self.info.workflow_id.clone(),
938-
run_id: run_id.to_string(),
939-
}),
940-
next_page_token: next_page_token.clone(),
941-
skip_archival: opts.skip_archival,
942-
wait_new_event: opts.wait_new_event,
943-
history_event_filter_type: opts.event_filter_type as i32,
944-
..Default::default()
945-
}
946-
.into_request(),
947-
)
948-
.await
949-
.map_err(WorkflowInteractionError::from_status)?
950-
.into_inner();
951-
952-
if let Some(history) = response.history {
953-
all_events.extend(history.events);
954-
}
955-
956-
if response.next_page_token.is_empty() {
957-
break;
958-
}
959-
next_page_token = response.next_page_token;
960-
}
961-
962-
Ok(WorkflowHistory::new(all_events))
963-
}
964-
965-
/// Fetch history for a specific run_id, handling pagination as a lazy stream
966-
fn fetch_history_stream_for_run(
967-
&self,
968-
run_id: &str,
969-
opts: &WorkflowFetchHistoryOptions,
970-
) -> WorkflowHistoryStream
886+
opts: WorkflowFetchHistoryOptions,
887+
) -> WorkflowHistory
971888
where
972-
CT: WorkflowService + NamespacedClient + 'static,
889+
CT: NamespacedClient + 'static,
973890
{
974891
let client = self.client.clone();
975892
let workflow_id = self.info.workflow_id.clone();
976893
let run_id = run_id.to_string();
977-
let opts = opts.clone();
978894

979895
// State: (next_page_token, buffer, yielded_count, exhausted)
980-
let initial_state = (Vec::new(), VecDeque::new(), 0, false);
896+
let initial_state = (Vec::new(), VecDeque::new(), false);
981897

982898
let stream = stream::unfold(
983899
initial_state,
984-
move |(next_page_token, mut buffer, mut yielded, exhausted)| {
900+
move |(next_page_token, mut buffer, exhausted)| {
985901
let mut client = client.clone();
986902
let namespace = client.namespace();
987903
let workflow_id = workflow_id.clone();
@@ -990,8 +906,7 @@ where
990906

991907
async move {
992908
if let Some(exec) = buffer.pop_front() {
993-
yielded += 1;
994-
return Some((Ok(exec), (next_page_token, buffer, yielded, exhausted)));
909+
return Some((Ok(exec), (next_page_token, buffer, exhausted)));
995910
}
996911

997912
if exhausted {
@@ -1027,20 +942,20 @@ where
1027942
}
1028943

1029944
if let Some(event) = buffer.pop_front() {
1030-
Some((Ok(event), (new_token, buffer, yielded, new_exhausted)))
945+
Some((Ok(event), (new_token, buffer, new_exhausted)))
1031946
} else {
1032947
None
1033948
}
1034949
}
1035950
Err(e) => Some((
1036951
Err(WorkflowInteractionError::from_status(e)),
1037-
(next_page_token, buffer, yielded, true),
952+
(next_page_token, buffer, true),
1038953
)),
1039954
}
1040955
}
1041956
},
1042957
);
1043-
WorkflowHistoryStream::new(Box::pin(stream))
958+
WorkflowHistory::new(Box::pin(stream))
1044959
}
1045960
}
1046961

0 commit comments

Comments
 (0)