Skip to content

Commit 7c76147

Browse files
committed
KAFKA-20639: Move EnvelopeUtils to server module
1 parent f870500 commit 7c76147

5 files changed

Lines changed: 376 additions & 145 deletions

File tree

core/src/main/scala/kafka/server/ControllerApis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply,
5555
import org.apache.kafka.network.Request
5656
import org.apache.kafka.raft.RaftManager
5757
import org.apache.kafka.security.DelegationTokenManager
58-
import org.apache.kafka.server.{ApiVersionManager, ProcessRole}
58+
import org.apache.kafka.server.{ApiVersionManager, EnvelopeUtils, ProcessRole}
5959
import org.apache.kafka.server.authorizer.Authorizer
6060
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
6161
import org.apache.kafka.server.quota.ControllerMutationQuota

core/src/main/scala/kafka/server/EnvelopeUtils.scala

Lines changed: 0 additions & 143 deletions
This file was deleted.

core/src/test/scala/unit/kafka/network/RequestChannelTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package kafka.network
1919

2020
import com.fasterxml.jackson.databind.ObjectMapper
21-
import kafka.server.EnvelopeUtils
2221
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
2322
import org.apache.kafka.common.config.types.Password
2423
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
@@ -35,6 +34,7 @@ import org.apache.kafka.common.utils.Utils
3534
import org.apache.kafka.common.utils.internals.SecurityUtils
3635
import org.apache.kafka.network.{Request, RequestConvertToJson}
3736
import org.apache.kafka.network.metrics.RequestChannelMetrics
37+
import org.apache.kafka.server.EnvelopeUtils
3838
import org.junit.jupiter.api.Assertions._
3939
import org.junit.jupiter.api._
4040
import org.junit.jupiter.params.ParameterizedTest
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server;
18+
19+
import org.apache.kafka.common.errors.InvalidRequestException;
20+
import org.apache.kafka.common.errors.PrincipalDeserializationException;
21+
import org.apache.kafka.common.errors.UnsupportedVersionException;
22+
import org.apache.kafka.common.network.ClientInformation;
23+
import org.apache.kafka.common.protocol.ApiKeys;
24+
import org.apache.kafka.common.requests.EnvelopeRequest;
25+
import org.apache.kafka.common.requests.RequestContext;
26+
import org.apache.kafka.common.requests.RequestHeader;
27+
import org.apache.kafka.common.security.auth.KafkaPrincipal;
28+
import org.apache.kafka.network.Request;
29+
import org.apache.kafka.network.metrics.RequestChannelMetrics;
30+
31+
import java.net.InetAddress;
32+
import java.net.UnknownHostException;
33+
import java.nio.ByteBuffer;
34+
import java.util.Optional;
35+
import java.util.function.Consumer;
36+
37+
public final class EnvelopeUtils {
38+
private EnvelopeUtils() {
39+
}
40+
41+
public static void handleEnvelopeRequest(
42+
Request request,
43+
RequestChannelMetrics requestChannelMetrics,
44+
Consumer<Request> handler
45+
) {
46+
EnvelopeRequest envelope = request.body(EnvelopeRequest.class);
47+
KafkaPrincipal forwardedPrincipal = parseForwardedPrincipal(request.context(), envelope.requestPrincipal());
48+
InetAddress forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress());
49+
50+
ByteBuffer forwardedRequestBuffer = envelope.requestData().duplicate();
51+
RequestHeader forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer);
52+
53+
ApiKeys forwardedApi = forwardedRequestHeader.apiKey();
54+
if (!forwardedApi.forwardable) {
55+
throw new InvalidRequestException("API " + forwardedApi + " is not enabled or is not eligible for forwarding");
56+
}
57+
58+
RequestContext forwardedContext = new RequestContext(
59+
forwardedRequestHeader,
60+
request.context().connectionId,
61+
forwardedClientAddress,
62+
forwardedPrincipal,
63+
request.context().listenerName,
64+
request.context().securityProtocol,
65+
ClientInformation.EMPTY,
66+
request.context().fromPrivilegedListener
67+
);
68+
69+
Request forwardedRequest = parseForwardedRequest(
70+
request,
71+
forwardedContext,
72+
forwardedRequestBuffer,
73+
requestChannelMetrics
74+
);
75+
handler.accept(forwardedRequest);
76+
}
77+
78+
private static InetAddress parseForwardedClientAddress(byte[] address) {
79+
try {
80+
return InetAddress.getByAddress(address);
81+
} catch (UnknownHostException e) {
82+
throw new InvalidRequestException("Failed to parse client address from envelope", e);
83+
}
84+
}
85+
86+
private static Request parseForwardedRequest(
87+
Request envelope,
88+
RequestContext forwardedContext,
89+
ByteBuffer buffer,
90+
RequestChannelMetrics requestChannelMetrics
91+
) {
92+
try {
93+
Request forwardedRequest = new Request(
94+
envelope.processor(),
95+
forwardedContext,
96+
envelope.startTimeNanos(),
97+
envelope.memoryPool(),
98+
buffer,
99+
requestChannelMetrics,
100+
Optional.of(envelope)
101+
);
102+
// set the dequeue time of forwardedRequest as the value of envelope request
103+
forwardedRequest.requestDequeueTimeNanos(envelope.requestDequeueTimeNanos());
104+
return forwardedRequest;
105+
} catch (InvalidRequestException e) {
106+
// We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
107+
// The purpose is to disambiguate structural errors in the envelope request
108+
// itself, such as an invalid client address.
109+
throw new UnsupportedVersionException("Failed to parse forwarded request with header " + forwardedContext.header, e);
110+
}
111+
}
112+
113+
private static RequestHeader parseForwardedRequestHeader(ByteBuffer buffer) {
114+
try {
115+
return RequestHeader.parse(buffer);
116+
} catch (InvalidRequestException e) {
117+
// We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
118+
// The purpose is to disambiguate structural errors in the envelope request
119+
// itself, such as an invalid client address.
120+
throw new UnsupportedVersionException("Failed to parse request header from envelope", e);
121+
}
122+
}
123+
124+
private static KafkaPrincipal parseForwardedPrincipal(
125+
RequestContext envelopeContext,
126+
byte[] principalBytes
127+
) {
128+
if (envelopeContext.principalSerde.isEmpty()) {
129+
throw new PrincipalDeserializationException("Could not deserialize principal since " +
130+
"no `KafkaPrincipalSerde` has been defined");
131+
}
132+
133+
try {
134+
return envelopeContext.principalSerde.get().deserialize(principalBytes);
135+
} catch (Exception e) {
136+
throw new PrincipalDeserializationException("Failed to deserialize client principal from envelope", e);
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)