Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,9 @@ metadataStoreBatchingMaxSizeKb=128
# Enable authentication
authenticationEnabled=false

# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, hash:MD5, default is NONE
authenticationRoleLoggingAnonymizer=NONE

# Authentication provider name list, which is comma separated list of class names
authenticationProviders=

Expand Down
3 changes: 3 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ forwardAuthorizationCredentials=false
# Whether authentication is enabled for the Pulsar proxy
authenticationEnabled=false

# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, hash:MD5, default is NONE
authenticationRoleLoggingAnonymizer=NONE

# Authentication provider name list (a comma-separated list of class names)
authenticationProviders=

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,9 @@ authenticateOriginalAuthData=false
# Enable authentication
authenticationEnabled=false

# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, hash:MD5, default is NONE
authenticationRoleLoggingAnonymizer=NONE

# Authentication provider name list, which is comma separated list of class names
authenticationProviders=

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private String clusterName;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Defines how the broker will anonymize the role and originalAuthRole before logging. "
+ "Possible values are: NONE (no anonymization), REDACTED (replaces with '[REDACTED]'), "
+ "hash:SHA256 (hashes using SHA-256), and hash:MD5 (hashes using MD5). Default is NONE."
)
private String authenticationRoleLoggingAnonymizer = "NONE";

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.configuration.anonymizer;

import static org.apache.pulsar.common.configuration.anonymizer.DefaultRoleAnonymizerType.NONE;

/**
* This class provides a utility to anonymize authentication roles before logging,
* based on a configured anonymization strategy. The anonymization strategy is
* determined by the provided value of the {@link DefaultRoleAnonymizerType} enum.
*
* The primary purpose of this class is to enable flexible anonymization of sensitive
* information (such as user roles) in logs, ensuring compliance with security and
* privacy requirements while allowing customization of the anonymization behavior.
*
* Usage:
* - The class constructor accepts a string that represents the desired anonymization
* strategy (e.g., "NONE", "REDACTED", "SHA256", "MD5"), and it initializes the
* anonymizer type accordingly.
* - The {@code anonymize} method applies the selected anonymization strategy to
* the provided role and returns the anonymized value.
*
* Example:
* <pre>
* DefaultAuthenticationRoleLoggingAnonymizer roleAnonymizer =
* new DefaultAuthenticationRoleLoggingAnonymizer("SHA256");
* String anonymizedRole = roleAnonymizer.anonymize("admin");
* </pre>
*
* Anonymization strategies:
* - NONE: No anonymization (returns the role as-is).
* - REDACTED: Replaces the role with "[REDACTED]".
* - hash:SHA256: Hashes the role using the SHA-256 algorithm and prefixes it with "SHA-256:".
* - hash:MD5: Hashes the role using the MD5 algorithm and prefixes it with "MD5:"
*/
public final class DefaultAuthenticationRoleLoggingAnonymizer {
private static DefaultRoleAnonymizerType anonymizerType = NONE;

public DefaultAuthenticationRoleLoggingAnonymizer(String authenticationRoleLoggingAnonymizer) {
if (authenticationRoleLoggingAnonymizer.startsWith("hash:")) {
anonymizerType = DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer
.substring("hash:".length()).toUpperCase());
} else {
anonymizerType = DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer);
}
}

public String anonymize(String role) {
return anonymizerType.anonymize(role);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.configuration.anonymizer;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public enum DefaultRoleAnonymizerType {
NONE {
@Override
public String anonymize(String role) {
return role;
}
},
REDACTED {
@Override
public String anonymize(String role) {
return REDACTED_VALUE;
}
},
SHA256 {
private static final String PREFIX = "SHA-256:";
private final MessageDigest digest;

{
// Initializing the MessageDigest once for SHA-256
try {
digest = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("SHA-256 algorithm not found", e);
}
}

@Override
public String anonymize(String role) {
byte[] hash = digest.digest(role.getBytes());
return PREFIX + Base64.getEncoder().encodeToString(hash);
}
},
MD5 {
private static final String PREFIX = "MD5:";
private final MessageDigest digest;

{
// Initializing the MessageDigest once for MD5
try {
// codeql[java/weak-cryptographic-algorithm] - md5 is sufficient for this use case&
digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 algorithm not found", e);
}
}

@Override
public String anonymize(String role) {
byte[] hash = digest.digest(role.getBytes());
return PREFIX + Base64.getEncoder().encodeToString(hash);
}
};

private static final String REDACTED_VALUE = "[REDACTED]";
public abstract String anonymize(String role);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Pulsar Client API.
*/
package org.apache.pulsar.common.configuration.anonymizer;

Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.Metadata;
Expand Down Expand Up @@ -215,6 +216,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private AuthData originalAuthDataCopy;
private boolean pendingAuthChallengeResponse = false;
private ScheduledFuture<?> authRefreshTask;
private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer;

// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
// control done by a single producer might not be enough to prevent write spikes on the broker.
Expand Down Expand Up @@ -352,6 +354,8 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
this.throttleTracker = new ServerCnxThrottleTracker(this);
topicsPatternImplementation = conf.getTopicsPatternRegexImplementation();
this.authenticationRoleLoggingAnonymizer = new DefaultAuthenticationRoleLoggingAnonymizer(
conf.getAuthenticationRoleLoggingAnonymizer());
}

@Override
Expand Down Expand Up @@ -821,12 +825,14 @@ private void completeConnect(int clientProtoVersion, String clientVersion) {
clientVersion, clientProtoVersion, proxyVersion);
} else if (originalPrincipal != null) {
log.info("[{}] connected role={} and originalAuthRole={} using authMethod={}, clientVersion={}, "
+ "clientProtocolVersion={}, proxyVersion={}", remoteAddress, authRole, originalPrincipal,
authMethod, clientVersion, clientProtoVersion, proxyVersion);
+ "clientProtocolVersion={}, proxyVersion={}", remoteAddress,
authenticationRoleLoggingAnonymizer.anonymize(authRole),
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), authMethod, clientVersion,
clientProtoVersion, proxyVersion);
} else {
log.info("[{}] connected with role={} using authMethod={}, clientVersion={}, clientProtocolVersion={}, "
+ "proxyVersion={}", remoteAddress, authRole, authMethod, clientVersion, clientProtoVersion,
proxyVersion);
+ "proxyVersion={}", remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(authRole),
authMethod, clientVersion, clientProtoVersion, proxyVersion);
}
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionCreated(this);
Expand Down Expand Up @@ -1214,7 +1220,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {

if (log.isDebugEnabled()) {
log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}",
remoteAddress, authRole, originalPrincipal);
remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(authRole),
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal));
}

