feat: add Buf Schema Registry (BSR) support for Protobuf deserialization #2585

Open
sbhuiyan-buf wants to merge 4 commits into tchiotludo:dev from sbhuiyan-buf:feature/bsr-schema-registry-integration

Conversation

@sbhuiyan-buf

Add support for Buf Schema Registry (BSR) as a new schema registry type, enabling AKHQ to deserialize Protobuf messages using schemas stored in BSR.

This implementation uses the BSR Kafka Serde library to fetch Protobuf descriptors from BSR dynamically, supporting both header-based schema resolution and configured topic mappings.

Key Features

  • New schema registry type "bsr" alongside Confluent, Tibco, and AWS Glue
  • BufSchemaRegistryClient wrapper for BSR API interaction via reflection
  • BufProtobufToJsonDeserializer for BSR-based Protobuf deserialization
  • Support for header-based schema resolution (buf.registry.*.schema.commit/message)
  • Support for config-based schema resolution via topics-mapping
  • Descriptor caching for improved performance
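
For illustration, header-based resolution boils down to a first-match lookup over the record headers. A minimal self-contained sketch of that lookup (header names taken from this PR; `Map.Entry` stands in for AKHQ's `KeyValue` so the snippet runs on its own):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;

public class BsrHeaderLookup {
    // Header names used for BSR value-schema resolution (from this PR).
    static final String HEADER_BSR_COMMIT = "buf.registry.value.schema.commit";
    static final String HEADER_BSR_MESSAGE = "buf.registry.value.schema.message";

    // Return the first header value matching the key, or null when absent.
    static String headerValue(List<Map.Entry<String, String>> headers, String key) {
        if (headers == null) {
            return null;
        }
        return headers.stream()
            .filter(h -> key.equals(h.getKey()))
            .map(Map.Entry::getValue)
            .findFirst()
            .orElse(null);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> headers = List.<Map.Entry<String, String>>of(
            new SimpleEntry<>(HEADER_BSR_COMMIT, "a1b2c3d4"),
            new SimpleEntry<>(HEADER_BSR_MESSAGE, "com.myorg.Order"));
        System.out.println(headerValue(headers, HEADER_BSR_COMMIT));  // a1b2c3d4
        System.out.println(headerValue(headers, "missing.header"));   // null
    }
}
```

When both headers resolve to non-null values, the deserializer can use them directly; otherwise it falls through to the configured topic mapping.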

Configuration

Add BSR configuration to connection schema-registry section:

```yaml
schema-registry:
  type: "bsr"
  bsr-host: "buf.build"  # or "bufbuild.internal" for enterprise
  bsr-token: "your-api-token"  # optional for public modules
```

Configure topic mappings for BSR deserialization:

```yaml
protobuf:
  topics-mapping:
    - topic-regex: "orders.*"
      bsr-commit: "a1b2c3d4..."
      bsr-message-type: "com.myorg.Order"
```
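
Topic mappings are selected by matching the topic name against `topic-regex`. A small sketch of that selection logic (the `Mapping` record here is a hypothetical stand-in for AKHQ's `TopicsMapping`; full-string matching via `String.matches` is assumed, which is how the deserializer applies the regex):

```java
import java.util.List;
import java.util.Optional;

public class TopicMappingMatch {
    // Hypothetical stand-in for AKHQ's TopicsMapping config entry.
    record Mapping(String topicRegex, String bsrCommit, String bsrMessageType) {}

    // First mapping whose regex matches the full topic name wins.
    static Optional<Mapping> findMatching(List<Mapping> mappings, String topic) {
        return mappings.stream()
            .filter(m -> topic.matches(m.topicRegex()))
            .findFirst();
    }

    public static void main(String[] args) {
        List<Mapping> mappings = List.of(
            new Mapping("orders.*", "a1b2c3d4", "com.myorg.Order"));
        System.out.println(findMatching(mappings, "orders-eu").isPresent());  // true
        System.out.println(findMatching(mappings, "payments").isPresent());   // false
    }
}
```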

Implementation Details

  • Uses build.buf.bsr.kafka:bsr-kafka-serde:0.1.1 dependency
  • BufSchemaRegistryClient uses reflection to access package-private BSR Client
  • SSL certificate validation uses standard JVM truststore configuration
  • Integrates with existing Record and RecordRepository classes
  • Falls back gracefully when BSR is not configured

This PR adds support for using Buf Schema Registry (BSR) as a schema
registry backend in AKHQ, enabling Protobuf message deserialization
using schemas stored in BSR.

Key features:
- Support for BSR as a schema registry type alongside Confluent
- Header-based schema resolution (standard BSR approach)
- Topic mapping fallback for messages without BSR headers
- Dynamic descriptor fetching from BSR at runtime
- Automatic JSON conversion for display in AKHQ UI

Implementation details:
- Added BufSchemaRegistryClient wrapper using reflection to access
  build.buf.bsr.kafka.Client
- Created BufProtobufToJsonDeserializer for BSR-specific deserialization
- Extended TopicsMapping config to support BSR commit and message type
- Updated ProtobufToJsonDeserializer to skip BSR-only mappings
- Added BSR configuration options (bsr-host, bsr-token)

Configuration example:
```yaml
schema-registry:
  type: bsr
  bsr-host: buf.build
  bsr-token: your-token
deserialization:
  protobuf:
    topics-mapping:
      - topic-regex: .*
        bsr-commit: latest
        bsr-message-type: your.package.MessageType
```
@AlexisSouquiere
Collaborator

Can you add a page in the documentation like https://akhq.io/docs/configuration/schema-registry/glue.html to explain how to use it, what are the supported features, etc. ?

@sbhuiyan-buf
Author

added a docs page

Copilot AI left a comment

Pull request overview

This PR adds support for Buf Schema Registry (BSR) as a new schema registry type for AKHQ, enabling Protobuf message deserialization using schemas stored in BSR. The implementation includes a standalone BSR client, a BSR-specific deserializer, and configuration updates to support both header-based and topic mapping-based schema resolution.

Key Changes:

  • New BSR schema registry type alongside Confluent, Tibco, and AWS Glue
  • BufSchemaRegistryClient for fetching Protobuf descriptors from BSR via HTTP API
  • BufProtobufToJsonDeserializer supporting header-based and topic mapping fallback for schema resolution
  • Integration with existing Record deserialization flow

Reviewed changes

Copilot reviewed 12 out of 13 changed files in this pull request and generated 12 comments.

| File | Description |
| --- | --- |
| src/main/java/org/akhq/modules/schemaregistry/BufSchemaRegistryClient.java | New standalone BSR client for fetching Protobuf descriptors via HTTP API with caching |
| src/main/java/org/akhq/utils/BufProtobufToJsonDeserializer.java | New deserializer for BSR-based Protobuf messages with header and topic mapping support |
| src/main/java/org/akhq/modules/KafkaModule.java | Added BSR client initialization and null returns for Confluent-specific clients when BSR is configured |
| src/main/java/org/akhq/repositories/CustomDeserializerRepository.java | Added BSR deserializer factory method |
| src/main/java/org/akhq/repositories/RecordRepository.java | Integrated BSR deserializer into record creation |
| src/main/java/org/akhq/models/Record.java | Added BSR deserializer field and deserialization logic to convertToString method |
| src/main/java/org/akhq/configs/TopicsMapping.java | Added BSR-specific fields for commit ID and message type |
| src/main/java/org/akhq/configs/SchemaRegistryType.java | Added BSR enum value |
| src/main/java/org/akhq/configs/Connection.java | Added BSR configuration fields (bsr-host and bsr-token) |
| src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java | Added logic to skip BSR-only mappings during descriptor building |
| docs/docs/configuration/schema-registry/bsr.md | New documentation for BSR configuration |
| application.example.yml | Example configuration for BSR schema registry and topic mappings |
| build.gradle | Added bsr-kafka-serde dependency |


Comment on lines +86 to +108
```java
        // Check cache first
        Descriptors.Descriptor cached = descriptorCache.get(cacheKey);
        if (cached != null) {
            log.debug("Using cached descriptor for {} (commit: {})", messageFQN, commitId);
            return cached;
        }

        log.debug("Fetching descriptor from BSR - commit: {}, message: {}", commitId, messageFQN);

        try {
            Descriptors.Descriptor descriptor = fetchDescriptorFromBSR(commitId, messageFQN);
            descriptorCache.put(cacheKey, descriptor);
            log.debug("Successfully retrieved and cached descriptor for {}", messageFQN);
            return descriptor;
        } catch (Exception e) {
            String errorMsg = String.format(
                "Failed to retrieve descriptor from BSR - commit: %s, message: %s",
                commitId, messageFQN);
            log.error(errorMsg, e);
            throw new SerializationException(errorMsg, e);
        }
    }
```

Copilot AI Dec 19, 2025
There's a potential race condition in the descriptor cache. If multiple threads request the same descriptor simultaneously before it's cached, they may all fetch it from BSR concurrently, causing redundant HTTP requests. Consider using ConcurrentHashMap.computeIfAbsent() to ensure only one thread fetches a descriptor for a given cache key, or implement a pattern with synchronized blocks or a loading cache to prevent duplicate fetches.

Suggested change (replacing the block quoted above):

```java
        try {
            Descriptors.Descriptor descriptor = descriptorCache.computeIfAbsent(cacheKey, key -> {
                log.debug("Fetching descriptor from BSR - commit: {}, message: {}", commitId, messageFQN);
                try {
                    Descriptors.Descriptor fetched = fetchDescriptorFromBSR(commitId, messageFQN);
                    log.debug("Successfully retrieved and cached descriptor for {}", messageFQN);
                    return fetched;
                } catch (IOException e) {
                    throw new DescriptorFetchRuntimeException(e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new DescriptorFetchRuntimeException(e);
                }
            });
            log.debug("Using cached descriptor for {} (commit: {})", messageFQN, commitId);
            return descriptor;
        } catch (DescriptorFetchRuntimeException e) {
            Throwable cause = e.getCause();
            String errorMsg = String.format(
                "Failed to retrieve descriptor from BSR - commit: %s, message: %s",
                commitId, messageFQN);
            log.error(errorMsg, cause != null ? cause : e);
            throw new SerializationException(errorMsg, cause != null ? cause : e);
        }
    }

    /**
     * Wrapper runtime exception used to propagate checked exceptions from descriptor fetch
     * operations executed inside computeIfAbsent.
     */
    private static class DescriptorFetchRuntimeException extends RuntimeException {
        DescriptorFetchRuntimeException(Throwable cause) {
            super(cause);
        }
    }
```
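
The property this suggestion relies on can be checked in isolation: `ConcurrentHashMap.computeIfAbsent` invokes the mapping function at most once per key, even with concurrent callers, so a counter standing in for the BSR fetch records a single invocation. Simplified sketch, not AKHQ code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ComputeIfAbsentDemo {
    static final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
    static final AtomicInteger fetches = new AtomicInteger();

    // Stand-in for fetchDescriptorFromBSR: counts how often it actually runs.
    static String fetch(String key) {
        fetches.incrementAndGet();
        return "descriptor-for-" + key;
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () ->
            cache.computeIfAbsent("a1b2c3d4:com.myorg.Order", ComputeIfAbsentDemo::fetch);
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // The mapping function ran exactly once despite two concurrent callers.
        System.out.println(fetches.get()); // 1
    }
}
```

With the original get-then-put pattern, both threads could observe an empty cache and each issue an HTTP request; `computeIfAbsent` serializes the computation per key.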

Comment on lines +21 to +116
```java
public class BufProtobufToJsonDeserializer {
    private final BufSchemaRegistryClient bsrClient;
    private final List<TopicsMapping> topicsMapping;

    public static final String HEADER_BSR_COMMIT = "buf.registry.value.schema.commit";
    public static final String HEADER_BSR_MESSAGE = "buf.registry.value.schema.message";

    public BufProtobufToJsonDeserializer(
            BufSchemaRegistryClient bsrClient,
            ProtobufDeserializationTopicsMapping config) {
        this.bsrClient = bsrClient;
        this.topicsMapping = config != null ? config.getTopicsMapping() : new ArrayList<>();
    }

    /**
     * Deserialize protobuf binary data using BSR.
     * Prefers headers, falls back to topic mapping configuration.
     *
     * @param topic Topic name
     * @param buffer Binary protobuf data
     * @param isKey Is this a key or value
     * @param headers Kafka record headers (may contain BSR schema info)
     * @return JSON string or null if not configured
     */
    public String deserialize(String topic, byte[] buffer, boolean isKey,
                              List<KeyValue<String, String>> headers) {
        if (buffer == null) {
            log.debug("Buffer is null for topic: {}", topic);
            return null;
        }

        log.debug("BSR deserialize called - topic: {}, isKey: {}, bufferSize: {}",
            topic, isKey, buffer.length);

        // Prefer headers (standard BSR approach)
        String commitFromHeader = getHeaderValue(headers, HEADER_BSR_COMMIT);
        String messageFromHeader = getHeaderValue(headers, HEADER_BSR_MESSAGE);

        if (commitFromHeader != null && messageFromHeader != null) {
            log.debug("Using BSR schema from headers - commit: {}, message: {}", commitFromHeader, messageFromHeader);
            return deserializeWithBSR(commitFromHeader, messageFromHeader, buffer);
        }

        // Fall back to topic mapping if configured
        TopicsMapping mapping = findMatchingMapping(topic);
        if (mapping != null && mapping.getBsrCommit() != null && mapping.getBsrMessageType() != null) {
            log.debug("Using BSR schema from topic mapping - topic: {}, commit: {}, message: {}",
                topic, mapping.getBsrCommit(), mapping.getBsrMessageType());
            return deserializeWithBSR(mapping.getBsrCommit(), mapping.getBsrMessageType(), buffer);
        }

        log.debug("No BSR configuration found for topic [{}]", topic);
        return null;
    }

    private String deserializeWithBSR(String commit, String messageFQN, byte[] buffer) {
        log.debug("Deserializing with BSR - commit: {}, message: {}, bufferSize: {}",
            commit, messageFQN, buffer.length);

        try {
            // Step 1: Fetch descriptor from BSR (via HTTP)
            Descriptors.Descriptor descriptor = bsrClient.getMessageDescriptor(commit, messageFQN);

            // Step 2: Parse protobuf bytes directly using the descriptor
            DynamicMessage message = DynamicMessage.parseFrom(descriptor, buffer);

            // Step 3: Convert to JSON
            String json = JsonFormat.printer().print(message);
            log.debug("Successfully deserialized and converted to JSON");
            return json;
        } catch (Exception e) {
            log.error("Failed to deserialize with BSR - commit: {}, message: {}", commit, messageFQN, e);
            throw new SerializationException(
                String.format("Failed to deserialize with BSR [commit=%s, message=%s]: %s",
                    commit, messageFQN, e.getMessage()), e);
        }
    }

    private TopicsMapping findMatchingMapping(String topic) {
        return topicsMapping.stream()
            .filter(mapping -> topic.matches(mapping.getTopicRegex()))
            .findFirst()
            .orElse(null);
    }

    private String getHeaderValue(List<KeyValue<String, String>> headers, String key) {
        if (headers == null) {
            return null;
        }
        return headers.stream()
            .filter(h -> key.equals(h.getKey()))
            .map(KeyValue::getValue)
            .findFirst()
            .orElse(null);
    }
}
```
Copilot AI Dec 19, 2025
The new BufProtobufToJsonDeserializer class lacks test coverage. Given that other deserializers in the codebase have comprehensive tests (e.g., ProtobufToJsonDeserializerTest, AvroToJsonDeserializerTest), this class should also have tests to verify its functionality, including header-based deserialization, topic mapping fallback, error handling, and the distinction between key and value deserialization.
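
As a sketch of what such a test might cover, the resolution order alone (headers preferred, topic mapping as fallback, null otherwise) can be exercised without any BSR or Kafka dependency. All names below are hypothetical stand-ins, not the actual AKHQ test API:

```java
import java.util.Map;

public class BsrResolutionOrderTest {
    // Hypothetical stand-in for the deserializer's resolved schema reference.
    record Schema(String commit, String messageType) {}

    // Headers win over the topic-mapping fallback; null when neither applies.
    static Schema resolve(Map<String, String> headers, Schema mappingFallback) {
        String commit = headers.get("buf.registry.value.schema.commit");
        String message = headers.get("buf.registry.value.schema.message");
        if (commit != null && message != null) {
            return new Schema(commit, message);
        }
        return mappingFallback; // may be null when no mapping matches
    }

    public static void main(String[] args) {
        Schema mapping = new Schema("latest", "your.package.MessageType");

        // 1. Both headers present: header values are preferred.
        Schema fromHeaders = resolve(
            Map.of("buf.registry.value.schema.commit", "a1b2c3d4",
                   "buf.registry.value.schema.message", "com.myorg.Order"),
            mapping);
        assert fromHeaders.commit().equals("a1b2c3d4");

        // 2. Headers missing: fall back to the configured mapping.
        assert resolve(Map.of(), mapping).commit().equals("latest");

        // 3. Nothing configured: resolution yields null.
        assert resolve(Map.of(), null) == null;
        System.out.println("ok");
    }
}
```

A real test for the class would additionally stub `BufSchemaRegistryClient`, feed real protobuf bytes, and check the error path, but the ordering logic above is the core contract worth pinning down first.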

Comment on lines +31 to +239
```java
public class BufSchemaRegistryClient {
    private static final String HEADER_AUTHORIZATION = "Authorization";
    private static final String HEADER_CONTENT_TYPE = "Content-Type";
    private static final String HEADER_CONNECT_PROTOCOL_VERSION = "Connect-Protocol-Version";
    private static final String HEADER_CONNECT_TIMEOUT_MS = "Connect-Timeout-Ms";
    private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
    private static final String HEADER_USER_AGENT = "User-Agent";
    private static final String BEARER_PREFIX = "Bearer ";
    private static final String METHOD_GET_FILE_DESCRIPTOR_SET =
        "buf.registry.module.v1.FileDescriptorSetService/GetFileDescriptorSet";
    private static final String USER_AGENT = "akhq/bsr-client";
    private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30);

    private final String host;
    private final String token;
    private final HttpClient httpClient;
    private final ConcurrentMap<String, Descriptors.Descriptor> descriptorCache = new ConcurrentHashMap<>();

    /**
     * Creates a new BSR client.
     *
     * @param host BSR hostname (e.g., "buf.build", "bufbuild.internal")
     * @param token API token for authentication (optional for public modules)
     */
    public BufSchemaRegistryClient(String host, String token) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("BSR host cannot be null or empty");
        }
        this.host = host;
        this.token = token;
        this.httpClient = HttpClient.newBuilder().build();

        log.info("Initialized standalone BSR client for host: {}", host);
    }

    /**
     * Get a message descriptor from BSR by commit ID and message fully-qualified name.
     * Results are cached locally.
     *
     * @param commitId BSR commit ID (e.g., "a1b2c3d4...")
     * @param messageFQN Fully-qualified protobuf message name (e.g., "com.myorg.Order")
     * @return Protobuf descriptor for the message
     * @throws SerializationException if the descriptor cannot be retrieved
     */
    public Descriptors.Descriptor getMessageDescriptor(String commitId, String messageFQN)
            throws SerializationException {
        if (commitId == null || commitId.isEmpty()) {
            throw new IllegalArgumentException("Commit ID cannot be null or empty");
        }
        if (messageFQN == null || messageFQN.isEmpty()) {
            throw new IllegalArgumentException("Message FQN cannot be null or empty");
        }

        String cacheKey = commitId + ":" + messageFQN;

        // Check cache first
        Descriptors.Descriptor cached = descriptorCache.get(cacheKey);
        if (cached != null) {
            log.debug("Using cached descriptor for {} (commit: {})", messageFQN, commitId);
            return cached;
        }

        log.debug("Fetching descriptor from BSR - commit: {}, message: {}", commitId, messageFQN);

        try {
            Descriptors.Descriptor descriptor = fetchDescriptorFromBSR(commitId, messageFQN);
            descriptorCache.put(cacheKey, descriptor);
            log.debug("Successfully retrieved and cached descriptor for {}", messageFQN);
            return descriptor;
        } catch (Exception e) {
            String errorMsg = String.format(
                "Failed to retrieve descriptor from BSR - commit: %s, message: %s",
                commitId, messageFQN);
            log.error(errorMsg, e);
            throw new SerializationException(errorMsg, e);
        }
    }

    private Descriptors.Descriptor fetchDescriptorFromBSR(String commitId, String messageFQN)
            throws IOException, InterruptedException {
        // Build the BSR API request
        GetFileDescriptorSetRequest request = GetFileDescriptorSetRequest.newBuilder()
            .setResourceRef(ResourceRef.newBuilder().setId(commitId).build())
            .addIncludeTypes(messageFQN)
            .build();

        // Build HTTP request
        HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
            .uri(URI.create("https://" + host + "/" + METHOD_GET_FILE_DESCRIPTOR_SET))
            .header(HEADER_CONTENT_TYPE, "application/proto")
            .header(HEADER_CONNECT_PROTOCOL_VERSION, "1")
            .header(HEADER_ACCEPT_ENCODING, "identity")
            .header(HEADER_USER_AGENT, USER_AGENT)
            .header(HEADER_CONNECT_TIMEOUT_MS, "30000")
            .timeout(DEFAULT_REQUEST_TIMEOUT)
            .POST(HttpRequest.BodyPublishers.ofByteArray(request.toByteArray()));

        if (token != null && !token.isEmpty()) {
            requestBuilder.header(HEADER_AUTHORIZATION, BEARER_PREFIX + token);
        }

        HttpRequest httpRequest = requestBuilder.build();

        // Send request
        HttpResponse<byte[]> response = httpClient.send(
            httpRequest,
            HttpResponse.BodyHandlers.ofByteArray());

        if (response.statusCode() != 200) {
            throw new IOException(String.format(
                "BSR API request failed: HTTP %d - %s",
                response.statusCode(),
                new String(response.body(), StandardCharsets.UTF_8)));
        }

        // Parse response
        GetFileDescriptorSetResponse bsrResponse = GetFileDescriptorSetResponse.parseFrom(response.body());

        // Extract the descriptor
        Descriptors.Descriptor descriptor = findMessageDescriptor(
            bsrResponse.getFileDescriptorSet(),
            messageFQN);

        if (descriptor == null) {
            throw new IOException("Failed to find message descriptor for " + messageFQN);
        }

        return descriptor;
    }

    /**
     * Finds a message descriptor within a FileDescriptorSet.
     * Adapted from bsr-kafka-serde library.
     */
    private static Descriptors.Descriptor findMessageDescriptor(
            DescriptorProtos.FileDescriptorSet fds,
            String messageFQN) {
        Map<String, Descriptors.FileDescriptor> descriptorsByName = new HashMap<>(fds.getFileCount());

        // Build all file descriptors
        for (DescriptorProtos.FileDescriptorProto fd : fds.getFileList()) {
            try {
                buildFileDescriptor(fd, fds, descriptorsByName);
            } catch (Descriptors.DescriptorValidationException e) {
                throw new SerializationException(
                    "Failed to build file descriptor for " + fd.getName(), e);
            }
        }

        // Parse message FQN into package and message name
        final String packageName, messageName;
        int lastDot = messageFQN.lastIndexOf('.');
        if (lastDot != -1) {
            packageName = messageFQN.substring(0, lastDot);
            messageName = messageFQN.substring(lastDot + 1);
        } else {
            packageName = "";
            messageName = messageFQN;
        }

        // Find the message descriptor
        for (Descriptors.FileDescriptor fd : descriptorsByName.values()) {
            if (!fd.getPackage().equals(packageName)) {
                continue;
            }
            Descriptors.Descriptor md = fd.findMessageTypeByName(messageName);
            if (md != null) {
                return md;
            }
        }

        return null;
    }

    /**
     * Recursively builds a FileDescriptor and its dependencies.
     * Adapted from bsr-kafka-serde library.
     */
    private static Descriptors.FileDescriptor buildFileDescriptor(
            DescriptorProtos.FileDescriptorProto fdp,
            DescriptorProtos.FileDescriptorSet fds,
            Map<String, Descriptors.FileDescriptor> fileDescriptorsByName)
            throws Descriptors.DescriptorValidationException {

        if (fileDescriptorsByName.containsKey(fdp.getName())) {
            return fileDescriptorsByName.get(fdp.getName());
        }

        List<Descriptors.FileDescriptor> dependencies = new ArrayList<>(fdp.getDependencyCount());
        for (String depName : fdp.getDependencyList()) {
            Descriptors.FileDescriptor dependency = fileDescriptorsByName.get(depName);
            if (dependency != null) {
                dependencies.add(dependency);
                continue;
            }
            DescriptorProtos.FileDescriptorProto depProto = fds.getFileList().stream()
                .filter(f -> f.getName().equals(depName))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Dependency not found: " + depName));
            dependencies.add(buildFileDescriptor(depProto, fds, fileDescriptorsByName));
        }

        Descriptors.FileDescriptor fd = Descriptors.FileDescriptor.buildFrom(
            fdp,
            dependencies.toArray(new Descriptors.FileDescriptor[0]));
        fileDescriptorsByName.put(fdp.getName(), fd);
        return fd;
    }
}
```
Copilot AI Dec 19, 2025
The new BufSchemaRegistryClient class lacks test coverage. Given that there are tests for other schema registry-related components in the codebase, this class should have tests to verify its descriptor fetching, caching behavior, error handling, and authentication mechanisms.
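
A caching test might take roughly this shape: count upstream fetches through an injectable stub and assert that the second lookup for the same commit and message is served from cache. The real client would need its HTTP call made injectable for this to apply directly; everything below is a hypothetical, self-contained sketch:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class DescriptorCacheTest {
    // Minimal cache with an injectable fetch function, mirroring the
    // commitId + ":" + messageFQN cache key used by the client.
    static class CachingClient {
        private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        private final Function<String, String> fetcher;

        CachingClient(Function<String, String> fetcher) {
            this.fetcher = fetcher;
        }

        String getDescriptor(String commitId, String messageFQN) {
            return cache.computeIfAbsent(commitId + ":" + messageFQN, fetcher);
        }
    }

    public static void main(String[] args) {
        AtomicInteger fetchCount = new AtomicInteger();
        CachingClient client = new CachingClient(key -> {
            fetchCount.incrementAndGet(); // stands in for the HTTP round trip
            return "descriptor-for-" + key;
        });

        String first = client.getDescriptor("a1b2c3d4", "com.myorg.Order");
        String second = client.getDescriptor("a1b2c3d4", "com.myorg.Order");

        assert first.equals(second);
        assert fetchCount.get() == 1; // second call hit the cache
        System.out.println("fetches=" + fetchCount.get());
    }
}
```

Error handling and authentication would still need tests against a stubbed HTTP layer (for example, a local test server returning non-200 responses).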

AlexisSouquiere and others added 2 commits December 19, 2025 11:54
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>