@@ -61,6 +61,10 @@ pub enum PipelineState {
6161 /// The pipeline has been created with a dataset, source, and target but has
6262 /// not yet started processing.
6363 NotStarted ,
64+ /// The pipeline has been initialized: the first batch for every table has
65+ /// been ETL'd into the target so the system adapter can discover initial
66+ /// data.
67+ Initialized ,
6468 /// The pipeline is actively rehydrating batches (in order of batch ID) from
6569 /// the configured [`Source`] into the configured [`Target`].
6670 Running ,
@@ -88,9 +92,12 @@ pub enum StopReason {
8892/// 1. **[`NotStarted`](PipelineState::NotStarted)** — created via [`ETLPipeline::new`]
8993/// with a dataset, source, and target. Call [`setup_request_datasets`](ETLPipeline::setup_request_datasets)
9094/// to obtain the dataset configurations that a system adapter needs.
91- /// 2. **[`Running`](PipelineState::Running)** — the pipeline is actively processing
92- /// batches.
93- /// 3. **[`Stopped`](PipelineState::Stopped)** — the pipeline finished, was cancelled,
95+ /// 2. **[`Initialized`](PipelineState::Initialized)** — the first batch (batch 0)
96+ /// has been ETL'd into the target via [`initialize`](ETLPipeline::initialize).
97+ /// The system adapter can now discover initial data.
98+ /// 3. **[`Running`](PipelineState::Running)** — the pipeline is actively processing
99+ /// remaining batches (batch 1+).
100+ /// 4. **[`Stopped`](PipelineState::Stopped)** — the pipeline finished, was cancelled,
94101/// or hit an error.
95102pub struct ETLPipeline {
96103 dataset_source : DatasetSource ,
@@ -185,11 +192,93 @@ impl ETLPipeline {
185192 . collect ( )
186193 }
187194
188- /// Starts the ETL pipeline, transitioning from [`PipelineState::NotStarted`]
195+ /// Initializes the ETL pipeline by processing only the first batch (batch
196+ /// ID 0) for every table.
197+ ///
198+ /// This ensures the target has some initial data before calling
199+ /// `setup()` on the system adapter. After successful initialization the
200+ /// pipeline transitions to [`PipelineState::Initialized`].
201+ ///
202+ /// Returns an error if the pipeline is not in the [`NotStarted`] state or
203+ /// if any batch fails to process.
204+ pub async fn initialize ( & mut self ) -> anyhow:: Result < ( ) > {
205+ if * self . state_rx . borrow ( ) != PipelineState :: NotStarted {
206+ anyhow:: bail!(
207+ "Cannot initialize pipeline: current state is {:?}" ,
208+ * self . state_rx. borrow( )
209+ ) ;
210+ }
211+
212+ let tables = self . dataset . tables ( ) ;
213+ let first_batch_id = 0u64 ;
214+
215+ let mut join_set: JoinSet < Result < String , String > > = JoinSet :: new ( ) ;
216+ for table_name in tables. keys ( ) {
217+ let dataset = Arc :: clone ( & self . dataset ) ;
218+ let source = Arc :: clone ( & self . source ) ;
219+ let target = Arc :: clone ( & self . target ) ;
220+ let table_name = table_name. clone ( ) ;
221+
222+ join_set. spawn ( async move {
223+ let read_result = source
224+ . read_batch ( & table_name, first_batch_id)
225+ . await
226+ . map_err ( |e| format ! ( "read {table_name} batch {first_batch_id}: {e}" ) ) ?
227+ . ok_or_else ( || {
228+ format ! ( "No data for table {table_name} at batch {first_batch_id}" )
229+ } ) ?;
230+
231+ for batch in read_result. batches {
232+ let rehydrated = dataset. rehydrate ( & table_name, & batch) . map_err ( |e| {
233+ format ! ( "rehydrate {table_name} batch {first_batch_id}: {e}" )
234+ } ) ?;
235+
236+ target
237+ . write ( & table_name, first_batch_id, rehydrated)
238+ . await
239+ . map_err ( |e| format ! ( "write {table_name} batch {first_batch_id}: {e}" ) ) ?;
240+ }
241+
242+ info ! (
243+ table = %table_name,
244+ batch_id = first_batch_id,
245+ "Initial batch processed"
246+ ) ;
247+ Ok ( table_name)
248+ } ) ;
249+ }
250+
251+ while let Some ( result) = join_set. join_next ( ) . await {
252+ match result {
253+ Ok ( Ok ( _table_name) ) => { }
254+ Ok ( Err ( err_msg) ) => {
255+ let _ = self
256+ . state_tx
257+ . send ( PipelineState :: Stopped ( StopReason :: Error ( err_msg. clone ( ) ) ) ) ;
258+ anyhow:: bail!( "ETL initialization failed: {err_msg}" ) ;
259+ }
260+ Err ( e) => {
261+ let msg = format ! ( "Task panicked during initialization: {e}" ) ;
262+ let _ = self
263+ . state_tx
264+ . send ( PipelineState :: Stopped ( StopReason :: Error ( msg. clone ( ) ) ) ) ;
265+ anyhow:: bail!( "{msg}" ) ;
266+ }
267+ }
268+ }
269+
270+ info ! ( "ETL pipeline initialized with first batch for all tables" ) ;
271+ let _ = self . state_tx . send ( PipelineState :: Initialized ) ;
272+ Ok ( ( ) )
273+ }
274+
275+ /// Starts the ETL pipeline, transitioning from [`PipelineState::Initialized`]
189276 /// to [`PipelineState::Running`].
190277 ///
191278 /// Spawns a background tokio task that iterates over every table and
192- /// processes batch IDs in ascending order. For each batch the task:
279+ /// processes batch IDs in ascending order, skipping batch 0 which was
280+ /// already processed during [`initialize`](ETLPipeline::initialize). For
281+ /// each batch the task:
193282 ///
194283 /// 1. Reads the batch from the [`Source`].
195284 /// 2. Rehydrates it through the [`Dataset`] (appending time columns, etc.).
@@ -198,12 +287,13 @@ impl ETLPipeline {
198287 /// The task transitions to [`PipelineState::Stopped`] when all batches are
199288 /// processed, the [`CancellationToken`] is triggered, or an error occurs.
200289 ///
201- /// Returns an error if the pipeline is not in the [`NotStarted `] state.
290+ /// Returns an error if the pipeline is not in the [`Initialized `] state.
202291 pub fn start ( & mut self ) -> anyhow:: Result < ( ) > {
203- if * self . state_rx . borrow ( ) != PipelineState :: NotStarted {
292+ let current_state = self . state_rx . borrow ( ) . clone ( ) ;
293+ if current_state != PipelineState :: Initialized {
204294 anyhow:: bail!(
205- "Cannot start pipeline: current state is {:?}" ,
206- * self . state_rx . borrow ( )
295+ "Cannot start pipeline: current state is {:?} (must be Initialized) " ,
296+ current_state
207297 ) ;
208298 }
209299
@@ -219,8 +309,12 @@ impl ETLPipeline {
219309 // batch_id so all tables advance together.
220310 let tables = dataset. tables ( ) ;
221311 let mut work: Vec < ( String , u64 ) > = Vec :: new ( ) ;
222- for ( name, _ ) in & tables {
312+ for name in tables. keys ( ) {
223313 for id in dataset. batch_ids ( name) {
314+ // Skip batch 0 — it was already processed during initialize().
315+ if id == 0 {
316+ continue ;
317+ }
224318 work. push ( ( name. clone ( ) , id) ) ;
225319 }
226320 }
0 commit comments