Skip to content

Commit 7ad789e

Browse files
authored
[FLINK-38723][common/composer] Add getFlinkConf method to Context. (apache#4179)
1 parent 25fa95e commit 7ad789e

File tree

6 files changed

+133
-3
lines changed

6 files changed

+133
-3
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2121
import org.apache.flink.cdc.common.configuration.ConfigOption;
2222
import org.apache.flink.cdc.common.configuration.Configuration;
23+
import org.apache.flink.configuration.ReadableConfig;
2324

2425
import java.util.Set;
2526

@@ -78,5 +79,10 @@ interface Context {
7879
* <p>The class loader is in particular useful for discovering factories.
7980
*/
8081
ClassLoader getClassLoader();
82+
83+
/** Returns the flink configuration of the current session. */
84+
default ReadableConfig getFlinkConf() {
85+
return new org.apache.flink.configuration.Configuration();
86+
}
8187
}
8288
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,28 @@ public static class DefaultContext implements Factory.Context {
168168
private final Configuration factoryConfiguration;
169169
private final ClassLoader classLoader;
170170
private final Configuration pipelineConfiguration;
171+
private final ReadableConfig flinkConf;
171172

172173
public DefaultContext(
173174
Configuration factoryConfiguration,
174175
Configuration pipelineConfiguration,
175176
ClassLoader classLoader) {
177+
this(
178+
factoryConfiguration,
179+
pipelineConfiguration,
180+
classLoader,
181+
new org.apache.flink.configuration.Configuration());
182+
}
183+
184+
public DefaultContext(
185+
Configuration factoryConfiguration,
186+
Configuration pipelineConfiguration,
187+
ClassLoader classLoader,
188+
ReadableConfig flinkConf) {
176189
this.factoryConfiguration = factoryConfiguration;
177190
this.pipelineConfiguration = pipelineConfiguration;
178191
this.classLoader = classLoader;
192+
this.flinkConf = flinkConf;
179193
}
180194

181195
@Override
@@ -192,5 +206,10 @@ public Configuration getPipelineConfiguration() {
192206
public ClassLoader getClassLoader() {
193207
return classLoader;
194208
}
209+
210+
@Override
211+
public ReadableConfig getFlinkConf() {
212+
return flinkConf;
213+
}
195214
}
196215
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public DataSink createDataSink(
7676
new FactoryHelper.DefaultContext(
7777
sinkDef.getConfig(),
7878
pipelineConfig,
79-
Thread.currentThread().getContextClassLoader()));
79+
Thread.currentThread().getContextClassLoader(),
80+
env.getConfiguration()));
8081
}
8182

8283
public void translate(

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public DataSource createDataSource(
9797
new FactoryHelper.DefaultContext(
9898
sourceDef.getConfig(),
9999
pipelineConfig,
100-
Thread.currentThread().getContextClassLoader());
100+
Thread.currentThread().getContextClassLoader(),
101+
env.getConfiguration());
101102
return sourceFactory.createDataSource(context);
102103
}
103104

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,25 @@
1717

1818
package org.apache.flink.cdc.composer.flink;
1919

20+
import org.apache.flink.cdc.common.configuration.ConfigOption;
2021
import org.apache.flink.cdc.common.configuration.Configuration;
2122
import org.apache.flink.cdc.common.factories.DataSinkFactory;
23+
import org.apache.flink.cdc.common.factories.DataSourceFactory;
2224
import org.apache.flink.cdc.common.factories.FactoryHelper;
2325
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
2426
import org.apache.flink.cdc.common.sink.DataSink;
27+
import org.apache.flink.cdc.common.sink.EventSinkProvider;
28+
import org.apache.flink.cdc.common.sink.MetadataApplier;
29+
import org.apache.flink.cdc.common.source.DataSource;
2530
import org.apache.flink.cdc.composer.definition.PipelineDef;
2631
import org.apache.flink.cdc.composer.definition.SinkDef;
2732
import org.apache.flink.cdc.composer.definition.SourceDef;
2833
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
2934
import org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1;
3035
import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
36+
import org.apache.flink.cdc.connectors.values.source.ValuesDataSource;
37+
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
38+
import org.apache.flink.configuration.DeploymentOptions;
3139

3240
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
3341

@@ -38,8 +46,11 @@
3846
import org.junit.jupiter.params.provider.MethodSource;
3947

