diff --git a/application.example.yml b/application.example.yml index 62c625cc2..220d18f7b 100644 --- a/application.example.yml +++ b/application.example.yml @@ -60,12 +60,15 @@ akhq: bootstrap.servers: "kafka:9092" schema-registry: url: "http://schema-registry:8085" # schema registry url (optional) - type: "confluent" # schema registry type (optional). Supported types are "confluent" (default) or "tibco" + type: "confluent" # schema registry type (optional). Supported types are "confluent", "tibco", "bsr", or "glue" # Basic Auth user / pass basic-auth-username: basic-auth-user basic-auth-password: basic-auth-pass properties: # standard kafka properties (optional) ssl.protocol: TLS + # BSR (Buf Schema Registry) specific configuration (only when type: "bsr") + # bsr-host: "buf.build" # or "bufbuild.internal" for enterprise + # bsr-token: "your-api-token" # API token for authentication (optional for public modules) connect: - name: connect-1 url: "http://connect:8083" @@ -92,6 +95,7 @@ akhq: # (optional) if descriptor-file properties are used descriptors-folder: "/app/protobuf_desc" topics-mapping: + # Standard protobuf deserialization (using descriptor files) - topic-regex: "album.*" descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM=" value-message-type: "org.akhq.utils.Album" @@ -102,6 +106,14 @@ akhq: descriptor-file: "other.desc" key-message-type: "org.akhq.utils.Row" value-message-type: "org.akhq.utils.Envelope" + # BSR protobuf deserialization (when schema-registry type is "bsr") + # Prefers headers (automatically added by BSR Kafka serializers): + # - buf.registry.value.schema.commit (BSR commit ID) + # - buf.registry.value.schema.message (fully-qualified message name) + # Falls back to topic mapping if headers 
not present: + # - topic-regex: "orders.*" + # bsr-commit: "a1b2c3d4e5f6..." + # bsr-message-type: "com.myorg.Order" # Ui Cluster Options (optional) ui-options: topic: diff --git a/build.gradle b/build.gradle index 6ef80d75f..f99490794 100644 --- a/build.gradle +++ b/build.gradle @@ -148,6 +148,8 @@ dependencies { implementation group: "com.google.protobuf", name: "protobuf-java", version: '4.33.0' implementation group: "com.google.protobuf", name: "protobuf-java-util", version: '4.33.0' + // Buf Schema Registry (BSR) - contains BSR API types for descriptor fetching + implementation group: "build.buf.bsr.kafka", name: "bsr-kafka-serde", version: '0.1.1' implementation("io.micronaut:micronaut-http-client") implementation("io.micronaut:micronaut-inject") diff --git a/docs/docs/configuration/schema-registry/bsr.md b/docs/docs/configuration/schema-registry/bsr.md new file mode 100644 index 000000000..a4017388b --- /dev/null +++ b/docs/docs/configuration/schema-registry/bsr.md @@ -0,0 +1,29 @@ +# Buf Schema Registry +Integration with Buf Schema Registry allows you to deserialize protobuf messages for which the schema is managed in +Buf Schema Registry. If using Bufstream or using headers injected into Kafka records with the BSR message type and +commit id, the AKHQ config does not need the topic mappings to be configured and only the schema registry host and +BSR token are required. However, where records do not have the headers, the topic mappings can be configured as follows.
+ + ```yaml +akhq: + environment: + AKHQ_CONFIGURATION: | + akhq: + connections: + bsr-test-cluster: + properties: + bootstrap.servers: "localhost:9092" # Your Kafka broker + schema-registry: + type: "bsr" + bsr-host: "bufbuild.internal" + bsr-token: "<your-bsr-api-token>" # Replace with your BSR API token; never commit real tokens + deserialization: + protobuf: + topics-mapping: + # Config-based example (optional if using headers) + - topic-regex: "test.*" + bsr-message-type: "bufstream.demo.v1.EmailUpdated" # Replace with your message type + # bsr-commit: "commit-id" # Optional: specific commit/version +``` + diff --git a/src/main/java/org/akhq/configs/Connection.java b/src/main/java/org/akhq/configs/Connection.java index c9897056c..9005f89cd 100644 --- a/src/main/java/org/akhq/configs/Connection.java +++ b/src/main/java/org/akhq/configs/Connection.java @@ -35,6 +35,11 @@ public static class SchemaRegistry { SchemaRegistryType type = SchemaRegistryType.CONFLUENT; String glueSchemaRegistryName; String awsRegion; + + // BSR specific fields + String bsrHost; // e.g., "buf.build" or "bufbuild.internal" + String bsrToken; // API token for authentication + @MapFormat(transformation = MapFormat.MapTransformation.FLAT) Map properties; } diff --git a/src/main/java/org/akhq/configs/SchemaRegistryType.java b/src/main/java/org/akhq/configs/SchemaRegistryType.java index d1d06556b..22557e8b1 100644 --- a/src/main/java/org/akhq/configs/SchemaRegistryType.java +++ b/src/main/java/org/akhq/configs/SchemaRegistryType.java @@ -6,7 +6,8 @@ public enum SchemaRegistryType { CONFLUENT((byte) 0x0), TIBCO((byte) 0x80), - GLUE((byte) 0x0); + GLUE((byte) 0x0), + BSR((byte) 0x0); // BSR doesn't use magic byte prefix private byte magicByte; diff --git a/src/main/java/org/akhq/configs/TopicsMapping.java b/src/main/java/org/akhq/configs/TopicsMapping.java index 2f453a197..5a12300f3 100644 --- a/src/main/java/org/akhq/configs/TopicsMapping.java +++ b/src/main/java/org/akhq/configs/TopicsMapping.java @@ -13,4
+13,8 @@ public class TopicsMapping { String descriptorFileBase64; String keyMessageType; String valueMessageType; + + // BSR specific fields + String bsrCommit; // Specific commit/version (optional, can use headers instead) + String bsrMessageType; // Full protobuf message name for BSR (e.g., "com.myorg.Order") } diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java index c82e7e45e..3649a2159 100644 --- a/src/main/java/org/akhq/models/Record.java +++ b/src/main/java/org/akhq/models/Record.java @@ -76,6 +76,9 @@ public class Record { @JsonIgnore private AvroToJsonDeserializer avroToJsonDeserializer; + @JsonIgnore + private org.akhq.utils.BufProtobufToJsonDeserializer bsrProtobufDeserializer; + @Getter(AccessLevel.NONE) private byte[] bytesKey; @@ -121,7 +124,9 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte public Record(SchemaRegistryClient client, ConsumerRecord record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer, Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, AvroToJsonSerializer avroToJsonSerializer, - ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, byte[] bytesValue, Topic topic, Deserializer awsGlueKafkaDeserializer) { + ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, + org.akhq.utils.BufProtobufToJsonDeserializer bsrProtobufDeserializer, + byte[] bytesValue, Topic topic, Deserializer awsGlueKafkaDeserializer) { if (schemaRegistryType == SchemaRegistryType.TIBCO) { this.MAGIC_BYTE = (byte) 0x80; } else { @@ -148,6 +153,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord record this.kafkaAvroDeserializer = kafkaAvroDeserializer; this.protobufToJsonDeserializer = protobufToJsonDeserializer; this.avroToJsonDeserializer = avroToJsonDeserializer; + this.bsrProtobufDeserializer = bsrProtobufDeserializer; 
this.kafkaProtoDeserializer = kafkaProtoDeserializer; this.avroToJsonSerializer = avroToJsonSerializer; this.kafkaJsonDeserializer = kafkaJsonDeserializer; @@ -269,6 +275,20 @@ private String convertToString(byte[] payload, String schemaId, boolean isKey) { return new String(payload); } } else { + // Try BSR first if configured + if (bsrProtobufDeserializer != null) { + try { + String record = bsrProtobufDeserializer.deserialize(topic.getName(), payload, isKey, this.headers); + if (record != null) { + return record; + } + } catch (Exception exception) { + this.exceptions.add(exception.getMessage()); + return new String(payload); + } + } + + // Then try custom protobuf deserializer if (protobufToJsonDeserializer != null) { try { String record = protobufToJsonDeserializer.deserialize(topic.getName(), payload, isKey); diff --git a/src/main/java/org/akhq/modules/KafkaModule.java b/src/main/java/org/akhq/modules/KafkaModule.java index 262bb17f8..0f85c0c95 100644 --- a/src/main/java/org/akhq/modules/KafkaModule.java +++ b/src/main/java/org/akhq/modules/KafkaModule.java @@ -19,6 +19,7 @@ import org.akhq.configs.AbstractProperties; import org.akhq.configs.Connection; import org.akhq.configs.Default; +import org.akhq.configs.SchemaRegistryType; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -211,6 +212,13 @@ public ProtobufSchemaProvider getProtobufSchemaProvider(String clusterId) throws throw new InvalidClusterException(INVALID_CLUSTER + clusterId + "'"); } + // BSR doesn't use Confluent ProtobufSchemaProvider, return null + Connection connection = this.getConnection(clusterId); + if (connection.getSchemaRegistry() != null && + connection.getSchemaRegistry().getType() == SchemaRegistryType.BSR) { + return null; + } + ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider(); protobufSchemaProvider.configure(Collections.singletonMap( 
SCHEMA_VERSION_FETCHER, @@ -227,6 +235,12 @@ public RestService getRegistryRestClient(String clusterId) throws InvalidCluster Connection connection = this.getConnection(clusterId); + // BSR doesn't use Confluent RestService, return null + if (connection.getSchemaRegistry() != null && + connection.getSchemaRegistry().getType() == SchemaRegistryType.BSR) { + return null; + } + if (connection.getSchemaRegistry() != null) { RestService restService = new RestService( connection.getSchemaRegistry().getUrl() @@ -275,6 +289,7 @@ public RestService getRegistryRestClient(String clusterId) throws InvalidCluster } private final Map registryClient = new HashMap<>(); + private final Map bsrClients = new HashMap<>(); public SchemaRegistryClient getRegistryClient(String clusterId) throws InvalidClusterException { @@ -282,10 +297,14 @@ public SchemaRegistryClient getRegistryClient(String clusterId) throws InvalidCl throw new InvalidClusterException(INVALID_CLUSTER + clusterId + "'"); } - if (!this.registryClient.containsKey(clusterId)) { - Connection connection = this.getConnection(clusterId); - + // BSR doesn't use Confluent SchemaRegistryClient, return null + Connection connection = this.getConnection(clusterId); + if (connection.getSchemaRegistry() != null && + connection.getSchemaRegistry().getType() == SchemaRegistryType.BSR) { + return null; + } + if (!this.registryClient.containsKey(clusterId)) { List providers = new ArrayList<>(); providers.add(new AvroSchemaProvider()); providers.add(new JsonSchemaProvider()); @@ -386,4 +405,28 @@ public Map getKsqlDbClient(String clusterId) throws InvalidClust return this.ksqlDbClient.get(clusterId); } + + public org.akhq.modules.schemaregistry.BufSchemaRegistryClient getBsrClient(String clusterId) throws InvalidClusterException { + if (!this.clusterExists(clusterId)) { + throw new InvalidClusterException(INVALID_CLUSTER + clusterId + "'"); + } + + if (!this.bsrClients.containsKey(clusterId)) { + Connection connection = 
this.getConnection(clusterId); + + if (connection.getSchemaRegistry() != null && + connection.getSchemaRegistry().getBsrHost() != null && + !connection.getSchemaRegistry().getBsrHost().isEmpty()) { + + this.bsrClients.put(clusterId, + new org.akhq.modules.schemaregistry.BufSchemaRegistryClient( + connection.getSchemaRegistry().getBsrHost(), + connection.getSchemaRegistry().getBsrToken() + ) + ); + } + } + + return this.bsrClients.get(clusterId); + } } diff --git a/src/main/java/org/akhq/modules/schemaregistry/BufSchemaRegistryClient.java b/src/main/java/org/akhq/modules/schemaregistry/BufSchemaRegistryClient.java new file mode 100644 index 000000000..bd0e3cd78 --- /dev/null +++ b/src/main/java/org/akhq/modules/schemaregistry/BufSchemaRegistryClient.java @@ -0,0 +1,239 @@ +package org.akhq.modules.schemaregistry; + +import build.buf.bsr.kafka.gen.buf.registry.module.v1.GetFileDescriptorSetRequest; +import build.buf.bsr.kafka.gen.buf.registry.module.v1.GetFileDescriptorSetResponse; +import build.buf.bsr.kafka.gen.buf.registry.module.v1.ResourceRef; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.errors.SerializationException; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Standalone client for Buf Schema Registry (BSR) that fetches protobuf descriptors. + * + *

