Skip to content

Commit 7e3c311

Browse files
committed
Merge branch 'refs/heads/main' into flink-support-patition-pushdown
2 parents 11a3ab7 + 98c2f47 commit 7e3c311

File tree

82 files changed

+791
-283
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+791
-283
lines changed

.github/ISSUE_TEMPLATE/bug.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ body:
3939
description: What Fluss version are you using?
4040
multiple: false
4141
options:
42-
- "0.6.0 (latest release)"
42+
- "0.7.0 (latest release)"
43+
- "0.6.0"
4344
- "0.5.0"
4445
- "main (development)"
4546
validations:

.github/ISSUE_TEMPLATE/feature.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
################################################################################
1818

1919
name: Feature Request 🚀
20-
description: Add new feature, improvement, and more
20+
description: User-facing functionality or improvement you’d like to see added
2121
type: "feature"
2222
body:
2323
- type: markdown

.github/ISSUE_TEMPLATE/task.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
################################################################################
1818

1919
name: Task 📌
20-
description: A specific piece of work
20+
description: Specific work item - either part of a larger feature or independent project maintenance
2121
type: "task"
2222
body:
2323
- type: markdown

fluss-client/src/main/java/com/alibaba/fluss/client/FlussConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public final class FlussConnection implements Connection {
7979
String clientId = conf.getString(ConfigOptions.CLIENT_ID);
8080
this.metricRegistry = metricRegistry;
8181
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, clientId);
82-
this.rpcClient = RpcClient.create(conf, clientMetricGroup);
82+
this.rpcClient = RpcClient.create(conf, clientMetricGroup, false);
8383

8484
// TODO this maybe remove after we introduce client metadata.
8585
this.metadataUpdater = new MetadataUpdater(conf, rpcClient);

fluss-client/src/test/java/com/alibaba/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ private TestingMetadataUpdater(
6363
List<ServerNode> tabletServers,
6464
Map<TablePath, TableInfo> tableInfos) {
6565
super(
66-
RpcClient.create(new Configuration(), TestingClientMetricGroup.newInstance()),
66+
RpcClient.create(
67+
new Configuration(), TestingClientMetricGroup.newInstance(), false),
6768
Cluster.empty());
6869
initializeCluster(coordinatorServer, tabletServers, tableInfos);
6970
coordinatorGateway = new TestCoordinatorGateway();

fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ void testGetMetaInfo() throws Exception {
417417
Collections.singleton(DATA1_TABLE_PATH_PK), null, null);
418418

419419
try (RpcClient rpcClient =
420-
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {
420+
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
421421
AdminGateway guestGateway =
422422
GatewayClientProxy.createGatewayProxy(
423423
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),

fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,8 @@ private RecordAccumulator createTestRecordAccumulator(
630630
conf.getInt(ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET),
631631
GatewayClientProxy.createGatewayProxy(
632632
() -> cluster.getRandomTabletServer(),
633-
RpcClient.create(conf, TestingClientMetricGroup.newInstance()),
633+
RpcClient.create(
634+
conf, TestingClientMetricGroup.newInstance(), false),
634635
TabletServerGateway.class)),
635636
TestingWriterMetricGroup.newInstance(),
636637
clock);

fluss-common/src/main/java/com/alibaba/fluss/rpc/messages/ApiMessage.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ default int sizeExcludingZeroCopy() {
6363
*/
6464
boolean isLazilyParsed();
6565

66+
/** Get the parsed byte buffer of the message. */
67+
ByteBuf getParsedByteBuf();
68+
6669
/**
6770
* Deserialize the message from the given byte array.
6871
*

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public void open() {
5555
String clientId = flussConf.getString(ConfigOptions.CLIENT_ID);
5656
MetricRegistry metricRegistry = MetricRegistry.create(flussConf, null);
5757
// don't care about metrics, but pass a ClientMetricGroup to make compiler happy
58-
rpcClient = RpcClient.create(flussConf, new ClientMetricGroup(metricRegistry, clientId));
58+
rpcClient =
59+
RpcClient.create(flussConf, new ClientMetricGroup(metricRegistry, clientId), false);
5960
MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, rpcClient);
6061
this.coordinatorGateway =
6162
GatewayClientProxy.createGatewayProxy(

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void start() {
126126
FlinkMetricRegistry metricRegistry = new FlinkMetricRegistry(enumeratorMetricGroup);
127127
ClientMetricGroup clientMetricGroup =
128128
new ClientMetricGroup(metricRegistry, "LakeTieringService");
129-
this.rpcClient = RpcClient.create(flussConf, clientMetricGroup);
129+
this.rpcClient = RpcClient.create(flussConf, clientMetricGroup, false);
130130
MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, rpcClient);
131131
this.coordinatorGateway =
132132
GatewayClientProxy.createGatewayProxy(

0 commit comments

Comments
 (0)