4048
import java.util.Collections;
49+
import java.util.HashSet;
50+
import java.util.Set;
4151
import java.util.stream.Stream;
4252

53+
import static org.assertj.core.api.Assertions.assertThatCode;
4354
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4455

4556
/** A test for the {@link FlinkPipelineComposer}. */
@@ -72,6 +83,96 @@ void testCreateDataSinkFromSinkDef() {
7283
.isEqualTo("0.0.0.0");
7384
}
7485

86+
@Test
87+
void testGettingFlinkConfiguration() {
88+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
89+
PipelineDef pipelineDef =
90+
new PipelineDef(
91+
new SourceDef(TestDataSourceFactory.IDENTIFIER, null, new Configuration()),
92+
new SinkDef(TestDataSinkFactory.IDENTIFIER, null, new Configuration()),
93+
Collections.emptyList(),
94+
Collections.emptyList(),
95+
Collections.emptyList(),
96+
new Configuration());
97+
98+
assertThatCode(() -> composer.compose(pipelineDef)).doesNotThrowAnyException();
99+
}
100+
101+
/** A dummy {@link DataSinkFactory} that validates the execution target. */
102+
public static class TestDataSinkFactory implements DataSinkFactory {
103+
104+
public static final String IDENTIFIER = "test-sink-factory";
105+
106+
@Override
107+
public DataSink createDataSink(Context context) {
108+
// This option has no default value.
109+
String target = context.getFlinkConf().get(DeploymentOptions.TARGET);
110+
if (!"local".equals(target)) {
111+
throw new IllegalArgumentException(
112+
"The flink configuration is invalid. Please check the pipeline configuration.");
113+
}
114+
return new DataSink() {
115+
@Override
116+
public EventSinkProvider getEventSinkProvider() {
117+
return null;
118+
}
119+
120+
@Override
121+
public MetadataApplier getMetadataApplier() {
122+
return schemaChangeEvent -> {};
123+
}
124+
};
125+
}
126+
127+
@Override
128+
public String identifier() {
129+
return IDENTIFIER;
130+
}
131+
132+
@Override
133+
public Set<ConfigOption<?>> requiredOptions() {
134+
return new HashSet<>();
135+
}
136+
137+
@Override
138+
public Set<ConfigOption<?>> optionalOptions() {
139+
return new HashSet<>();
140+
}
141+
}
142+
143+
/** A dummy {@link DataSourceFactory} that validates the execution target. */
144+
public static class TestDataSourceFactory implements DataSourceFactory {
145+
146+
public static final String IDENTIFIER = "test-source-factory";
147+
148+
@Override
149+
public DataSource createDataSource(Context context) {
150+
// This option has no default value.
151+
String target = context.getFlinkConf().get(DeploymentOptions.TARGET);
152+
if (!"local".equals(target)) {
153+
throw new IllegalArgumentException(
154+
"The flink configuration is invalid. Please check the pipeline configuration.");
155+
}
156+
return new ValuesDataSource(
157+
ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE, Integer.MAX_VALUE);
158+
}
159+
160+
@Override
161+
public String identifier() {
162+
return IDENTIFIER;
163+
}
164+
165+
@Override
166+
public Set<ConfigOption<?>> requiredOptions() {
167+
return new HashSet<>();
168+
}
169+
170+
@Override
171+
public Set<ConfigOption<?>> optionalOptions() {
172+
return new HashSet<>();
173+
}
174+
}
175+
75176
@ParameterizedTest
76177
@MethodSource
77178
void testInvalidPipelineConfiguration(Configuration pipelineConfig) {

flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1
1616
org.apache.flink.cdc.composer.utils.factory.DataSinkFactory2
1717
org.apache.flink.cdc.composer.utils.factory.DataSourceFactory1
1818
org.apache.flink.cdc.composer.utils.factory.DataSourceFactory2
19-
org.apache.flink.cdc.composer.testsource.factory.DistributedDataSourceFactory
19+
org.apache.flink.cdc.composer.testsource.factory.DistributedDataSourceFactory
20+
org.apache.flink.cdc.composer.flink.FlinkPipelineComposerTest$TestDataSinkFactory
21+
org.apache.flink.cdc.composer.flink.FlinkPipelineComposerTest$TestDataSourceFactory

0 commit comments

Comments
 (0)