Skip to content

Commit ef5777a

Browse files
authored
[FLINK-39063][connect/fluss] Fix some spelling mistakes in fluss pipeline connector (#4271)
1 parent 7a83225 commit ef5777a

5 files changed

Lines changed: 10 additions & 10 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussEventSerializationSchema.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,14 @@ private TableSchemaInfo(
145145

146146
static Map<Integer, Integer> sanityCheckAndGenerateIndexMapping(
147147
com.alibaba.fluss.metadata.Schema inferredFlussSchema,
148-
com.alibaba.fluss.metadata.Schema currentFlussnewSchema) {
148+
com.alibaba.fluss.metadata.Schema currentFlussNewSchema) {
149149
List<String> inferredSchemaColumnNames = inferredFlussSchema.getColumnNames();
150150
Map<String, Integer> reverseIndex = new HashMap<>();
151151
for (int i = 0; i < inferredSchemaColumnNames.size(); i++) {
152152
reverseIndex.put(inferredSchemaColumnNames.get(i), i);
153153
}
154154

155-
List<String> currentSchemaColumnNames = currentFlussnewSchema.getColumnNames();
155+
List<String> currentSchemaColumnNames = currentFlussNewSchema.getColumnNames();
156156
Map<Integer, Integer> indexMapping = new HashMap<>();
157157
for (int newSchemaIndex = 0;
158158
newSchemaIndex < currentSchemaColumnNames.size();
@@ -167,7 +167,7 @@ static Map<Integer, Integer> sanityCheckAndGenerateIndexMapping(
167167
// changes.
168168
// In the future, meta applier will be used to handle column changes.
169169
DataType oldDataType = inferredFlussSchema.getRowType().getTypeAt(oldSchemaIndex);
170-
DataType newDataType = currentFlussnewSchema.getRowType().getTypeAt(newSchemaIndex);
170+
DataType newDataType = currentFlussNewSchema.getRowType().getTypeAt(newSchemaIndex);
171171
if (!oldDataType.copy(false).equals(newDataType.copy(false))) {
172172
throw new IllegalArgumentException(
173173
"The data type of column "

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetaDataApplier.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo currentTa
149149
.collect(Collectors.toList());
150150
if (!inferredPrimaryKeyColumnNames.equals(currentPrimaryKeyColumnNames)) {
151151
throw new ValidationException(
152-
"The table schema inffered by Flink CDC is not matched with current Fluss table schema. "
152+
"The table schema inferred by Flink CDC is not matched with current Fluss table schema. "
153153
+ "\n New Fluss table's primary keys : "
154154
+ inferredPrimaryKeyColumnNames
155155
+ "\n Current Fluss's primary keys: "
@@ -160,7 +160,7 @@ private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo currentTa
160160
List<String> currentBucketKeys = currentTableInfo.getBucketKeys();
161161
if (!inferredBucketKeys.equals(currentBucketKeys)) {
162162
throw new ValidationException(
163-
"The table schema inffered by Flink CDC is not matched with current Fluss table schema. "
163+
"The table schema inferred by Flink CDC is not matched with current Fluss table schema. "
164164
+ "\n New Fluss table's bucket keys : "
165165
+ inferredBucketKeys
166166
+ "\n Current Fluss's bucket keys: "
@@ -171,7 +171,7 @@ private void sanityCheck(TableDescriptor inferredFlussTable, TableInfo currentTa
171171
List<String> currentPartitionKeys = currentTableInfo.getPartitionKeys();
172172
if (!inferredPartitionKeys.equals(currentPartitionKeys)) {
173173
throw new ValidationException(
174-
"The table schema inffered by Flink CDC is not matched with current Fluss table schema. "
174+
"The table schema inferred by Flink CDC is not matched with current Fluss table schema. "
175175
+ "\n New Fluss table's partition keys : "
176176
+ inferredPartitionKeys
177177
+ "\n Current Fluss's partition keys: "

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WarppedFlussCounter.java renamed to flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrappedFlussCounter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
* additional information regarding copyright ownership. */
2525

2626
/** An implementation of Flink's {@link Counter} which wraps Fluss's Counter. */
27-
public class WarppedFlussCounter implements Counter {
27+
public class WrappedFlussCounter implements Counter {
2828

2929
private final com.alibaba.fluss.metrics.Counter flussCounter;
3030

31-
public WarppedFlussCounter(com.alibaba.fluss.metrics.Counter flussCounter) {
31+
public WrappedFlussCounter(com.alibaba.fluss.metrics.Counter flussCounter) {
3232
this.flussCounter = flussCounter;
3333
}
3434

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/main/java/org/apache/flink/cdc/connectors/fluss/sink/v2/metrics/WrapperFlussMetricRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ private Map<String, String> getVariables(AbstractMetricGroup group) {
116116
private void registerMetric(MetricGroup metricGroup, Metric metric, String metricName) {
117117
switch (metric.getMetricType()) {
118118
case COUNTER:
119-
metricGroup.counter(metricName, new WarppedFlussCounter((Counter) metric));
119+
metricGroup.counter(metricName, new WrappedFlussCounter((Counter) metric));
120120
break;
121121
case METER:
122122
metricGroup.meter(metricName, new WrapperFlussMeter((Meter) metric));

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/FlussMetadataApplierTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ void testRecreateTableWithDifferentSchema() throws Exception {
511511
applier.applySchemaChange(
512512
new CreateTableEvent(tableId, differentSchema)))
513513
.hasMessageContaining(
514-
"The table schema inffered by Flink CDC is not matched with current Fluss table schema");
514+
"The table schema inferred by Flink CDC is not matched with current Fluss table schema");
515515
}
516516

517517
// recreate table with schema1 again

0 commit comments

Comments
 (0)