Skip to content

Commit 46c84bf

Browse files
committed
confluent,avro,salesforce: replace linkedin/goavro and hamba/avro with twmb/avro
Remove all linkedin/goavro and hamba/avro imports from the confluent, avro, and salesforce packages. All Avro encoding and decoding now uses github.com/twmb/avro.

Schema registry decoder uses EncodeJSON for JSON output paths and Decode(TaggedUnions, TagLogicalTypes) with CustomType registrations for the preserve_logical_types path. The avro_walker.go file contains only connect-specific CustomType registrations: Kafka Connect type translation (Debezium timestamps), Duration→Time for time-of-day fields, and Duration→string for the duration logical type. All schema tree walking, union wrapping, and branch naming are handled by twmb/avro at decode time.

Avro processor and scanner use EncodeJSON(TaggedUnions, TagLogicalTypes) for Avro JSON output and EncodeJSON() for standard JSON output.

Delete normalize_for_avro_schema.go (572 lines) — type coercion, timestamp parsing, and union normalization are handled by twmb/avro's Encode and DecodeJSON.

Subsumes PR #4124 (Fix avro timestamp encoding): twmb/avro's Encode natively accepts RFC 3339 strings for timestamp fields.

File renames: serde_goavro.go → serde_avro.go; serde_hamba_avro.go → avro_walker.go; useHamba → preserveLogicalTypes.

Breaking change in preserve_logical_types mode: Avro fixed fields in structured output (SetStructuredMut) are now base64-encoded strings instead of arrays of integers. This affects Bloblang expressions that access fixed field values as arrays (e.g. this.fixedField.index(0)). The change only affects users with preserve_logical_types enabled AND schemas containing fixed-type fields — a rare combination. Fixed fields are binary data and base64 is the standard JSON representation; the old integer-array format was a hamba-specific behavior.

Error messages from twmb/avro differ in wording from goavro/hamba but all invalid inputs are still rejected.
1 parent c457f82 commit 46c84bf

18 files changed

