@@ -150,8 +150,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
             .cloned()
             .collect_vec();
 
-        let mut upstream =
-            transform_upstream(self.upstream.execute(), self.output_columns.clone()).boxed();
+        let mut upstream = self.upstream.execute();
 
         // Current position of the upstream_table storage primary key.
         // `None` means it starts from the beginning.
@@ -202,27 +201,32 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
                 .await
                 .expect("Retry create cdc table reader until success.")
         });
+
+        // Make sure to use mapping_message after transform_upstream.
+        let mut upstream = transform_upstream(upstream, self.output_columns.clone()).boxed();
         loop {
             if let Some(msg) =
                 build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
                     .await?
             {
-                match msg {
-                    Message::Barrier(barrier) => {
-                        // commit state to bump the epoch of state table
-                        state_impl.commit_state(barrier.epoch).await?;
-                        yield Message::Barrier(barrier);
-                    }
-                    Message::Chunk(chunk) => {
-                        if need_backfill {
-                            // ignore chunk if we need backfill, since we can read the data from the snapshot
-                        } else {
-                            // forward the chunk to downstream
-                            yield Message::Chunk(chunk);
+                if let Some(msg) = mapping_message(msg, &self.output_indices) {
+                    match msg {
+                        Message::Barrier(barrier) => {
+                            // commit state to bump the epoch of state table
+                            state_impl.commit_state(barrier.epoch).await?;
+                            yield Message::Barrier(barrier);
+                        }
+                        Message::Chunk(chunk) => {
+                            if need_backfill {
+                                // ignore chunk if we need backfill, since we can read the data from the snapshot
+                            } else {
+                                // forward the chunk to downstream
+                                yield Message::Chunk(chunk);
+                            }
+                        }
+                        Message::Watermark(_) => {
+                            // ignore watermark
                         }
-                    }
-                    Message::Watermark(_) => {
-                        // ignore watermark
                     }
                 }
             } else {
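
The ordering comment added in this hunk is load-bearing: `mapping_message` projects by `self.output_indices`, and those indices address the row that `transform_upstream` builds from `output_columns`, not the raw `(payload, _rw_offset, _rw_table_name)` row. Below is a minimal, self-contained sketch of that two-stage shape; `RawEvent`, `ParsedRow`, `transform`, and `map_indices` are hypothetical stand-ins (the real executor works on `Message`/`StreamChunk`), and it assumes the `futures` and `tokio` crates:

```rust
use futures::stream::{self, StreamExt};

// Stand-in for the raw upstream row: (debezium payload, _rw_offset, _rw_table_name).
struct RawEvent(&'static str);
// Stand-in for a row laid out per `output_columns`.
#[derive(Debug, PartialEq)]
struct ParsedRow(Vec<i64>);

// Stand-in for `transform_upstream`: parse the payload into typed columns.
fn transform(_raw: RawEvent) -> ParsedRow {
    ParsedRow(vec![5, 44485, 100]) // e.g. [O_ORDERKEY, O_CUSTKEY, O_DUMMY]
}

// Stand-in for `mapping_message`: project/reorder the *parsed* columns.
fn map_indices(row: ParsedRow, indices: &[usize]) -> ParsedRow {
    ParsedRow(indices.iter().map(|&i| row.0[i]).collect())
}

#[tokio::main]
async fn main() {
    let upstream = stream::iter(vec![RawEvent(r#"{"payload": "..."}"#)]);
    // Parse first, then project; swapping the order would apply the indices
    // to the raw three-column row instead of the parsed one.
    let mut out = upstream
        .map(transform)
        .map(|row| map_indices(row, &[1, 0, 2]));
    assert_eq!(out.next().await, Some(ParsedRow(vec![44485, 5, 100])));
}
```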
@@ -888,13 +892,21 @@ mod tests {
     use std::str::FromStr;
 
     use futures::{StreamExt, pin_mut};
-    use risingwave_common::array::{DataChunk, Op, StreamChunk};
+    use risingwave_common::array::{Array, DataChunk, Op, StreamChunk};
     use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
     use risingwave_common::types::{DataType, Datum, JsonbVal};
+    use risingwave_common::util::epoch::test_epoch;
     use risingwave_common::util::iter_util::ZipEqFast;
+    use risingwave_storage::memory::MemoryStateStore;
 
     use crate::executor::backfill::cdc::cdc_backfill::transform_upstream;
+    use crate::executor::monitor::StreamingMetrics;
+    use crate::executor::prelude::StateTable;
+    use crate::executor::source::default_source_internal_table;
     use crate::executor::test_utils::MockSource;
+    use crate::executor::{
+        ActorContext, Barrier, CdcBackfillExecutor, CdcScanOptions, ExternalStorageTable, Message,
+    };
 
     #[tokio::test]
     async fn test_transform_upstream_chunk() {
@@ -949,4 +961,96 @@ mod tests {
             println!("chunk: {:#?}", message.unwrap());
         }
     }
+
+    #[tokio::test]
+    async fn test_build_reader_and_poll_upstream() {
+        let actor_context = ActorContext::for_test(1);
+        let external_storage_table = ExternalStorageTable::for_test_undefined();
+        let schema = Schema::new(vec![
+            Field::unnamed(DataType::Jsonb),   // debezium json payload
+            Field::unnamed(DataType::Varchar), // _rw_offset
+            Field::unnamed(DataType::Varchar), // _rw_table_name
+        ]);
+        let pk_indices = vec![1];
+        let (mut tx, source) = MockSource::channel();
+        let source = source.into_executor(schema.clone(), pk_indices.clone());
+        let output_indices = vec![1, 0, 4]; // reorder
+        let output_columns = vec![
+            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
+            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
+            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
+            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
+            ColumnDesc::named("O_DUMMY", ColumnId::new(5), DataType::Int64),
+            ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz),
+        ];
+        let store = MemoryStateStore::new();
+        let state_table =
+            StateTable::from_table_catalog(&default_source_internal_table(0x2333), store, None)
+                .await;
+        let cdc = CdcBackfillExecutor::new(
+            actor_context,
+            external_storage_table,
+            source,
+            output_indices,
+            output_columns,
+            None,
+            StreamingMetrics::unused().into(),
+            state_table,
+            None,
+            CdcScanOptions {
+                // We want to mark backfill as finished. However, it's not
+                // straightforward to do so, so we disable backfill here instead.
+                disable_backfill: true,
+                ..CdcScanOptions::default()
+            },
+        );
+        // cdc.state_impl.init_epoch(EpochPair::new(test_epoch(4), test_epoch(3))).await.unwrap();
+        // cdc.state_impl.mutate_state(None, None, 0, true).await.unwrap();
+        // cdc.state_impl.commit_state(EpochPair::new(test_epoch(5), test_epoch(4))).await.unwrap();
+        let s = cdc.execute_inner();
+        pin_mut!(s);
+
+        // send first barrier
+        tx.send_barrier(Barrier::new_test_barrier(test_epoch(8)));
+        // send chunk
+        {
+            let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "r", "ts_ms": 1695277757017, "transaction": null } }"#;
+            let datums: Vec<Datum> = vec![
+                Some(JsonbVal::from_str(payload).unwrap().into()),
+                Some("file: 1.binlog, pos: 100".to_owned().into()),
+                Some("mydb.orders".to_owned().into()),
+            ];
+            let mut builders = schema.create_array_builders(8);
+            for (builder, datum) in builders.iter_mut().zip_eq_fast(datums.iter()) {
+                builder.append(datum.clone());
+            }
+            let columns = builders
+                .into_iter()
+                .map(|builder| builder.finish().into())
+                .collect();
+            // one-row chunk
+            let chunk = StreamChunk::from_parts(vec![Op::Insert], DataChunk::new(columns, 1));
+
+            tx.push_chunk(chunk);
+        }
+        let _first_barrier = s.next().await.unwrap();
+        let upstream_change_log = s.next().await.unwrap().unwrap();
+        let Message::Chunk(chunk) = upstream_change_log else {
+            panic!("expect chunk");
+        };
+        assert_eq!(chunk.columns().len(), 3);
+        assert_eq!(chunk.rows().count(), 1);
+        assert_eq!(
+            chunk.columns()[0].as_int64().iter().collect::<Vec<_>>(),
+            vec![Some(44485)]
+        );
+        assert_eq!(
+            chunk.columns()[1].as_int64().iter().collect::<Vec<_>>(),
+            vec![Some(5)]
+        );
+        assert_eq!(
+            chunk.columns()[2].as_int64().iter().collect::<Vec<_>>(),
+            vec![Some(100)]
+        );
+    }
 }
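
For reference, the integers the new test asserts on all come from the `payload.after` object of the Debezium message, reordered by `output_indices = [1, 0, 4]`. A plain `serde_json` sketch of that extraction and reorder (illustrative only, not the executor's actual parser, and assuming the `serde_json` crate):

```rust
use serde_json::Value;

fn main() {
    // Abbreviated form of the test payload; the real one also carries `source` metadata.
    let payload = r#"{"payload": {"after": {"O_ORDERKEY": 5, "O_CUSTKEY": 44485,
        "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_DUMMY": 100}, "op": "r"}}"#;
    let v: Value = serde_json::from_str(payload).unwrap();
    let after = &v["payload"]["after"];

    // Row laid out per `output_columns`; only the Int64 columns matter for the
    // assertions, so the varchar/decimal/timestamptz columns are elided as 0 here.
    let row = [
        after["O_ORDERKEY"].as_i64().unwrap(), // index 0
        after["O_CUSTKEY"].as_i64().unwrap(),  // index 1
        0,                                     // index 2 (O_ORDERSTATUS, elided)
        0,                                     // index 3 (O_TOTALPRICE, elided)
        after["O_DUMMY"].as_i64().unwrap(),    // index 4
    ];
    // output_indices = [1, 0, 4] => [O_CUSTKEY, O_ORDERKEY, O_DUMMY]
    let projected: Vec<i64> = [1, 0, 4].iter().map(|&i| row[i]).collect();
    assert_eq!(projected, vec![44485, 5, 100]); // matches the test's three asserts
}
```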