2020import org .apache .flink .cdc .common .configuration .ConfigOption ;
2121import org .apache .flink .cdc .common .configuration .Configuration ;
2222import org .apache .flink .cdc .common .factories .DataSinkFactory ;
23+ import org .apache .flink .cdc .common .factories .DataSourceFactory ;
2324import org .apache .flink .cdc .common .factories .FactoryHelper ;
2425import org .apache .flink .cdc .common .pipeline .PipelineOptions ;
2526import org .apache .flink .cdc .common .sink .DataSink ;
2627import org .apache .flink .cdc .common .sink .EventSinkProvider ;
2728import org .apache .flink .cdc .common .sink .MetadataApplier ;
29+ import org .apache .flink .cdc .common .source .DataSource ;
2830import org .apache .flink .cdc .composer .definition .PipelineDef ;
2931import org .apache .flink .cdc .composer .definition .SinkDef ;
3032import org .apache .flink .cdc .composer .definition .SourceDef ;
3133import org .apache .flink .cdc .composer .utils .FactoryDiscoveryUtils ;
3234import org .apache .flink .cdc .composer .utils .factory .DataSinkFactory1 ;
3335import org .apache .flink .cdc .connectors .values .factory .ValuesDataFactory ;
36+ import org .apache .flink .cdc .connectors .values .source .ValuesDataSource ;
37+ import org .apache .flink .cdc .connectors .values .source .ValuesDataSourceHelper ;
3438import org .apache .flink .configuration .DeploymentOptions ;
3539
3640import org .apache .flink .shaded .guava31 .com .google .common .collect .ImmutableMap ;
@@ -83,7 +87,7 @@ void testGettingFlinkConfiguration() {
8387 FlinkPipelineComposer composer = FlinkPipelineComposer .ofMiniCluster ();
8488 PipelineDef pipelineDef =
8589 new PipelineDef (
86- new SourceDef (ValuesDataFactory .IDENTIFIER , null , new Configuration ()),
90+ new SourceDef (TestDataSourceFactory .IDENTIFIER , null , new Configuration ()),
8791 new SinkDef (TestDataSinkFactory .IDENTIFIER , null , new Configuration ()),
8892 Collections .emptyList (),
8993 Collections .emptyList (),
@@ -93,7 +97,7 @@ void testGettingFlinkConfiguration() {
9397 composer .compose (pipelineDef );
9498 }
9599
96- /** A dummy {@link DataSinkFactory} for testing . */
100+ /** A dummy {@link DataSinkFactory} that validates the execution target . */
97101 public static class TestDataSinkFactory implements DataSinkFactory {
98102
99103 public static final String IDENTIFIER = "test-sink-factory" ;
@@ -135,6 +139,39 @@ public Set<ConfigOption<?>> optionalOptions() {
135139 }
136140 }
137141
142+ /** A dummy {@link DataSourceFactory} that validates the execution target. */
143+ public static class TestDataSourceFactory implements DataSourceFactory {
144+
145+ public static final String IDENTIFIER = "test-source-factory" ;
146+
147+ @ Override
148+ public DataSource createDataSource (Context context ) {
149+ // This option has no default value.
150+ String target = context .getFlinkConf ().get (DeploymentOptions .TARGET );
151+ if (!"local" .equals (target )) {
152+ throw new IllegalArgumentException (
153+ "The flink configuration is invalid. Please check the pipeline configuration." );
154+ }
155+ return new ValuesDataSource (
156+ ValuesDataSourceHelper .EventSetId .SINGLE_SPLIT_SINGLE_TABLE , Integer .MAX_VALUE );
157+ }
158+
159+ @ Override
160+ public String identifier () {
161+ return IDENTIFIER ;
162+ }
163+
164+ @ Override
165+ public Set <ConfigOption <?>> requiredOptions () {
166+ return new HashSet <>();
167+ }
168+
169+ @ Override
170+ public Set <ConfigOption <?>> optionalOptions () {
171+ return new HashSet <>();
172+ }
173+ }
174+
138175 @ ParameterizedTest
139176 @ MethodSource
140177 void testInvalidPipelineConfiguration (Configuration pipelineConfig ) {
0 commit comments