Lines changed: 409 additions & 1806 deletions

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ require (
113113
github.com/jackc/pgx/v5 v5.9.1
114114
github.com/jhump/protoreflect v1.18.0
115115
github.com/lib/pq v1.12.0
116-
github.com/linkedin/goavro/v2 v2.15.0
117116
github.com/matoous/go-nanoid/v2 v2.1.0
118117
github.com/microcosm-cc/bluemonday v1.0.27
119118
github.com/microsoft/go-mssqldb v1.9.8
@@ -167,6 +166,7 @@ require (
167166
github.com/timeplus-io/proton-go-driver/v2 v2.1.4
168167
github.com/tmc/langchaingo v0.1.14
169168
github.com/trinodb/trino-go-client v0.333.0
169+
github.com/twmb/avro v1.3.1
170170
github.com/twmb/franz-go v1.20.7
171171
github.com/twmb/franz-go/pkg/kadm v1.17.2
172172
github.com/twmb/franz-go/pkg/kmsg v1.12.0
@@ -276,6 +276,7 @@ require (
276276
github.com/knadh/koanf/providers/file v1.2.1 // indirect
277277
github.com/knadh/koanf/providers/rawbytes v1.0.0 // indirect
278278
github.com/knadh/koanf/v2 v2.3.3 // indirect
279+
github.com/linkedin/goavro/v2 v2.15.0 // indirect
279280
github.com/lithammer/fuzzysearch v1.1.8 // indirect
280281
github.com/mattn/go-runewidth v0.0.21 // indirect
281282
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1727,6 +1727,8 @@ github.com/trivago/grok v1.0.0/go.mod h1:9t59xLInhrncYq9a3J7488NgiBZi5y5yC7bss+w
17271727
github.com/trivago/tgo v1.0.7 h1:uaWH/XIy9aWYWpjm2CU3RpcqZXmX2ysQ9/Go+d9gyrM=
17281728
github.com/trivago/tgo v1.0.7/go.mod h1:w4dpD+3tzNIIiIfkWWa85w5/B77tlvdZckQ+6PkFnhc=
17291729
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
1730+
github.com/twmb/avro v1.3.1 h1:OZw+qg/EhqW6rn0CrqzkcTiS33khJoSN08iPTo4Kubs=
1731+
github.com/twmb/avro v1.3.1/go.mod h1:TUQS96Ptl8tDRyK0Jw91FXIVCoPKz4sXxvUSShiG5FA=
17301732
github.com/twmb/franz-go v1.20.7 h1:P4MGSXJjjAPP3NRGPCks/Lrq+j+twWMVl1qYCVgNmWY=
17311733
github.com/twmb/franz-go v1.20.7/go.mod h1:0bRX9HZVaoueqFWhPZNi2ODnJL7DNa6mK0HeCrC2bNU=
17321734
github.com/twmb/franz-go/pkg/kadm v1.17.2 h1:g5f1sAxnTkYC6G96pV5u715HWhxd66hWaDZUAQ8xHY8=

internal/impl/avro/processor.go

Lines changed: 71 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ package avro
1616

1717
import (
1818
"context"
19+
"encoding/json"
1920
"errors"
2021
"fmt"
2122
"io"
2223
"net/http"
2324
"strings"
2425

25-
"github.com/linkedin/goavro/v2"
26+
"github.com/twmb/avro"
2627

2728
"github.com/redpanda-data/benthos/v4/public/service"
2829
)
@@ -65,16 +66,35 @@ func init() {
6566

6667
type avroOperator func(part *service.Message) error
6768

68-
func newAvroToJSONOperator(encoding string, codec *goavro.Codec) (avroOperator, error) {
69+
// setAvroStructured encodes a decoded Avro value as tagged-union JSON
70+
// and sets it as structured data on the message. The JSON round-trip
71+
// ensures all values are JSON-native types (float64, string, etc.)
72+
// for Bloblang compatibility.
73+
func setAvroStructured(schema *avro.Schema, native any, part *service.Message) error {
74+
jb, err := schema.EncodeJSON(native, avro.TaggedUnions(), avro.TagLogicalTypes())
75+
if err != nil {
76+
return fmt.Errorf("converting Avro document to JSON: %v", err)
77+
}
78+
var jObj any
79+
if err := json.Unmarshal(jb, &jObj); err != nil {
80+
return fmt.Errorf("converting Avro document to JSON: %v", err)
81+
}
82+
part.SetStructuredMut(jObj)
83+
return nil
84+
}
85+
86+
func newAvroToJSONOperator(encoding string, schema *avro.Schema) (avroOperator, error) {
6987
switch encoding {
7088
case "textual":
89+
// Input is Avro JSON bytes. Parse as standard JSON to preserve
90+
// union wrapping ({"type": value} format) in the structured output.
7191
return func(part *service.Message) error {
7292
pBytes, err := part.AsBytes()
7393
if err != nil {
7494
return err
7595
}
76-
jObj, _, err := codec.NativeFromTextual(pBytes)
77-
if err != nil {
96+
var jObj any
97+
if err := json.Unmarshal(pBytes, &jObj); err != nil {
7898
return fmt.Errorf("converting Avro document to JSON: %v", err)
7999
}
80100
part.SetStructuredMut(jObj)
@@ -86,66 +106,83 @@ func newAvroToJSONOperator(encoding string, codec *goavro.Codec) (avroOperator,
86106
if err != nil {
87107
return err
88108
}
89-
jObj, _, err := codec.NativeFromBinary(pBytes)
90-
if err != nil {
109+
var native any
110+
if _, err := schema.Decode(pBytes, &native); err != nil {
91111
return fmt.Errorf("converting Avro document to JSON: %v", err)
92112
}
93-
part.SetStructuredMut(jObj)
94-
return nil
113+
return setAvroStructured(schema, native, part)
95114
}, nil
96115
case "single":
97116
return func(part *service.Message) error {
98117
pBytes, err := part.AsBytes()
99118
if err != nil {
100119
return err
101120
}
102-
jObj, _, err := codec.NativeFromSingle(pBytes)
103-
if err != nil {
121+
var native any
122+
if _, err := schema.DecodeSingleObject(pBytes, &native); err != nil {
104123
return fmt.Errorf("converting Avro document to JSON: %v", err)
105124
}
106-
part.SetStructuredMut(jObj)
107-
return nil
125+
return setAvroStructured(schema, native, part)
108126
}, nil
109127
}
110128
return nil, fmt.Errorf("encoding '%v' not recognised", encoding)
111129
}
112130

113-
func newAvroFromJSONOperator(encoding string, codec *goavro.Codec) (avroOperator, error) {
131+
func newAvroFromJSONOperator(encoding string, schema *avro.Schema) (avroOperator, error) {
114132
switch encoding {
115133
case "textual":
134+
// Input is Avro JSON (with union wrapping). Decode it, then
135+
// re-encode to Avro JSON to validate against the schema.
116136
return func(part *service.Message) error {
117-
jObj, err := part.AsStructured()
137+
pBytes, err := part.AsBytes()
118138
if err != nil {
119-
return fmt.Errorf("parsing message as JSON: %v", err)
139+
return fmt.Errorf("parsing message as bytes: %v", err)
140+
}
141+
// DecodeJSON unwraps unions; EncodeJSON re-wraps them.
142+
var native any
143+
if err := schema.DecodeJSON(pBytes, &native); err != nil {
144+
return fmt.Errorf("converting JSON to Avro schema: %v", err)
120145
}
121-
var textual []byte
122-
if textual, err = codec.TextualFromNative(nil, jObj); err != nil {
146+
textual, err := schema.EncodeJSON(native, avro.TaggedUnions(), avro.TagLogicalTypes())
147+
if err != nil {
123148
return fmt.Errorf("converting JSON to Avro schema: %v", err)
124149
}
125150
part.SetBytes(textual)
126151
return nil
127152
}, nil
128153
case "binary":
154+
// Input is Avro JSON (with union wrapping). Decode it to
155+
// native types, then encode to Avro binary.
129156
return func(part *service.Message) error {
130-
jObj, err := part.AsStructured()
157+
pBytes, err := part.AsBytes()
131158
if err != nil {
132-
return fmt.Errorf("parsing message as JSON: %v", err)
159+
return fmt.Errorf("parsing message as bytes: %v", err)
160+
}
161+
var native any
162+
if err := schema.DecodeJSON(pBytes, &native); err != nil {
163+
return fmt.Errorf("converting JSON to Avro schema: %v", err)
133164
}
134-
var binary []byte
135-
if binary, err = codec.BinaryFromNative(nil, jObj); err != nil {
165+
binary, err := schema.Encode(native)
166+
if err != nil {
136167
return fmt.Errorf("converting JSON to Avro schema: %v", err)
137168
}
138169
part.SetBytes(binary)
139170
return nil
140171
}, nil
141172
case "single":
173+
// Input is Avro JSON (with union wrapping). Decode it to
174+
// native types, then encode to single-object format.
142175
return func(part *service.Message) error {
143-
jObj, err := part.AsStructured()
176+
pBytes, err := part.AsBytes()
144177
if err != nil {
145-
return fmt.Errorf("parsing message as JSON: %v", err)
178+
return fmt.Errorf("parsing message as bytes: %v", err)
146179
}
147-
var single []byte
148-
if single, err = codec.SingleFromNative(nil, jObj); err != nil {
180+
var native any
181+
if err := schema.DecodeJSON(pBytes, &native); err != nil {
182+
return fmt.Errorf("converting JSON to Avro schema: %v", err)
183+
}
184+
single, err := schema.AppendSingleObject(nil, native)
185+
if err != nil {
149186
return fmt.Errorf("converting JSON to Avro schema: %v", err)
150187
}
151188
part.SetBytes(single)
@@ -155,12 +192,12 @@ func newAvroFromJSONOperator(encoding string, codec *goavro.Codec) (avroOperator
155192
return nil, fmt.Errorf("encoding '%v' not recognised", encoding)
156193
}
157194

158-
func strToAvroOperator(opStr, encoding string, codec *goavro.Codec) (avroOperator, error) {
195+
func strToAvroOperator(opStr, encoding string, schema *avro.Schema) (avroOperator, error) {
159196
switch opStr {
160197
case "to_json":
161-
return newAvroToJSONOperator(encoding, codec)
198+
return newAvroToJSONOperator(encoding, schema)
162199
case "from_json":
163-
return newAvroFromJSONOperator(encoding, codec)
200+
return newAvroFromJSONOperator(encoding, schema)
164201
}
165202
return nil, fmt.Errorf("operator not recognised: %v", opStr)
166203
}
@@ -192,13 +229,13 @@ func loadSchema(schemaPath string) (string, error) {
192229

193230
//------------------------------------------------------------------------------
194231

195-
type avro struct {
232+
type avroProcessor struct {
196233
operator avroOperator
197234
log *service.Logger
198235
}
199236

200237
func newAvroFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
201-
a := &avro{log: mgr.Logger()}
238+
a := &avroProcessor{log: mgr.Logger()}
202239

203240
var operator, encoding, schema, schemaPath string
204241
var err error
@@ -227,20 +264,20 @@ func newAvroFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (serv
227264
return nil, errors.New("a schema must be specified with either the `schema` or `schema_path` fields")
228265
}
229266

230-
codec, err := goavro.NewCodec(schema)
267+
parsed, err := avro.Parse(schema)
231268
if err != nil {
232269
return nil, fmt.Errorf("parsing schema: %v", err)
233270
}
234271

235-
if a.operator, err = strToAvroOperator(operator, encoding, codec); err != nil {
272+
if a.operator, err = strToAvroOperator(operator, encoding, parsed); err != nil {
236273
return nil, err
237274
}
238275
return a, nil
239276
}
240277

241278
//------------------------------------------------------------------------------
242279

243-
func (p *avro) Process(_ context.Context, msg *service.Message) (service.MessageBatch, error) {
280+
func (p *avroProcessor) Process(_ context.Context, msg *service.Message) (service.MessageBatch, error) {
244281
err := p.operator(msg)
245282
if err != nil {
246283
p.log.Debugf("Operator failed: %v\n", err)
@@ -249,6 +286,6 @@ func (p *avro) Process(_ context.Context, msg *service.Message) (service.Message
249286
return service.MessageBatch{msg}, nil
250287
}
251288

252-
func (*avro) Close(context.Context) error {
289+
func (*avroProcessor) Close(context.Context) error {
253290
return nil
254291
}

internal/impl/avro/scanner.go

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
package avro
1616

1717
import (
18-
"bufio"
1918
"context"
19+
"errors"
2020
"fmt"
2121
"io"
2222

23-
"github.com/linkedin/goavro/v2"
23+
"github.com/twmb/avro"
24+
"github.com/twmb/avro/ocf"
2425

2526
"github.com/redpanda-data/benthos/v4/public/service"
2627
)
@@ -47,13 +48,13 @@ For example, the union schema ` + "`[\"null\",\"string\",\"Foo\"]`, where `Foo`"
4748
- the string ` + "`\"a\"` as `{\"string\": \"a\"}`" + `; and
4849
- a ` + "`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance.
4950
50-
However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field ` + "<<avro_raw_json,`avro_raw_json`>> to `true`" + `.
51+
However, it is possible to instead create documents in standard/raw JSON format by setting the field ` + "<<avro_raw_json,`avro_raw_json`>> to `true`" + `.
5152
5253
This scanner also emits the canonical Avro schema as ` + "`@avro_schema`" + ` metadata, along with the schema's fingerprint available via ` + "`@avro_schema_fingerprint`" + `.
5354
`).
5455
Fields(
5556
service.NewBoolField(sFieldRawJSON).
56-
Description("Whether messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.").
57+
Description("Whether messages should be decoded into normal JSON rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. When true, union values are unwrapped (bare values instead of {\"type\": value} wrappers).").
5758
Advanced().
5859
Default(false),
5960
)
@@ -79,24 +80,22 @@ type avroScannerCreator struct {
7980
}
8081

8182
func (c *avroScannerCreator) Create(rdr io.ReadCloser, aFn service.AckFunc, _ *service.ScannerSourceDetails) (service.BatchScanner, error) {
82-
br := bufio.NewReader(rdr)
83-
ocf, err := goavro.NewOCFReader(br)
83+
reader, err := ocf.NewReader(rdr)
8484
if err != nil {
8585
return nil, err
8686
}
8787

88-
ocfCodec := ocf.Codec()
89-
ocfSchema := ocfCodec.Schema()
90-
if c.rawJSON {
91-
if ocfCodec, err = goavro.NewCodecForStandardJSONFull(ocfSchema); err != nil {
92-
return nil, err
93-
}
94-
}
88+
schema := reader.Schema()
89+
canonical := string(schema.Canonical())
90+
fp := schema.Fingerprint(avro.NewRabin())
9591

9692
return service.AutoAggregateBatchScannerAcks(&avroScanner{
9793
r: rdr,
98-
ocf: ocf,
99-
avroCodec: ocfCodec,
94+
reader: reader,
95+
schema: schema,
96+
rawJSON: c.rawJSON,
97+
canonical: canonical,
98+
fp: fp,
10099
}, aFn), nil
101100
}
102101

@@ -106,41 +105,48 @@ func (*avroScannerCreator) Close(context.Context) error {
106105

107106
type avroScanner struct {
108107
r io.ReadCloser
109-
ocf *goavro.OCFReader
110-
avroCodec *goavro.Codec
108+
reader *ocf.Reader
109+
schema *avro.Schema
110+
rawJSON bool
111+
canonical string
112+
fp []byte
111113
}
112114

113115
func (c *avroScanner) NextBatch(context.Context) (service.MessageBatch, error) {
114116
if c.r == nil {
115117
return nil, io.EOF
116118
}
117119

118-
if !c.ocf.Scan() {
119-
err := c.ocf.Err()
120-
if err != nil {
121-
return nil, fmt.Errorf("scanning OCF file: %s", err)
120+
var native any
121+
if err := c.reader.Decode(&native); err != nil {
122+
if err == io.EOF {
123+
return nil, io.EOF
122124
}
123-
return nil, io.EOF
124-
}
125-
126-
datum, err := c.ocf.Read()
127-
if err != nil {
128125
return nil, fmt.Errorf("reading OCF datum: %s", err)
129126
}
130127

131-
jb, err := c.avroCodec.TextualFromNative(nil, datum)
128+
var jb []byte
129+
var err error
130+
if c.rawJSON {
131+
jb, err = c.schema.EncodeJSON(native)
132+
} else {
133+
jb, err = c.schema.EncodeJSON(native, avro.TaggedUnions(), avro.TagLogicalTypes())
134+
}
132135
if err != nil {
133-
return nil, fmt.Errorf("decoding OCF datum to JSON: %s", err)
136+
return nil, fmt.Errorf("encoding OCF datum to JSON: %s", err)
134137
}
138+
135139
msg := service.NewMessage(jb)
136-
msg.MetaSetMut("avro_schema", c.avroCodec.CanonicalSchema())
137-
msg.MetaSetMut("avro_schema_fingerprint", c.avroCodec.Rabin)
140+
msg.MetaSetMut("avro_schema", c.canonical)
141+
msg.MetaSetMut("avro_schema_fingerprint", c.fp)
138142
return service.MessageBatch{msg}, nil
139143
}
140144

141145
func (c *avroScanner) Close(context.Context) error {
142146
if c.r == nil {
143147
return nil
144148
}
145-
return c.r.Close()
149+
readerErr := c.reader.Close() // closes codec only, not the underlying reader
150+
rErr := c.r.Close()
151+
return errors.Join(readerErr, rErr)
146152
}

0 commit comments

Comments
 (0)