final String subscriptionName = subscribe.getSubscription();
Expand Down Expand Up @@ -2433,11 +2440,11 @@ private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName nam
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}",
originalPrincipal, operation, namespaceName);
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), operation, namespaceName);
}
if (!isAuthorized) {
log.warn("Role {} is not authorized to perform operation {} on namespace {}",
authRole, operation, namespaceName);
authenticationRoleLoggingAnonymizer.anonymize(authRole), operation, namespaceName);
}
return isProxyAuthorized && isAuthorized;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ public class ProxyConfiguration implements PulsarConfiguration {
+ "is enabled.")
private Boolean webServiceLogDetailedAddresses;

@FieldContext(category = CATEGORY_SERVER, doc =
"Defines how the broker will anonymize the role and originalAuthRole before logging. "
+ "Possible values are: NONE (no anonymization), REDACTED (replaces with '[REDACTED]'), "
+ "hash:SHA256 (hashes using SHA-256), and hash:MD5 (hashes using MD5). Default is NONE."
)
private String authenticationRoleLoggingAnonymizer = "NONE";

@FieldContext(category = CATEGORY_SERVER,
doc = "Enables zero-copy transport of data across network interfaces using the spice. "
+ "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.Runnables;
Expand Down Expand Up @@ -120,6 +121,7 @@ public class ProxyConnection extends PulsarHandler {
private int protocolVersionToAdvertise;
private String proxyToBrokerUrl;
private HAProxyMessage haProxyMessage;
private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer;

protected static final Integer SPLICE_BYTES = 1024 * 1024 * 1024;
private static final byte[] EMPTY_CREDENTIALS = new byte[0];
Expand Down Expand Up @@ -161,6 +163,8 @@ public ProxyConnection(ProxyService proxyService, DnsAddressResolverGroup dnsAdd
this.state = State.Init;
this.brokerProxyValidator = service.getBrokerProxyValidator();
this.connectionController = proxyService.getConnectionController();
this.authenticationRoleLoggingAnonymizer = new DefaultAuthenticationRoleLoggingAnonymizer(
proxyService.getConfiguration().getAuthenticationRoleLoggingAnonymizer());
}

@Override
Expand Down Expand Up @@ -343,16 +347,17 @@ protected static boolean isTlsChannel(Channel channel) {

private synchronized void completeConnect() throws PulsarClientException {
checkArgument(state == State.Connecting);
String maybeAnonymizedClientAuthRole = authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole);
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
remoteAddress, authMethod, maybeAnonymizedClientAuthRole, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
// Optimize proxy connection to fail-fast if the target broker isn't active
// Pulsar client will retry connecting after a back off timeout
if (service.getConfiguration().isCheckActiveBrokers()
&& !isBrokerActive(proxyToBrokerUrl)) {
state = State.Closing;
LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole);
remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole);
final ByteBuf msg = Commands.newError(-1,
ServerError.ServiceNotReady, "Target broker isn't available.");
writeAndFlushAndClose(msg);
Expand All @@ -371,10 +376,11 @@ private synchronized void completeConnect() throws PulsarClientException {

LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(),
authMethod, clientAuthRole);
authMethod, maybeAnonymizedClientAuthRole);
} else {
LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole, throwable);
remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole,
throwable);
}
final ByteBuf msg = Commands.newError(-1, ServerError.ServiceNotReady,
"Target broker cannot be validated.");
Expand All @@ -401,7 +407,7 @@ private synchronized void completeConnect() throws PulsarClientException {
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())), null);
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
remoteAddress, state, clientAuthRole);
remoteAddress, state, maybeAnonymizedClientAuthRole);
}

state = State.ProxyLookupRequests;
Expand Down Expand Up @@ -488,7 +494,7 @@ protected void authChallengeSuccessCallback(AuthData authChallenge) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
remoteAddress, authMethod, authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole));
}

// First connection
Expand Down
Loading