Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ akhq:
bootstrap.servers: "kafka:9092"
schema-registry:
url: "http://schema-registry:8085" # schema registry url (optional)
type: "confluent" # schema registry type (optional). Supported types are "confluent" (default) or "tibco"
type: "confluent" # schema registry type (optional). Supported types are "confluent", "tibco", or "bsr"
# Basic Auth user / pass
basic-auth-username: basic-auth-user
basic-auth-password: basic-auth-pass
properties: # standard kafka properties (optional)
ssl.protocol: TLS
# BSR (Buf Schema Registry) specific configuration (only when type: "bsr")
# bsr-host: "buf.build" # or "bufbuild.internal" for enterprise
# bsr-token: "your-api-token" # API token for authentication (optional for public modules)
connect:
- name: connect-1
url: "http://connect:8083"
Expand All @@ -92,6 +95,7 @@ akhq:
# (optional) if descriptor-file properties are used
descriptors-folder: "/app/protobuf_desc"
topics-mapping:
# Standard protobuf deserialization (using descriptor files)
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
value-message-type: "org.akhq.utils.Album"
Expand All @@ -102,6 +106,14 @@ akhq:
descriptor-file: "other.desc"
key-message-type: "org.akhq.utils.Row"
value-message-type: "org.akhq.utils.Envelope"
# BSR protobuf deserialization (when schema-registry type is "bsr")
# Prefers headers (automatically added by BSR Kafka serializers):
# - buf.registry.value.schema.commit (BSR commit ID)
# - buf.registry.value.schema.message (fully-qualified message name)
# Falls back to topic mapping if headers not present:
# - topic-regex: "orders.*"
# bsr-commit: "a1b2c3d4e5f6..."
# bsr-message-type: "com.myorg.Order"
# Ui Cluster Options (optional)
ui-options:
topic:
Expand Down
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ dependencies {
implementation group: "com.google.protobuf", name: "protobuf-java", version: '4.33.0'
implementation group: "com.google.protobuf", name: "protobuf-java-util", version: '4.33.0'

// Buf Schema Registry (BSR) - contains BSR API types for descriptor fetching
implementation group: "build.buf.bsr.kafka", name: "bsr-kafka-serde", version: '0.1.1'

//cenk is testing
implementation("io.micronaut:micronaut-http-client")
implementation("io.micronaut:micronaut-inject")
Expand Down
29 changes: 29 additions & 0 deletions docs/docs/configuration/schema-registry/bsr.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Buf Schema Registry
Integration with the Buf Schema Registry (BSR) lets AKHQ deserialize protobuf messages whose schemas are
managed in the BSR. When using Bufstream, or when Kafka records carry headers with the BSR message type and
commit ID, the AKHQ config does not need topic mappings — only the schema registry host and a BSR token are
required. For records that do not carry these headers, topic mappings can be configured as follows.


```yaml
akhq:
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
bsr-test-cluster:
properties:
bootstrap.servers: "localhost:9092" # Your Kafka broker
schema-registry:
type: "bsr"
bsr-host: "bufbuild.internal"
bsr-token: "your-bsr-api-token" # replace with your BSR API token; never commit real tokens
deserialization:
protobuf:
topics-mapping:
# Config-based example (optional if using headers)
- topic-regex: "test.*"
bsr-message-type: "bufstream.demo.v1.EmailUpdated" # Replace with your message type
# bsr-commit: "commit-id" # Optional: specific commit/version
```

5 changes: 5 additions & 0 deletions src/main/java/org/akhq/configs/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public static class SchemaRegistry {
SchemaRegistryType type = SchemaRegistryType.CONFLUENT;
String glueSchemaRegistryName;
String awsRegion;

// BSR specific fields
String bsrHost; // e.g., "buf.build" or "bufbuild.internal"
String bsrToken; // API token for authentication

@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/akhq/configs/SchemaRegistryType.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
public enum SchemaRegistryType {
CONFLUENT((byte) 0x0),
TIBCO((byte) 0x80),
GLUE((byte) 0x0);
GLUE((byte) 0x0),
BSR((byte) 0x0); // BSR doesn't use magic byte prefix

private byte magicByte;

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/akhq/configs/TopicsMapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,8 @@ public class TopicsMapping {
String descriptorFileBase64;
String keyMessageType;
String valueMessageType;

// BSR specific fields
String bsrCommit; // Specific commit/version (optional, can use headers instead)
String bsrMessageType; // Full protobuf message name for BSR (e.g., "com.myorg.Order")
}
22 changes: 21 additions & 1 deletion src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public class Record {
@JsonIgnore
private AvroToJsonDeserializer avroToJsonDeserializer;

@JsonIgnore
private org.akhq.utils.BufProtobufToJsonDeserializer bsrProtobufDeserializer;

@Getter(AccessLevel.NONE)
private byte[] bytesKey;

Expand Down Expand Up @@ -121,7 +124,9 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte

public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, AvroToJsonSerializer avroToJsonSerializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, byte[] bytesValue, Topic topic, Deserializer awsGlueKafkaDeserializer) {
ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer,
org.akhq.utils.BufProtobufToJsonDeserializer bsrProtobufDeserializer,
byte[] bytesValue, Topic topic, Deserializer awsGlueKafkaDeserializer) {
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
this.MAGIC_BYTE = (byte) 0x80;
} else {
Expand All @@ -148,6 +153,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.kafkaAvroDeserializer = kafkaAvroDeserializer;
this.protobufToJsonDeserializer = protobufToJsonDeserializer;
this.avroToJsonDeserializer = avroToJsonDeserializer;
this.bsrProtobufDeserializer = bsrProtobufDeserializer;
this.kafkaProtoDeserializer = kafkaProtoDeserializer;
this.avroToJsonSerializer = avroToJsonSerializer;
this.kafkaJsonDeserializer = kafkaJsonDeserializer;
Expand Down Expand Up @@ -269,6 +275,20 @@ private String convertToString(byte[] payload, String schemaId, boolean isKey) {
return new String(payload);
}
} else {
// Try BSR first if configured
if (bsrProtobufDeserializer != null) {
try {
String record = bsrProtobufDeserializer.deserialize(topic.getName(), payload, isKey, this.headers);
if (record != null) {
return record;
}
} catch (Exception exception) {
this.exceptions.add(exception.getMessage());
return new String(payload);
}
}

// Then try custom protobuf deserializer
if (protobufToJsonDeserializer != null) {
try {
String record = protobufToJsonDeserializer.deserialize(topic.getName(), payload, isKey);
Expand Down
49 changes: 46 additions & 3 deletions src/main/java/org/akhq/modules/KafkaModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.akhq.configs.AbstractProperties;
import org.akhq.configs.Connection;
import org.akhq.configs.Default;
import org.akhq.configs.SchemaRegistryType;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -211,6 +212,13 @@ public ProtobufSchemaProvider getProtobufSchemaProvider(String clusterId) throws
throw new InvalidClusterException(INVALID_CLUSTER + clusterId + "'");
}

// BSR doesn't use Confluent ProtobufSchemaProvider, return null
Connection connection = this.getConnection(clusterId);
if (connection.getSchemaRegistry() != null &&
connection.getSchemaRegistry().getType() == SchemaRegistryType.BSR) {
return null;
}

ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
protobufSchemaProvider.configure(Collections.singletonMap(
SCHEMA_VERSION_FETCHER,
Expand All @@ -227,6 +235,12 @@ public RestService getRegistryRestClient(String clusterId) throws InvalidCluster

Connection connection = this.getConnection(clusterId);

// BSR doesn't use Confluent RestService, return null
if (connection.getSchemaRegistry() != null &&
connection.getSchemaRegistry().getType() == SchemaRegistryType.BSR) {
return null;
}

if (connection.getSchemaRegistry() != null) {
RestService restService = new RestService(
connection.getSchemaRegistry().getUrl()
Expand Down Expand Up @@ -275,17 +289,22 @@ public RestService getRegistryRestClient(String clusterId) throws InvalidCluster
}

private final Map<String, SchemaRegistryClient> registryClient = new HashMap<>();
private final Map<String, org.akhq.modules.schemaregistry.BufSchemaRegistryClient> bsrClients = new HashMap<>();


public SchemaRegistryClient getRegistryClient(String clusterId) throws InvalidClusterException {
if (!this.clusterExists(clusterId)) {
throw new InvalidClusterException(INVALID_CLUSTER + clusterId + "'");
}

if (!this.registryClient.containsKey(clusterId)) {
Connection connection = this.getConnection(clusterId);

// BSR doesn't use Confluent SchemaRegistryClient, return null
Connection connection = this.getConnection(clusterId);
if (connection.getSchemaRegistry() != null &&
connection.getSchemaRegistry().getType() == SchemaRegistryType.BSR) {
return null;
}

if (!this.registryClient.containsKey(clusterId)) {
List<SchemaProvider> providers = new ArrayList<>();
providers.add(new AvroSchemaProvider());
providers.add(new JsonSchemaProvider());
Expand Down Expand Up @@ -386,4 +405,28 @@ public Map<String, Client> getKsqlDbClient(String clusterId) throws InvalidClust

return this.ksqlDbClient.get(clusterId);
}

/**
 * Returns the Buf Schema Registry (BSR) client for the given cluster, creating and
 * caching it on first use.
 *
 * <p>A client is only created when the cluster's schema-registry config declares a
 * non-empty {@code bsr-host}; otherwise this method returns {@code null}.
 * Note this checks only the host, not {@code type: "bsr"} — NOTE(review): confirm
 * whether a host configured alongside a non-BSR type should really build a client.
 *
 * @param clusterId the connection/cluster identifier from the configuration
 * @return the cached or newly created BSR client, or {@code null} if BSR is not configured
 * @throws InvalidClusterException if {@code clusterId} does not match any configured cluster
 */
public org.akhq.modules.schemaregistry.BufSchemaRegistryClient getBsrClient(String clusterId) throws InvalidClusterException {
    if (!this.clusterExists(clusterId)) {
        throw new InvalidClusterException(INVALID_CLUSTER + clusterId + "'");
    }

    // Lazy, cached construction — same containsKey-then-put pattern as the other
    // client caches in this module (registryClient, ksqlDbClient). Plain HashMap:
    // assumes no concurrent first-access races, as elsewhere in this class — TODO confirm.
    if (!this.bsrClients.containsKey(clusterId)) {
        Connection connection = this.getConnection(clusterId);

        // Only build a client when a BSR host is actually configured; the token may
        // be null (presumably acceptable for public modules — verify against the
        // BufSchemaRegistryClient constructor contract).
        if (connection.getSchemaRegistry() != null &&
            connection.getSchemaRegistry().getBsrHost() != null &&
            !connection.getSchemaRegistry().getBsrHost().isEmpty()) {

            this.bsrClients.put(clusterId,
                new org.akhq.modules.schemaregistry.BufSchemaRegistryClient(
                    connection.getSchemaRegistry().getBsrHost(),
                    connection.getSchemaRegistry().getBsrToken()
                )
            );
        }
    }

    // Returns null when BSR is not configured for this cluster (map never populated).
    return this.bsrClients.get(clusterId);
}
}
Loading