Skip to content

Commit 476c3ce

Browse files
authored
[server] Zero-Copy for follower fetching logs from leader (apache#1186)
1 parent 8631efe commit 476c3ce

File tree

48 files changed

+415
-168
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+415
-168
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/FlussConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public final class FlussConnection implements Connection {
7979
String clientId = conf.getString(ConfigOptions.CLIENT_ID);
8080
this.metricRegistry = metricRegistry;
8181
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, clientId);
82-
this.rpcClient = RpcClient.create(conf, clientMetricGroup);
82+
this.rpcClient = RpcClient.create(conf, clientMetricGroup, false);
8383

8484
// TODO this maybe remove after we introduce client metadata.
8585
this.metadataUpdater = new MetadataUpdater(conf, rpcClient);

fluss-client/src/test/java/com/alibaba/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ private TestingMetadataUpdater(
6363
List<ServerNode> tabletServers,
6464
Map<TablePath, TableInfo> tableInfos) {
6565
super(
66-
RpcClient.create(new Configuration(), TestingClientMetricGroup.newInstance()),
66+
RpcClient.create(
67+
new Configuration(), TestingClientMetricGroup.newInstance(), false),
6768
Cluster.empty());
6869
initializeCluster(coordinatorServer, tabletServers, tableInfos);
6970
coordinatorGateway = new TestCoordinatorGateway();

fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ void testGetMetaInfo() throws Exception {
417417
Collections.singleton(DATA1_TABLE_PATH_PK), null, null);
418418

419419
try (RpcClient rpcClient =
420-
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {
420+
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance(), false)) {
421421
AdminGateway guestGateway =
422422
GatewayClientProxy.createGatewayProxy(
423423
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),

fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,8 @@ private RecordAccumulator createTestRecordAccumulator(
630630
conf.getInt(ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET),
631631
GatewayClientProxy.createGatewayProxy(
632632
() -> cluster.getRandomTabletServer(),
633-
RpcClient.create(conf, TestingClientMetricGroup.newInstance()),
633+
RpcClient.create(
634+
conf, TestingClientMetricGroup.newInstance(), false),
634635
TabletServerGateway.class)),
635636
TestingWriterMetricGroup.newInstance(),
636637
clock);

fluss-common/src/main/java/com/alibaba/fluss/rpc/messages/ApiMessage.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ default int sizeExcludingZeroCopy() {
6363
*/
6464
boolean isLazilyParsed();
6565

66+
/** Get the parsed byte buffer of the message. */
67+
ByteBuf getParsedByteBuf();
68+
6669
/**
6770
* Deserialize the message from the given byte array.
6871
*

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public void open() {
5555
String clientId = flussConf.getString(ConfigOptions.CLIENT_ID);
5656
MetricRegistry metricRegistry = MetricRegistry.create(flussConf, null);
5757
// don't care about metrics, but pass a ClientMetricGroup to make compiler happy
58-
rpcClient = RpcClient.create(flussConf, new ClientMetricGroup(metricRegistry, clientId));
58+
rpcClient =
59+
RpcClient.create(flussConf, new ClientMetricGroup(metricRegistry, clientId), false);
5960
MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, rpcClient);
6061
this.coordinatorGateway =
6162
GatewayClientProxy.createGatewayProxy(

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void start() {
126126
FlinkMetricRegistry metricRegistry = new FlinkMetricRegistry(enumeratorMetricGroup);
127127
ClientMetricGroup clientMetricGroup =
128128
new ClientMetricGroup(metricRegistry, "LakeTieringService");
129-
this.rpcClient = RpcClient.create(flussConf, clientMetricGroup);
129+
this.rpcClient = RpcClient.create(flussConf, clientMetricGroup, false);
130130
MetadataUpdater metadataUpdater = new MetadataUpdater(flussConf, rpcClient);
131131
this.coordinatorGateway =
132132
GatewayClientProxy.createGatewayProxy(

fluss-protogen/fluss-protogen-generator/src/main/java/com/alibaba/fluss/protogen/generator/generator/ProtobufMessage.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public void generate(PrintWriter w) {
9999
generateZeroCopySize(w);
100100
generateParseFrom(w);
101101
generateIsLazilyParsed(w);
102+
generateGetParsedByteBuf(w);
102103
generateCheckRequiredFields(w);
103104
generateClear(w);
104105
generateCopyFrom(w);
@@ -175,6 +176,14 @@ private void generateIsLazilyParsed(PrintWriter w) {
175176
w.format(" }\n");
176177
}
177178

179+
private void generateGetParsedByteBuf(PrintWriter w) {
180+
w.println();
181+
w.println(" @Override");
182+
w.format(" public ByteBuf getParsedByteBuf() {\n");
183+
w.format(" return _parsedBuffer;\n");
184+
w.format(" }\n");
185+
}
186+
178187
private void generateClear(PrintWriter w) {
179188
w.println();
180189
w.format(" public %s clear() {\n", message.getName());

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/RpcClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,13 @@ public interface RpcClient extends AutoCloseable {
3737
*
3838
* @param conf The configuration to use.
3939
* @param clientMetricGroup The client metric group
40+
* @param isInnerClient Whether it is an inner client used for communicate from server to
41+
* server.
4042
* @return The RPC client.
4143
*/
42-
static RpcClient create(Configuration conf, ClientMetricGroup clientMetricGroup) {
43-
return new NettyClient(conf, clientMetricGroup);
44+
static RpcClient create(
45+
Configuration conf, ClientMetricGroup clientMetricGroup, boolean isInnerClient) {
46+
return new NettyClient(conf, clientMetricGroup, isInnerClient);
4447
}
4548

4649
/**

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/NettyClient.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,16 @@ public final class NettyClient implements RpcClient {
7676

7777
private final Supplier<ClientAuthenticator> authenticatorSupplier;
7878

79+
/**
80+
* Whether the NettyClient is used as inner network client (Communicating between Fluss's
81+
* servers).
82+
*/
83+
private final boolean isInnerClient;
84+
7985
private volatile boolean isClosed = false;
8086

81-
public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
87+
public NettyClient(
88+
Configuration conf, ClientMetricGroup clientMetricGroup, boolean isInnerClient) {
8289
this.connections = MapUtils.newConcurrentHashMap();
8390

8491
// build bootstrap
@@ -99,6 +106,7 @@ public NettyClient(Configuration conf, ClientMetricGroup clientMetricGroup) {
99106
.option(ChannelOption.TCP_NODELAY, true)
100107
.option(ChannelOption.SO_KEEPALIVE, true)
101108
.handler(new ClientChannelInitializer(connectionMaxIdle));
109+
this.isInnerClient = isInnerClient;
102110
this.clientMetricGroup = clientMetricGroup;
103111
this.authenticatorSupplier = AuthenticationFactory.loadClientAuthenticatorSupplier(conf);
104112
NettyMetrics.registerNettyMetrics(clientMetricGroup, pooledAllocator);
@@ -189,7 +197,8 @@ private ServerConnection getOrCreateConnection(ServerNode node) {
189197
bootstrap,
190198
node,
191199
clientMetricGroup,
192-
authenticatorSupplier.get());
200+
authenticatorSupplier.get(),
201+
isInnerClient);
193202
connection.whenClose(ignore -> connections.remove(serverId, connection));
194203
return connection;
195204
});

0 commit comments

Comments
 (0)