Skip to content

[feat] [broker] PIP-188: Support option to disconnect clients that not support cluster migration feature #20084

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ isAllowAutoUpdateSchemaEnabled=true
# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

# Minimum client version allowed by broker else broker will reject connection.
# (It's useful when client lib doesn't support specific feature and feature
# might be required by broker to apply globally on all topics.
# (eg: all clients must be on V20 to perform cloud migration)
clientMinVersionAllowed=-1

# Path for the file used to determine the rotation status for the broker when responding
# to service discovery health checks
statusFilePath=
Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ isAllowAutoUpdateSchemaEnabled=true
# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

# Minimum client version allowed by broker else broker will reject connection.
# (It's useful when client lib doesn't support specific feature and feature
# might be required by broker to apply globally on all topics.
# (eg: all clients must be on V20 to perform cloud migration)
clientMinVersionAllowed=-1

# Path for the file used to determine the rotation status for the broker when responding
# to service discovery health checks
statusFilePath=/usr/local/apache/htdocs
Expand Down
6 changes: 6 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ maxTopicsPerNamespace=0
# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

# Minimum client version allowed by broker else broker will reject connection.
# (It's useful when client lib doesn't support specific feature and feature
# might be required by broker to apply globally on all topics.
# (eg: all clients must be on V20 to perform cloud migration)
clientMinVersionAllowed=-1

# Path for the file used to determine the rotation status for the broker when responding
# to service discovery health checks
statusFilePath=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,17 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
doc = "Enable check for minimum allowed client library version"
)
private boolean clientLibraryVersionCheckEnabled = false;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Minimum client version allowed by broker else broker will reject connection."
+ "(It's useful when client lib doesn't support specific feature and feature "
+ "might be required by broker to apply globally on all topics."
+ "(eg: all clients must be on V20 to perform cloud migration)"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's an int, should we just say 20, not V20?

)
private int clientMinVersionAllowed = -1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to add proto or protocol to the name to disambiguate it from the library version?


@FieldContext(
category = CATEGORY_SERVER,
doc = "Path for the file used to determine the rotation status for the broker"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final TopicListService topicListService;
private final BrokerInterceptor brokerInterceptor;
private State state;
private int clientMinVersionAllowed;
private volatile boolean isActive = true;
private String authRole = null;
private volatile AuthenticationDataSource authenticationData;
Expand Down Expand Up @@ -307,6 +308,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.topicListService = new TopicListService(pulsar, this,
enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength);
this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
this.clientMinVersionAllowed = conf.getClientMinVersionAllowed() > 0 ? conf.getClientMinVersionAllowed() : -1;
}

@Override
Expand Down Expand Up @@ -684,6 +686,14 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {

// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion) {
if (clientMinVersionAllowed >= 0 && clientMinVersionAllowed > clientProtoVersion) {
log.info("[{}] client with version {} must be upgraded to {}", remoteAddress, clientProtoVersion,
clientMinVersionAllowed);
final ByteBuf msg = Commands.newError(-1, ServerError.UnsupportedVersionError,
"Upgrade version to " + clientMinVersionAllowed + " or higher");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about 'upgrade your client to a version that supports protocol version ...'
Otherwise it seems that you have to upgrade to Pulsar 20

NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
return;
}
if (service.isAuthenticationEnabled()) {
if (service.isAuthorizationEnabled()) {
if (!service.getAuthorizationService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@

import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -42,11 +43,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
Expand All @@ -64,6 +61,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
Expand All @@ -76,14 +74,13 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockAuthorizationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
Expand All @@ -93,6 +90,7 @@
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
Expand Down Expand Up @@ -149,6 +147,12 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

@SuppressWarnings("unchecked")
@Test(groups = "broker")
public class ServerCnxTest {
Expand Down Expand Up @@ -251,6 +255,27 @@ public void testConnectCommand() throws Exception {
channel.finish();
}

@Test(timeOut = 30000)
public void testClientMinVersionSupport() throws Exception {
int clientMinVersion = 10;
int clientVersion = 9;
pulsar.getConfiguration().setClientMinVersionAllowed(clientMinVersion);
resetChannel();
setConnectionVersion(clientVersion);
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

// test server response to CONNECT
ByteBuf clientCommand = Commands.newConnect("none", null, clientVersion, "", "", "", null, null, null);
channel.writeInbound(clientCommand);

assertNotEquals(serverCnx.getState(), State.Connected);
Object error = getResponse();
assertTrue(error instanceof CommandError);
assertTrue(((CommandError) error).getError().equals(ServerError.UnsupportedVersionError));
channel.finish();
}

private static ByteBuf newConnect(AuthMethod authMethod, String authData, int protocolVersion) {
BaseCommand cmd = new BaseCommand().setType(Type.CONNECT);
cmd.setConnect()
Expand Down
Loading