Skip to content

Commit b243c1a

Browse files
committed
[FLINK-38723][common/composer] Add getFlinkConf method to Context.
1 parent 25fa95e commit b243c1a

6 files changed

Lines changed: 94 additions & 3 deletions

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/Factory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2121
import org.apache.flink.cdc.common.configuration.ConfigOption;
2222
import org.apache.flink.cdc.common.configuration.Configuration;
23+
import org.apache.flink.configuration.ReadableConfig;
2324

2425
import java.util.Set;
2526

@@ -78,5 +79,10 @@ interface Context {
7879
* <p>The class loader is in particular useful for discovering factories.
7980
*/
8081
ClassLoader getClassLoader();
82+
83+
/** Returns the flink configuration of the current session. */
84+
default ReadableConfig getFlinkConf() {
85+
return new org.apache.flink.configuration.Configuration();
86+
}
8187
}
8288
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,28 @@ public static class DefaultContext implements Factory.Context {
168168
private final Configuration factoryConfiguration;
169169
private final ClassLoader classLoader;
170170
private final Configuration pipelineConfiguration;
171+
private final ReadableConfig flinkConf;
171172

172173
public DefaultContext(
173174
Configuration factoryConfiguration,
174175
Configuration pipelineConfiguration,
175176
ClassLoader classLoader) {
177+
this(
178+
factoryConfiguration,
179+
pipelineConfiguration,
180+
classLoader,
181+
new org.apache.flink.configuration.Configuration());
182+
}
183+
184+
public DefaultContext(
185+
Configuration factoryConfiguration,
186+
Configuration pipelineConfiguration,
187+
ClassLoader classLoader,
188+
ReadableConfig flinkConf) {
176189
this.factoryConfiguration = factoryConfiguration;
177190
this.pipelineConfiguration = pipelineConfiguration;
178191
this.classLoader = classLoader;
192+
this.flinkConf = flinkConf;
179193
}
180194

181195
@Override
@@ -192,5 +206,10 @@ public Configuration getPipelineConfiguration() {
192206
public ClassLoader getClassLoader() {
193207
return classLoader;
194208
}
209+
210+
@Override
211+
public ReadableConfig getFlinkConf() {
212+
return flinkConf;
213+
}
195214
}
196215
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public DataSink createDataSink(
7676
new FactoryHelper.DefaultContext(
7777
sinkDef.getConfig(),
7878
pipelineConfig,
79-
Thread.currentThread().getContextClassLoader()));
79+
Thread.currentThread().getContextClassLoader(),
80+
env.getConfiguration()));
8081
}
8182

8283
public void translate(

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public DataSource createDataSource(
9797
new FactoryHelper.DefaultContext(
9898
sourceDef.getConfig(),
9999
pipelineConfig,
100-
Thread.currentThread().getContextClassLoader());
100+
Thread.currentThread().getContextClassLoader(),
101+
env.getConfiguration());
101102
return sourceFactory.createDataSource(context);
102103
}
103104

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@
1717

1818
package org.apache.flink.cdc.composer.flink;
1919

20+
import org.apache.flink.cdc.common.configuration.ConfigOption;
2021
import org.apache.flink.cdc.common.configuration.Configuration;
2122
import org.apache.flink.cdc.common.factories.DataSinkFactory;
2223
import org.apache.flink.cdc.common.factories.FactoryHelper;
2324
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
2425
import org.apache.flink.cdc.common.sink.DataSink;
26+
import org.apache.flink.cdc.common.sink.EventSinkProvider;
27+
import org.apache.flink.cdc.common.sink.MetadataApplier;
2528
import org.apache.flink.cdc.composer.definition.PipelineDef;
2629
import org.apache.flink.cdc.composer.definition.SinkDef;
2730
import org.apache.flink.cdc.composer.definition.SourceDef;
2831
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
2932
import org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1;
3033
import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
34+
import org.apache.flink.configuration.DeploymentOptions;
3135

3236
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
3337

@@ -38,6 +42,8 @@
3842
import org.junit.jupiter.params.provider.MethodSource;
3943

4044
import java.util.Collections;
45+
import java.util.HashSet;
46+
import java.util.Set;
4147
import java.util.stream.Stream;
4248

4349
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -72,6 +78,63 @@ void testCreateDataSinkFromSinkDef() {
7278
.isEqualTo("0.0.0.0");
7379
}
7480

81+
@Test
82+
void testGettingFlinkConfiguration() {
83+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
84+
PipelineDef pipelineDef =
85+
new PipelineDef(
86+
new SourceDef(ValuesDataFactory.IDENTIFIER, null, new Configuration()),
87+
new SinkDef(TestDataSinkFactory.IDENTIFIER, null, new Configuration()),
88+
Collections.emptyList(),
89+
Collections.emptyList(),
90+
Collections.emptyList(),
91+
new Configuration());
92+
// No exception will be thrown.
93+
composer.compose(pipelineDef);
94+
}
95+
96+
/** A dummy {@link DataSinkFactory} for testing. */
97+
public static class TestDataSinkFactory implements DataSinkFactory {
98+
99+
public static final String IDENTIFIER = "test-sink-factory";
100+
101+
@Override
102+
public DataSink createDataSink(Context context) {
103+
// This option has no default value.
104+
String target = context.getFlinkConf().get(DeploymentOptions.TARGET);
105+
if (!"local".equals(target)) {
106+
throw new IllegalArgumentException(
107+
"The flink configuration is invalid. Please check the pipeline configuration.");
108+
}
109+
return new DataSink() {
110+
@Override
111+
public EventSinkProvider getEventSinkProvider() {
112+
return null;
113+
}
114+
115+
@Override
116+
public MetadataApplier getMetadataApplier() {
117+
return schemaChangeEvent -> {};
118+
}
119+
};
120+
}
121+
122+
@Override
123+
public String identifier() {
124+
return IDENTIFIER;
125+
}
126+
127+
@Override
128+
public Set<ConfigOption<?>> requiredOptions() {
129+
return new HashSet<>();
130+
}
131+
132+
@Override
133+
public Set<ConfigOption<?>> optionalOptions() {
134+
return new HashSet<>();
135+
}
136+
}
137+
75138
@ParameterizedTest
76139
@MethodSource
77140
void testInvalidPipelineConfiguration(Configuration pipelineConfig) {

flink-cdc-composer/src/test/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ org.apache.flink.cdc.composer.utils.factory.DataSinkFactory1
1616
org.apache.flink.cdc.composer.utils.factory.DataSinkFactory2
1717
org.apache.flink.cdc.composer.utils.factory.DataSourceFactory1
1818
org.apache.flink.cdc.composer.utils.factory.DataSourceFactory2
19-
org.apache.flink.cdc.composer.testsource.factory.DistributedDataSourceFactory
19+
org.apache.flink.cdc.composer.testsource.factory.DistributedDataSourceFactory
20+
org.apache.flink.cdc.composer.flink.FlinkPipelineComposerTest$TestDataSinkFactory

0 commit comments

Comments
 (0)