 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::future::Future;
 use std::pin::Pin;

 use either::Either;
@@ -27,9 +28,11 @@ use risingwave_connector::parser::{
     ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
     ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig,
 };
-use risingwave_connector::source::cdc::external::CdcOffset;
+use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
 use risingwave_connector::source::{SourceColumnDesc, SourceContext};
 use rw_futures_util::pausable;
+use thiserror_ext::AsReport;
+use tracing::Instrument;

 use crate::executor::backfill::cdc::state::CdcBackfillState;
 use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
@@ -42,6 +45,7 @@ use crate::executor::backfill::utils::{
 use crate::executor::backfill::CdcScanOptions;
 use crate::executor::monitor::CdcBackfillMetrics;
 use crate::executor::prelude::*;
+use crate::executor::source::get_infinite_backoff_strategy;
 use crate::executor::UpdateMutation;
 use crate::task::CreateMviewProgressReporter;

@@ -140,7 +144,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
         let upstream_table_name = self.external_table.qualified_table_name();
         let schema_table_name = self.external_table.schema_table_name().clone();
         let external_database_name = self.external_table.database_name().to_owned();
-        let upstream_table_reader = UpstreamTableReader::new(self.external_table);

         let additional_columns = self
             .output_columns
@@ -168,29 +171,85 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
         // if not, we should bypass the backfill directly.
         let mut state_impl = self.state_impl;

-        let mut upstream = transform_upstream(upstream, &self.output_columns)
-            .boxed()
-            .peekable();
-
         state_impl.init_epoch(first_barrier_epoch).await?;

         // restore backfill state
         let state = state_impl.restore_state().await?;
         current_pk_pos = state.current_pk_pos.clone();

-        let to_backfill = !self.options.disable_backfill && !state.is_finished;
+        let need_backfill = !self.options.disable_backfill && !state.is_finished;

         // Keep track of rows from the snapshot.
         let mut total_snapshot_row_count = state.row_count as u64;

+        // After initializing the state table and forwarding the initial barrier to downstream,
+        // try to create the table reader with retry.
+        // If backfill hasn't finished, we can ignore upstream cdc events before the table reader is created;
+        // if backfill is finished, we should forward the upstream cdc events to downstream.
+        let mut table_reader: Option<ExternalTableReaderImpl> = None;
+        let external_table = self.external_table.clone();
+        let mut future = Box::pin(async move {
+            let backoff = get_infinite_backoff_strategy();
+            tokio_retry::Retry::spawn(backoff, || async {
+                match external_table.create_table_reader().await {
+                    Ok(reader) => Ok(reader),
+                    Err(e) => {
+                        tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
+                        Err(e)
+                    }
+                }
+            })
+            .instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
+            .await
+            .expect("Retry create cdc table reader until success.")
+        });
+        loop {
+            if let Some(msg) =
+                build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
+                    .await?
+            {
+                match msg {
+                    Message::Barrier(barrier) => {
+                        // commit state to bump the epoch of the state table
+                        state_impl.commit_state(barrier.epoch).await?;
+                        yield Message::Barrier(barrier);
+                    }
+                    Message::Chunk(chunk) => {
+                        if need_backfill {
+                            // ignore the chunk if we need backfill, since we can read the data from the snapshot
+                        } else {
+                            // forward the chunk to downstream
+                            yield Message::Chunk(chunk);
+                        }
+                    }
+                    Message::Watermark(_) => {
+                        // ignore watermark
+                    }
+                }
+            } else {
+                assert!(table_reader.is_some(), "table reader must be created");
+                tracing::info!(
+                    table_id,
+                    upstream_table_name,
+                    "table reader created successfully"
+                );
+                break;
+            }
+        }
+
+        let upstream_table_reader = UpstreamTableReader::new(
+            self.external_table.clone(),
+            table_reader.expect("table reader must be created"),
+        );
+
+        let mut upstream = transform_upstream(upstream, &self.output_columns)
+            .boxed()
+            .peekable();
         let mut last_binlog_offset: Option<CdcOffset> = state
             .last_cdc_offset
             .map_or(upstream_table_reader.current_cdc_offset().await?, Some);

-        let offset_parse_func = upstream_table_reader
-            .inner()
-            .table_reader()
-            .get_cdc_offset_parser();
+        let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
         let mut consumed_binlog_offset: Option<CdcOffset> = None;

         tracing::info!(
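Note on the retry strategy above: `get_infinite_backoff_strategy()` is imported from `crate::executor::source`, but its body is not part of this diff. For readers unfamiliar with `tokio_retry`, a minimal sketch of what such an unbounded backoff could look like is below; the base delay, cap, and jitter are illustrative assumptions, not the values actually used by the executor.

```rust
// Illustrative sketch only: an unbounded exponential backoff usable with
// `tokio_retry::Retry::spawn`. The real `get_infinite_backoff_strategy`
// may use different base/cap values.
use std::time::Duration;

use tokio_retry::strategy::{jitter, ExponentialBackoff};

fn infinite_backoff_sketch() -> impl Iterator<Item = Duration> {
    ExponentialBackoff::from_millis(100) // start around 100ms (assumed base)
        .max_delay(Duration::from_secs(60)) // cap each retry interval (assumed cap)
        .map(jitter) // randomize intervals to avoid synchronized retries
    // The iterator never ends, so `Retry::spawn` keeps retrying until success.
}
```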
@@ -227,7 +286,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
         // finished.
         //
         // Once the backfill loop ends, we forward the upstream directly to the downstream.
-        if to_backfill {
+        if need_backfill {
             // drive the upstream changelog first to ensure we can receive timely changelog events,
             // otherwise the upstream changelog may be blocked by the snapshot read stream
             let _ = Pin::new(&mut upstream).peek().await;
@@ -702,6 +761,26 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
     }
 }

+async fn build_reader_and_poll_upstream(
+    upstream: &mut BoxedMessageStream,
+    table_reader: &mut Option<ExternalTableReaderImpl>,
+    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
+) -> StreamExecutorResult<Option<Message>> {
+    if table_reader.is_some() {
+        return Ok(None);
+    }
+    tokio::select! {
+        biased;
+        reader = &mut *future => {
+            *table_reader = Some(reader);
+            Ok(None)
+        }
+        msg = upstream.next() => {
+            msg.transpose()
+        }
+    }
+}
+
 #[try_stream(ok = Message, error = StreamExecutorError)]
 pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) {
     let props = SpecificParserConfig {
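For context on the `biased;` select in `build_reader_and_poll_upstream`: with `biased`, `tokio::select!` polls its branches in declaration order instead of randomly, so the reader-creation future is checked before the next upstream message on every call, and the helper returns `Ok(None)` as soon as the reader is available (or if it was already created). A self-contained sketch of the same pattern follows; the names and timings are invented for illustration, not taken from the executor.

```rust
// Standalone illustration of the `biased` select pattern; the futures and
// timings here are stand-ins, not the executor's actual types.
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Stand-in for the pinned reader-creation future.
    let mut create_reader = Box::pin(async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        "reader"
    });
    // Stand-in for the upstream message stream.
    let mut upstream = tokio::time::interval(Duration::from_millis(10));

    loop {
        tokio::select! {
            biased;
            // Polled first on every iteration because of `biased`.
            reader = &mut create_reader => {
                println!("reader ready: {reader}");
                break;
            }
            // Otherwise keep draining upstream while we wait.
            _ = upstream.tick() => {
                println!("still waiting, consumed one upstream event");
            }
        }
    }
}
```

The deterministic polling order keeps the control flow simple: the caller's loop can treat `Ok(None)` as "check whether the reader is ready and break", while upstream events are still consumed (and barriers committed) instead of backing up while the connection is being established.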