Skip to content

Commit 03bea48

Browse files
authored
[FLINK-37649][datagen] Datagen connector cannot set length for collection type
* [FLINK-37649][datagen] Datagen connector cannot set length for collection type
1 parent aff916e commit 03bea48

File tree

2 files changed: +47 -1 lines changed

Diff for: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java

+4 -1
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
7474

7575
public static final int RANDOM_BYTES_LENGTH_DEFAULT = 100;
7676

77-
private static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
77+
public static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
7878

7979
private static final float NULL_RATE_DEFAULT = 0f;
8080

@@ -352,6 +352,7 @@ public DataGeneratorContainer visit(ArrayType arrayType) {
352352
RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption));
353353
Set<ConfigOption<?>> options = container.getOptions();
354354
options.add(nr);
355+
options.add(lenOption);
355356
return DataGeneratorContainer.of(
356357
new DataGeneratorMapper<>(generator, (GenericArrayData::new), config.get(nr)),
357358
options.toArray(new ConfigOption<?>[0]));
@@ -374,6 +375,7 @@ public DataGeneratorContainer visit(MultisetType multisetType) {
374375
Set<ConfigOption<?>> options = container.getOptions();
375376
ConfigOption<Float> nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT);
376377
options.add(nr);
378+
options.add(lenOption);
377379

378380
return DataGeneratorContainer.of(
379381
new DataGeneratorMapper<>(mapGenerator, GenericMapData::new, config.get(nr)),
@@ -397,6 +399,7 @@ public DataGeneratorContainer visit(MapType mapType) {
397399
Set<ConfigOption<?>> options = keyContainer.getOptions();
398400
options.addAll(valContainer.getOptions());
399401
options.add(nr);
402+
options.add(lenOption);
400403

401404
DataGenerator<Map<Object, Object>> mapGenerator =
402405
RandomGenerator.mapGenerator(

Diff for: flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java

+43
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@
5656
import java.util.Map;
5757
import java.util.Set;
5858

59+
import static org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.RANDOM_COLLECTION_LENGTH_DEFAULT;
5960
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
6061
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
6162
import static org.assertj.core.api.Assertions.assertThat;
@@ -81,6 +82,11 @@ class DataGenTableSourceFactoryTest {
8182
Column.physical("f2", DataTypes.VARCHAR(30)),
8283
Column.physical("f3", DataTypes.VARBINARY(20)),
8384
Column.physical("f4", DataTypes.STRING()));
85+
private static final ResolvedSchema COLLECTION_SCHEMA =
86+
ResolvedSchema.of(
87+
Column.physical("f0", DataTypes.ARRAY(DataTypes.STRING())),
88+
Column.physical("f1", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())),
89+
Column.physical("f2", DataTypes.MULTISET(DataTypes.INT())));
8490

8591
@Test
8692
void testDataTypeCoverage() throws Exception {
@@ -347,6 +353,43 @@ void testVariableLengthDataType() throws Exception {
347353
"Custom length '21' for variable-length type (VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined in the schema.");
348354
}
349355

356+
@Test
357+
void testLengthForCollectionType() throws Exception {
358+
DescriptorProperties descriptor = new DescriptorProperties();
359+
final int rowsNumber = 200;
360+
final int collectionSize = 10;
361+
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
362+
descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber);
363+
// test for default length.
364+
List<RowData> results = runGenerator(COLLECTION_SCHEMA, descriptor);
365+
assertThat(results).hasSize(rowsNumber);
366+
for (RowData row : results) {
367+
assertThat(row.getArray(0).size()).isEqualTo(RANDOM_COLLECTION_LENGTH_DEFAULT);
368+
assertThat(row.getMap(1).size())
369+
.isEqualTo(RandomGeneratorVisitor.RANDOM_COLLECTION_LENGTH_DEFAULT);
370+
assertThat(row.getMap(2).size())
371+
.isEqualTo(RandomGeneratorVisitor.RANDOM_COLLECTION_LENGTH_DEFAULT);
372+
}
373+
374+
// test for provided length.
375+
descriptor.putLong(
376+
DataGenConnectorOptionsUtil.FIELDS + ".f0." + DataGenConnectorOptionsUtil.LENGTH,
377+
collectionSize);
378+
descriptor.putLong(
379+
DataGenConnectorOptionsUtil.FIELDS + ".f1." + DataGenConnectorOptionsUtil.LENGTH,
380+
collectionSize);
381+
descriptor.putLong(
382+
DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.LENGTH,
383+
collectionSize);
384+
results = runGenerator(COLLECTION_SCHEMA, descriptor);
385+
assertThat(results).hasSize(rowsNumber);
386+
for (RowData row : results) {
387+
assertThat(row.getArray(0).size()).isEqualTo(collectionSize);
388+
assertThat(row.getMap(1).size()).isEqualTo(collectionSize);
389+
assertThat(row.getMap(2).size()).isEqualTo(collectionSize);
390+
}
391+
}
392+
350393
@Test
351394
void testFixedLengthDataType() throws Exception {
352395
DescriptorProperties descriptor = new DescriptorProperties();

0 commit comments

Comments
 (0)