Skip to content

Commit 63ab6b4

Browse files
committed
fix comment format
1 parent f483517 commit 63ab6b4

File tree

7 files changed

+132
-27
lines changed

7 files changed

+132
-27
lines changed

fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java

Lines changed: 0 additions & 21 deletions
This file was deleted.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.flink.adapter;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

import java.io.IOException;

/**
 * Flink sink adapter that hides the version-specific {@code createWriter} method signature.
 *
 * <p>Subclasses implement the version-agnostic {@link #createWriter(MailboxExecutor,
 * SinkWriterMetricGroup)} variant; this class bridges it to the {@code InitContext}-based method
 * of this Flink version.
 *
 * <p>TODO: remove this class once support for the Flink 1.x series is dropped.
 */
public abstract class SinkAdapter<InputT> implements Sink<InputT> {

    @Override
    public SinkWriter<InputT> createWriter(InitContext initContext) throws IOException {
        // Extract only the pieces of the context that are available across all
        // supported Flink versions, then delegate to the version-agnostic factory.
        return createWriter(initContext.getMailboxExecutor(), initContext.metricGroup());
    }

    /**
     * Creates the actual sink writer from the version-independent parts of the init context.
     *
     * @param mailboxExecutor executor for running actions in the task mailbox
     * @param metricGroup metric group for sink writer metrics
     * @return the sink writer instance
     */
    protected abstract SinkWriter<InputT> createWriter(
            MailboxExecutor mailboxExecutor, SinkWriterMetricGroup metricGroup);
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.adapter;
19+
20+
import org.apache.flink.table.catalog.CatalogMaterializedTable;
21+
import org.apache.flink.table.catalog.IntervalFreshness;
22+
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
23+
import org.apache.flink.table.catalog.ResolvedSchema;
24+
25+
/**
26+
* Adapter for {@link ResolvedCatalogMaterializedTable} because the constructor is compatibility in
27+
* flink 2.2. However, this constructor only used in test.
28+
*
29+
* <p>TODO: remove it until <a href="https://issues.apache.org/jira/browse/FLINK-38532">...</a> is
30+
* fixed.
31+
*/
32+
public class ResolvedCatalogMaterializedTableAdapter {
33+
34+
public static ResolvedCatalogMaterializedTable create(
35+
CatalogMaterializedTable origin,
36+
ResolvedSchema resolvedSchema,
37+
CatalogMaterializedTable.RefreshMode refreshMode,
38+
IntervalFreshness freshness) {
39+
return new ResolvedCatalogMaterializedTable(origin, resolvedSchema, refreshMode, freshness);
40+
}
41+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/FlinkSinkAdapter.java renamed to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/SinkAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
*
3131
* <p>TODO: remove this class when no longer support all the Flink 1.x series.
3232
*/
33-
public abstract class FlinkSinkAdapter<InputT> implements Sink<InputT> {
33+
public abstract class SinkAdapter<InputT> implements Sink<InputT> {
3434

3535
@Override
3636
public SinkWriter<InputT> createWriter(InitContext initContext) throws IOException {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.config.Configuration;
22-
import org.apache.fluss.flink.adapter.FlinkSinkAdapter;
22+
import org.apache.fluss.flink.adapter.SinkAdapter;
2323
import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
2424
import org.apache.fluss.flink.sink.writer.AppendSinkWriter;
2525
import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
@@ -44,8 +44,7 @@
4444
import static org.apache.fluss.flink.utils.FlinkConversions.toFlussRowType;
4545

4646
/** Flink sink for Fluss. */
47-
class FlinkSink<InputT> extends FlinkSinkAdapter<InputT>
48-
implements SupportsPreWriteTopology<InputT> {
47+
class FlinkSink<InputT> extends SinkAdapter<InputT> implements SupportsPreWriteTopology<InputT> {
4948

5049
private static final long serialVersionUID = 1L;
5150

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.adapter;
19+
20+
import org.apache.flink.table.catalog.CatalogMaterializedTable;
21+
import org.apache.flink.table.catalog.IntervalFreshness;
22+
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
23+
import org.apache.flink.table.catalog.ResolvedSchema;
24+
25+
/**
26+
* Adapter for {@link ResolvedCatalogMaterializedTable} because the constructor is compatibility in
27+
* flink 2.2. However, this constructor only used in test.
28+
*
29+
* <p>TODO: remove it until <a href="https://issues.apache.org/jira/browse/FLINK-38532">...</a> is
30+
* fixed.
31+
*/
32+
public class ResolvedCatalogMaterializedTableAdapter {
33+
34+
public static ResolvedCatalogMaterializedTable create(
35+
CatalogMaterializedTable origin,
36+
ResolvedSchema resolvedSchema,
37+
CatalogMaterializedTable.RefreshMode refreshMode,
38+
IntervalFreshness freshness) {
39+
return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
40+
}
41+
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.exception.IllegalConfigurationException;
2323
import org.apache.fluss.exception.InvalidPartitionException;
2424
import org.apache.fluss.exception.InvalidTableException;
25+
import org.apache.fluss.flink.adapter.ResolvedCatalogMaterializedTableAdapter;
2526
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
2627
import org.apache.fluss.flink.utils.FlinkConversionsTest;
2728
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -40,7 +41,6 @@
4041
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
4142
import org.apache.flink.table.catalog.IntervalFreshness;
4243
import org.apache.flink.table.catalog.ObjectPath;
43-
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
4444
import org.apache.flink.table.catalog.ResolvedCatalogTable;
4545
import org.apache.flink.table.catalog.ResolvedSchema;
4646
import org.apache.flink.table.catalog.TableChange;
@@ -158,7 +158,11 @@ protected CatalogMaterializedTable newCatalogMaterializedTable(
158158
.refreshMode(refreshMode)
159159
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
160160
.build();
161-
return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
161+
return ResolvedCatalogMaterializedTableAdapter.create(
162+
origin,
163+
resolvedSchema,
164+
refreshMode,
165+
IntervalFreshness.of("5", IntervalFreshness.TimeUnit.SECOND));
162166
}
163167

164168
protected FlinkCatalog initCatalog(

0 commit comments

Comments
 (0)