Skip to content

Commit d3b5702

Browse files
committed
decoding of AMQP-encoded header values
1 parent 420399b commit d3b5702

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88
- [#306](https://github.com/deviceinsight/kafkactl/pull/306) Support building arm image by updating to goreleaser v2
99
- new flag `--print-all` for consume command which combines the other print parameters into one
10+
- decoding of AMQP-encoded header values
1011

1112
## 5.15.0 - 2025-11-13
1213

internal/consume/MessageDeserializer.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/base64"
55
"encoding/hex"
66
"sort"
7+
"time"
78
"unicode/utf8"
89

910
"github.com/IBM/sarama"
@@ -62,14 +63,63 @@ func encodeRecordHeaders(headers []*sarama.RecordHeader) map[string]string {
6263
}
6364

6465
key := *encodeBytes(header.Key, "")
65-
value := *encodeBytes(header.Value, "")
66+
var value string
67+
if encodedValue := encodeBytes(decodeAMQPValue(header.Value), ""); encodedValue != nil {
68+
value = *encodedValue
69+
}
6670

6771
data[key] = value
6872
}
6973

7074
return data
7175
}
7276

77+
// decodeAMQPValue attempts to decode AMQP-encoded header values.
78+
// Azure Event Hubs wraps Kafka header values in AMQP type descriptors when using the Kafka interface.
79+
// This not moved to the azure plugin, because of the required RPC overhead per header value.
80+
// For regular Kafka servers, this function returns the input unchanged.
81+
func decodeAMQPValue(data []byte) []byte {
82+
if len(data) < 2 {
83+
return data
84+
}
85+
86+
switch data[0] {
87+
case 0xa1: // str8: string with 1-byte length
88+
length := int(data[1])
89+
if len(data) >= 2+length {
90+
return data[2 : 2+length]
91+
}
92+
case 0xb1: // str32: string with 4-byte length
93+
if len(data) >= 6 {
94+
length := int(data[1])<<24 | int(data[2])<<16 | int(data[3])<<8 | int(data[4])
95+
if len(data) >= 5+length {
96+
return data[5 : 5+length]
97+
}
98+
}
99+
case 0xa0: // vbin8: binary with 1-byte length
100+
length := int(data[1])
101+
if len(data) >= 2+length {
102+
return data[2 : 2+length]
103+
}
104+
case 0xb0: // vbin32: binary with 4-byte length
105+
if len(data) >= 6 {
106+
length := int(data[1])<<24 | int(data[2])<<16 | int(data[3])<<8 | int(data[4])
107+
if len(data) >= 5+length {
108+
return data[5 : 5+length]
109+
}
110+
}
111+
case 0x83: // timestamp: 64-bit milliseconds since Unix epoch
112+
if len(data) >= 9 {
113+
ms := int64(data[1])<<56 | int64(data[2])<<48 | int64(data[3])<<40 | int64(data[4])<<32 |
114+
int64(data[5])<<24 | int64(data[6])<<16 | int64(data[7])<<8 | int64(data[8])
115+
t := time.UnixMilli(ms)
116+
return []byte(t.Format(time.RFC3339))
117+
}
118+
}
119+
120+
return data
121+
}
122+
73123
func toSortedArray(headers map[string]string) []string {
74124

75125
var column []string

0 commit comments

Comments
 (0)