|
22 | 22 | import org.apache.flink.cdc.common.factories.DataSinkFactory; |
23 | 23 | import org.apache.flink.cdc.common.factories.FactoryHelper; |
24 | 24 | import org.apache.flink.cdc.common.sink.DataSink; |
| 25 | +import org.apache.flink.cdc.common.sink.FlinkSinkProvider; |
25 | 26 | import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils; |
| 27 | +import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEventSink; |
26 | 28 | import org.apache.flink.table.api.ValidationException; |
27 | 29 |
|
28 | 30 | import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; |
29 | 31 |
|
30 | 32 | import org.assertj.core.api.Assertions; |
31 | 33 | import org.junit.jupiter.api.Test; |
32 | 34 | import org.junit.jupiter.api.io.TempDir; |
| 35 | +import org.junit.jupiter.params.ParameterizedTest; |
| 36 | +import org.junit.jupiter.params.provider.ValueSource; |
33 | 37 |
|
34 | 38 | import java.io.File; |
35 | 39 | import java.util.HashMap; |
|
38 | 42 | import java.util.UUID; |
39 | 43 | import java.util.stream.Collectors; |
40 | 44 |
|
| 45 | +import static org.assertj.core.api.InstanceOfAssertFactories.type; |
| 46 | + |
41 | 47 | /** Tests for {@link PaimonDataSinkFactory}. */ |
42 | 48 | class PaimonDataSinkFactoryTest { |
43 | 49 |
|
@@ -164,4 +170,40 @@ void testPrefixRequireOption() { |
164 | 170 | conf, conf, Thread.currentThread().getContextClassLoader())); |
165 | 171 | Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class); |
166 | 172 | } |
| 173 | + |
| 174 | + @ParameterizedTest(name = "{0}") |
| 175 | + @ValueSource(strings = {"commit.user", "commit.user-prefix"}) |
| 176 | + void testSpecifyingCommitUser(String commitUserKey) { |
| 177 | + DataSinkFactory sinkFactory = |
| 178 | + FactoryDiscoveryUtils.getFactoryByIdentifier("paimon", DataSinkFactory.class); |
| 179 | + Assertions.assertThat(sinkFactory).isInstanceOf(PaimonDataSinkFactory.class); |
| 180 | + Configuration conf = |
| 181 | + Configuration.fromMap( |
| 182 | + ImmutableMap.<String, String>builder() |
| 183 | + .put(PaimonDataSinkOptions.METASTORE.key(), "filesystem") |
| 184 | + .put( |
| 185 | + PaimonDataSinkOptions.WAREHOUSE.key(), |
| 186 | + new File( |
| 187 | + temporaryFolder.toFile(), |
| 188 | + UUID.randomUUID().toString()) |
| 189 | + .toString()) |
| 190 | + .put(commitUserKey, "yux") |
| 191 | + .build()); |
| 192 | + |
| 193 | + DataSink dataSink = |
| 194 | + sinkFactory.createDataSink( |
| 195 | + new FactoryHelper.DefaultContext( |
| 196 | + conf, conf, Thread.currentThread().getContextClassLoader())); |
| 197 | + Assertions.assertThat(dataSink).isInstanceOf(PaimonDataSink.class); |
| 198 | + Assertions.assertThat(dataSink).extracting("commitUser").isEqualTo("yux"); |
| 199 | + Assertions.assertThat(dataSink.getEventSinkProvider()) |
| 200 | + .isInstanceOf(FlinkSinkProvider.class) |
| 201 | + .asInstanceOf(type(FlinkSinkProvider.class)) |
| 202 | + .extracting(FlinkSinkProvider::getSink) |
| 203 | + .isExactlyInstanceOf(PaimonEventSink.class) |
| 204 | + .extracting("commitUser") |
| 205 | + .asString() |
| 206 | + .hasSize(39) // 3 ("yux") + 36 (Random UUID) |
| 207 | + .startsWith("yux"); |
| 208 | + } |
167 | 209 | } |
0 commit comments