This implementation is adapted from the bsr-kafka-serde library (Apache 2.0 licensed) + * and uses the BSR HTTP API directly without reflection. + * + * @see buf-kafka-serde-java + */ +@Slf4j +public class BufSchemaRegistryClient { + private static final String HEADER_AUTHORIZATION = "Authorization"; + private static final String HEADER_CONTENT_TYPE = "Content-Type"; + private static final String HEADER_CONNECT_PROTOCOL_VERSION = "Connect-Protocol-Version"; + private static final String HEADER_CONNECT_TIMEOUT_MS = "Connect-Timeout-Ms"; + private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding"; + private static final String HEADER_USER_AGENT = "User-Agent"; + private static final String BEARER_PREFIX = "Bearer "; + private static final String METHOD_GET_FILE_DESCRIPTOR_SET = + "buf.registry.module.v1.FileDescriptorSetService/GetFileDescriptorSet"; + private static final String USER_AGENT = "akhq/bsr-client"; + private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30); + + private final String host; + private final String token; + private final HttpClient httpClient; + private final ConcurrentMap descriptorCache = new ConcurrentHashMap<>(); + + /** + * Creates a new BSR client. + * + * @param host BSR hostname (e.g., "buf.build", "bufbuild.internal") + * @param token API token for authentication (optional for public modules) + */ + public BufSchemaRegistryClient(String host, String token) { + if (host == null || host.isEmpty()) { + throw new IllegalArgumentException("BSR host cannot be null or empty"); + } + this.host = host; + this.token = token; + this.httpClient = HttpClient.newBuilder().build(); + + log.info("Initialized standalone BSR client for host: {}", host); + } + + /** + * Get a message descriptor from BSR by commit ID and message fully-qualified name. + * Results are cached locally. 
+ * + * @param commitId BSR commit ID (e.g., "a1b2c3d4...") + * @param messageFQN Fully-qualified protobuf message name (e.g., "com.myorg.Order") + * @return Protobuf descriptor for the message + * @throws SerializationException if the descriptor cannot be retrieved + */ + public Descriptors.Descriptor getMessageDescriptor(String commitId, String messageFQN) + throws SerializationException { + if (commitId == null || commitId.isEmpty()) { + throw new IllegalArgumentException("Commit ID cannot be null or empty"); + } + if (messageFQN == null || messageFQN.isEmpty()) { + throw new IllegalArgumentException("Message FQN cannot be null or empty"); + } + + String cacheKey = commitId + ":" + messageFQN; + + // Check cache first + Descriptors.Descriptor cached = descriptorCache.get(cacheKey); + if (cached != null) { + log.debug("Using cached descriptor for {} (commit: {})", messageFQN, commitId); + return cached; + } + + log.debug("Fetching descriptor from BSR - commit: {}, message: {}", commitId, messageFQN); + + try { + Descriptors.Descriptor descriptor = fetchDescriptorFromBSR(commitId, messageFQN); + descriptorCache.put(cacheKey, descriptor); + log.debug("Successfully retrieved and cached descriptor for {}", messageFQN); + return descriptor; + } catch (Exception e) { + String errorMsg = String.format( + "Failed to retrieve descriptor from BSR - commit: %s, message: %s", + commitId, messageFQN); + log.error(errorMsg, e); + throw new SerializationException(errorMsg, e); + } + } + + private Descriptors.Descriptor fetchDescriptorFromBSR(String commitId, String messageFQN) + throws IOException, InterruptedException { + // Build the BSR API request + GetFileDescriptorSetRequest request = GetFileDescriptorSetRequest.newBuilder() + .setResourceRef(ResourceRef.newBuilder().setId(commitId).build()) + .addIncludeTypes(messageFQN) + .build(); + + // Build HTTP request + HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() + .uri(URI.create("https://" + host + "/" + 
METHOD_GET_FILE_DESCRIPTOR_SET)) + .header(HEADER_CONTENT_TYPE, "application/proto") + .header(HEADER_CONNECT_PROTOCOL_VERSION, "1") + .header(HEADER_ACCEPT_ENCODING, "identity") + .header(HEADER_USER_AGENT, USER_AGENT) + .header(HEADER_CONNECT_TIMEOUT_MS, "30000") + .timeout(DEFAULT_REQUEST_TIMEOUT) + .POST(HttpRequest.BodyPublishers.ofByteArray(request.toByteArray())); + + if (token != null && !token.isEmpty()) { + requestBuilder.header(HEADER_AUTHORIZATION, BEARER_PREFIX + token); + } + + HttpRequest httpRequest = requestBuilder.build(); + + // Send request + HttpResponse response = httpClient.send( + httpRequest, + HttpResponse.BodyHandlers.ofByteArray()); + + if (response.statusCode() != 200) { + throw new IOException(String.format( + "BSR API request failed: HTTP %d - %s", + response.statusCode(), + new String(response.body(), StandardCharsets.UTF_8))); + } + + // Parse response + GetFileDescriptorSetResponse bsrResponse = GetFileDescriptorSetResponse.parseFrom(response.body()); + + // Extract the descriptor + Descriptors.Descriptor descriptor = findMessageDescriptor( + bsrResponse.getFileDescriptorSet(), + messageFQN); + + if (descriptor == null) { + throw new IOException("Failed to find message descriptor for " + messageFQN); + } + + return descriptor; + } + + /** + * Finds a message descriptor within a FileDescriptorSet. + * Adapted from bsr-kafka-serde library. 
+ */ + private static Descriptors.Descriptor findMessageDescriptor( + DescriptorProtos.FileDescriptorSet fds, + String messageFQN) { + Map descriptorsByName = new HashMap<>(fds.getFileCount()); + + // Build all file descriptors + for (DescriptorProtos.FileDescriptorProto fd : fds.getFileList()) { + try { + buildFileDescriptor(fd, fds, descriptorsByName); + } catch (Descriptors.DescriptorValidationException e) { + throw new SerializationException( + "Failed to build file descriptor for " + fd.getName(), e); + } + } + + // Parse message FQN into package and message name + final String packageName, messageName; + int lastDot = messageFQN.lastIndexOf('.'); + if (lastDot != -1) { + packageName = messageFQN.substring(0, lastDot); + messageName = messageFQN.substring(lastDot + 1); + } else { + packageName = ""; + messageName = messageFQN; + } + + // Find the message descriptor + for (Descriptors.FileDescriptor fd : descriptorsByName.values()) { + if (!fd.getPackage().equals(packageName)) { + continue; + } + Descriptors.Descriptor md = fd.findMessageTypeByName(messageName); + if (md != null) { + return md; + } + } + + return null; + } + + /** + * Recursively builds a FileDescriptor and its dependencies. + * Adapted from bsr-kafka-serde library. 
+ */ + private static Descriptors.FileDescriptor buildFileDescriptor( + DescriptorProtos.FileDescriptorProto fdp, + DescriptorProtos.FileDescriptorSet fds, + Map fileDescriptorsByName) + throws Descriptors.DescriptorValidationException { + + if (fileDescriptorsByName.containsKey(fdp.getName())) { + return fileDescriptorsByName.get(fdp.getName()); + } + + List dependencies = new ArrayList<>(fdp.getDependencyCount()); + for (String depName : fdp.getDependencyList()) { + Descriptors.FileDescriptor dependency = fileDescriptorsByName.get(depName); + if (dependency != null) { + dependencies.add(dependency); + continue; + } + DescriptorProtos.FileDescriptorProto depProto = fds.getFileList().stream() + .filter(f -> f.getName().equals(depName)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Dependency not found: " + depName)); + dependencies.add(buildFileDescriptor(depProto, fds, fileDescriptorsByName)); + } + + Descriptors.FileDescriptor fd = Descriptors.FileDescriptor.buildFrom( + fdp, + dependencies.toArray(new Descriptors.FileDescriptor[0])); + fileDescriptorsByName.put(fdp.getName(), fd); + return fd; + } +} diff --git a/src/main/java/org/akhq/repositories/CustomDeserializerRepository.java b/src/main/java/org/akhq/repositories/CustomDeserializerRepository.java index fb38c5cce..d4cf5aa2d 100644 --- a/src/main/java/org/akhq/repositories/CustomDeserializerRepository.java +++ b/src/main/java/org/akhq/repositories/CustomDeserializerRepository.java @@ -1,8 +1,12 @@ package org.akhq.repositories; +import org.akhq.configs.Connection; +import org.akhq.configs.SchemaRegistryType; import org.akhq.modules.KafkaModule; +import org.akhq.modules.schemaregistry.BufSchemaRegistryClient; import org.akhq.utils.AvroToJsonDeserializer; import org.akhq.utils.AvroToJsonSerializer; +import org.akhq.utils.BufProtobufToJsonDeserializer; import org.akhq.utils.ProtobufToJsonDeserializer; import jakarta.inject.Inject; @@ -18,6 +22,7 @@ public class CustomDeserializerRepository { 
private AvroToJsonSerializer avroToJsonSerializer; private final Map protobufToJsonDeserializers = new HashMap<>(); private final Map avroToJsonDeserializers = new HashMap<>(); + private final Map bsrDeserializers = new HashMap<>(); public ProtobufToJsonDeserializer getProtobufToJsonDeserializer(String clusterId) { if (!this.protobufToJsonDeserializers.containsKey(clusterId)) { @@ -38,4 +43,27 @@ public AvroToJsonDeserializer getAvroToJsonDeserializer(String clusterId) { } return this.avroToJsonDeserializers.get(clusterId); } + + public BufProtobufToJsonDeserializer getBsrProtobufDeserializer(String clusterId) { + if (!this.bsrDeserializers.containsKey(clusterId)) { + Connection connection = kafkaModule.getConnection(clusterId); + + // Only create BSR deserializer if BSR is configured + if (connection.getSchemaRegistry() != null && + connection.getSchemaRegistry().getType() == SchemaRegistryType.BSR) { + + BufSchemaRegistryClient bsrClient = kafkaModule.getBsrClient(clusterId); + if (bsrClient != null) { + this.bsrDeserializers.put( + clusterId, + new BufProtobufToJsonDeserializer( + bsrClient, + connection.getDeserialization().getProtobuf() + ) + ); + } + } + } + return this.bsrDeserializers.get(clusterId); + } } diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index b6bd04430..dd533f55f 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -464,6 +464,7 @@ private Record newRecord(ConsumerRecord record, String clusterId this.avroToJsonSerializer, this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId), this.customDeserializerRepository.getAvroToJsonDeserializer(clusterId), + this.customDeserializerRepository.getBsrProtobufDeserializer(clusterId), avroWireFormatConverter.convertValueToWireFormat(record, client, this.schemaRegistryRepository.getSchemaRegistryType(clusterId)), topic, 
@@ -484,6 +485,7 @@ private Record newRecord(ConsumerRecord record, BaseOptions opti this.avroToJsonSerializer, this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId), this.customDeserializerRepository.getAvroToJsonDeserializer(options.clusterId), + this.customDeserializerRepository.getBsrProtobufDeserializer(options.clusterId), avroWireFormatConverter.convertValueToWireFormat(record, client, this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)), topic, diff --git a/src/main/java/org/akhq/utils/BufProtobufToJsonDeserializer.java b/src/main/java/org/akhq/utils/BufProtobufToJsonDeserializer.java new file mode 100644 index 000000000..b4b27c162 --- /dev/null +++ b/src/main/java/org/akhq/utils/BufProtobufToJsonDeserializer.java @@ -0,0 +1,116 @@ +package org.akhq.utils; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.util.JsonFormat; +import lombok.extern.slf4j.Slf4j; +import org.akhq.configs.Connection.Deserialization.ProtobufDeserializationTopicsMapping; +import org.akhq.configs.TopicsMapping; +import org.akhq.models.KeyValue; +import org.akhq.modules.schemaregistry.BufSchemaRegistryClient; +import org.apache.kafka.common.errors.SerializationException; + +import java.util.ArrayList; +import java.util.List; + +/** + * Deserializer for protobuf messages using Buf Schema Registry (BSR). + * Supports header-based schema resolution (preferred) and topic mapping fallback. 
+ */ +@Slf4j +public class BufProtobufToJsonDeserializer { + private final BufSchemaRegistryClient bsrClient; + private final List topicsMapping; + + public static final String HEADER_BSR_COMMIT = "buf.registry.value.schema.commit"; + public static final String HEADER_BSR_MESSAGE = "buf.registry.value.schema.message"; + + public BufProtobufToJsonDeserializer( + BufSchemaRegistryClient bsrClient, + ProtobufDeserializationTopicsMapping config) { + this.bsrClient = bsrClient; + this.topicsMapping = config != null ? config.getTopicsMapping() : new ArrayList<>(); + } + + /** + * Deserialize protobuf binary data using BSR. + * Prefers headers, falls back to topic mapping configuration. + * + * @param topic Topic name + * @param buffer Binary protobuf data + * @param isKey Is this a key or value + * @param headers Kafka record headers (may contain BSR schema info) + * @return JSON string or null if not configured + */ + public String deserialize(String topic, byte[] buffer, boolean isKey, + List> headers) { + if (buffer == null) { + log.debug("Buffer is null for topic: {}", topic); + return null; + } + + log.debug("BSR deserialize called - topic: {}, isKey: {}, bufferSize: {}", + topic, isKey, buffer.length); + + // Prefer headers (standard BSR approach) + String commitFromHeader = getHeaderValue(headers, HEADER_BSR_COMMIT); + String messageFromHeader = getHeaderValue(headers, HEADER_BSR_MESSAGE); + + if (commitFromHeader != null && messageFromHeader != null) { + log.debug("Using BSR schema from headers - commit: {}, message: {}", commitFromHeader, messageFromHeader); + return deserializeWithBSR(commitFromHeader, messageFromHeader, buffer); + } + + // Fall back to topic mapping if configured + TopicsMapping mapping = findMatchingMapping(topic); + if (mapping != null && mapping.getBsrCommit() != null && mapping.getBsrMessageType() != null) { + log.debug("Using BSR schema from topic mapping - topic: {}, commit: {}, message: {}", + topic, mapping.getBsrCommit(), 
mapping.getBsrMessageType()); + return deserializeWithBSR(mapping.getBsrCommit(), mapping.getBsrMessageType(), buffer); + } + + log.debug("No BSR configuration found for topic [{}]", topic); + return null; + } + + private String deserializeWithBSR(String commit, String messageFQN, byte[] buffer) { + log.debug("Deserializing with BSR - commit: {}, message: {}, bufferSize: {}", + commit, messageFQN, buffer.length); + + try { + // Step 1: Fetch descriptor from BSR (via HTTP) + Descriptors.Descriptor descriptor = bsrClient.getMessageDescriptor(commit, messageFQN); + + // Step 2: Parse protobuf bytes directly using the descriptor + DynamicMessage message = DynamicMessage.parseFrom(descriptor, buffer); + + // Step 3: Convert to JSON + String json = JsonFormat.printer().print(message); + log.debug("Successfully deserialized and converted to JSON"); + return json; + } catch (Exception e) { + log.error("Failed to deserialize with BSR - commit: {}, message: {}", commit, messageFQN, e); + throw new SerializationException( + String.format("Failed to deserialize with BSR [commit=%s, message=%s]: %s", + commit, messageFQN, e.getMessage()), e); + } + } + + private TopicsMapping findMatchingMapping(String topic) { + return topicsMapping.stream() + .filter(mapping -> topic.matches(mapping.getTopicRegex())) + .findFirst() + .orElse(null); + } + + private String getHeaderValue(List> headers, String key) { + if (headers == null) { + return null; + } + return headers.stream() + .filter(h -> key.equals(h.getKey())) + .map(KeyValue::getValue) + .findFirst() + .orElse(null); + } +} diff --git a/src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java b/src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java index 3b8952f1a..059a5da62 100644 --- a/src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java +++ b/src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java @@ -51,6 +51,11 @@ public ProtobufToJsonDeserializer(Connection.Deserialization.ProtobufDeserializa private 
Map> buildAllDescriptors() { Map> allDescriptors = new HashMap<>(); for (TopicsMapping mapping : topicsMapping) { + // Skip BSR-only mappings (they don't use descriptor files) + if (mapping.getBsrCommit() != null || mapping.getBsrMessageType() != null) { + continue; + } + byte[] fileBytes = new byte[0]; try { fileBytes = getDescriptorFileAsBytes(mapping); @@ -148,7 +153,8 @@ private TopicsMapping findMatchingConfig(String topic) { return new TopicsMapping( mapping.getTopicRegex(), mapping.getDescriptorFile(), mapping.getDescriptorFileBase64(), - mapping.getKeyMessageType(), mapping.getValueMessageType()); + mapping.getKeyMessageType(), mapping.getValueMessageType(), + mapping.getBsrCommit(), mapping.getBsrMessageType()); } } return null;