Skip to content

Commit 453c95f

Browse files
twmbmmatczuk
authored andcommitted
confluent,avro,salesforce: replace linkedin/goavro and hamba/avro with twmb/avro
Remove all linkedin/goavro and hamba/avro imports from the confluent, avro, and salesforce packages. All Avro encoding and decoding now uses github.com/twmb/avro. --- Schema registry encoder --- Delete normalize_for_avro_schema.go (572 lines). The old encoder path was: AsStructuredMut → normalizeForAvroSchema → goavro.BinaryFromNative. The normalize shim walked the parsed JSON schema tree to coerce types from json.Unmarshal output (float64 for ints, string for bytes, bare values for unions, float64 for timestamps) into the types goavro expected. The new path is: AsStructuredMut → schema.Encode. twmb/avro's Encode handles all these input types natively. The old code had separate codecs for avroRawJSON mode (goavro.NewCodecForStandardJSONFull) vs Avro JSON mode (goavro.NewCodec). twmb/avro's Encode accepts both bare values and tagged union maps, so both modes use the same encoder path. --- Schema registry decoder (default mode) --- Old: goavro.NativeFromBinary → goavro.TextualFromNative → SetBytes. New: schema.Decode → schema.EncodeJSON → SetBytes. EncodeJSON calls now include avro.LinkedinFloats() to match goavro's JSON encoding of NaN (null), +Inf/−Inf (1e999/−1e999), and −0 (−0). Without LinkedinFloats, twmb/avro uses RFC-compliant string representations ("NaN", "Infinity") which would change user output. EncodeJSON produces schema-ordered fields (cosmetic difference from goavro's hash-map-ordered output). Uses the same shorthand JSON escapes as goavro (\b, \t, \n, \f, \r). --- Schema registry decoder (preserve_logical_types mode) --- Old: hamba/avro Reader.ReadNext → avroSchemaWalker.walk → SetStructuredMut. New: schema.Decode(opts) → SetStructuredMut. The old hamba walker manually traversed the decoded tree, converting types and wrapping/unwrapping unions. The new code uses twmb/avro's TaggedUnions and TagLogicalTypes decode options, plus CustomType registrations in avro_walker.go for connect-specific conversions: - time-millis/time-micros: int→time.Duration (built-in) → time.Time (CustomType). The old hamba walker asserted time.Duration from ReadNext, converted to time.Time. The new code registers CustomType that converts the raw int32/int64 directly to time.Time. - duration: The old hamba walker had a latent crash — ReadNext returned [12]uint8 for fixed fields, but the walker asserted time.Duration. The new code registers a CustomType that converts []byte (the raw fixed bytes) to an ISO 8601 duration string via avro.DurationFromBytes. - Kafka Connect (Debezium) types: Same behavior, now implemented as a CustomType checking connect.name schema property instead of walking PropertySchema. Supports the same types: Date, Year, Timestamp, Time, MicroTimestamp, MicroTime, NanoTimestamp, NanoTime, ZonedTimestamp. --- Avro processor (to_json) --- Old: goavro.NativeFromTextual/NativeFromBinary/NativeFromSingle → SetStructuredMut. New: schema.DecodeJSON/Decode/DecodeSingleObject → SetStructuredMut. Decode options include TaggedUnions and TagLogicalTypes so unions are wrapped in map[string]any and logical types produce enriched Go types (time.Time, time.Duration, json.Number, avro.Duration). Removed jsonNativeOpts() — 40 lines of CustomType registrations that suppressed built-in logical type handlers to keep raw int32/int64/[]byte values. These were added in the initial twmb/avro port to exactly match goavro's NativeFromBinary output types, but are unnecessary: the enriched types from TagLogicalTypes (time.Time for timestamps, time.Duration for time-of-day, json.Number for decimals) are strictly more useful than raw integers when set as structured data, and json.Marshal handles all of them correctly. --- Avro processor (from_json) --- Textual encoding changed from AsBytes→DecodeJSON→EncodeJSON to AsStructured→EncodeJSON. Both old goavro and new code use AsStructured for this path. EncodeJSON now includes LinkedinFloats for NaN/Inf compat. Binary and single encoding changed from AsStructured→goavro.BinaryFromNative to AsStructured→schema.Encode/AppendSingleObject. --- Avro scanner --- Old: goavro.OCFReader → goavro.Codec.TextualFromNative → SetBytes. New: ocf.Reader → schema.Decode → schema.EncodeJSON → SetBytes. Scanner fingerprint field changed from []byte to uint64 to match the goavro Rabin field type. EncodeJSON calls include LinkedinFloats. Scanner Close now also calls reader.Close() which closes the OCF codec (decompressor), in addition to closing the underlying io.ReadCloser. The old code only closed the ReadCloser. --- Salesforce --- Old: goavro.Codec.NativeFromBinary → type-assert map[string]any. New: schema.Decode(&result, TaggedUnions()) → map[string]any. TaggedUnions added to match goavro's default behavior of wrapping union values in map[string]any{"type": value}. Without this, twmb/avro returns bare values for unions, which would break extractCDCFields in client.go. --- File renames --- serde_goavro.go → serde_avro.go serde_hamba_avro.go → avro_walker.go useHamba → preserveLogicalTypes --- User-visible behavioral differences --- 1. Scanner @avro_schema metadata: For schemas with logical types, the canonical form differs. goavro retained object wrappers like {"type":"long"} after stripping logicalType; twmb/avro correctly reduces to "long" per the Avro Parsing Canonical Form spec (§4.1.5). The @avro_schema_fingerprint (Rabin) also differs for these schemas. Schemas without logical types produce identical canonical forms and fingerprints. 2. Avro processor to_json binary/single with duration logical type: goavro returned []byte (raw 12 bytes), json.Marshal produces base64. twmb/avro returns avro.Duration struct, json.Marshal produces {"Months":N,"Days":N,"Milliseconds":N}. This only affects schemas with the rare duration logical type. 3. Avro processor from_json textual rejects int32 overflow: goavro's TextualFromNative silently emitted out-of-range values for int fields. twmb/avro's EncodeJSON returns an error. This is spec-correct. 4. Avro processor from_json textual accepts RFC 3339 timestamp strings: twmb/avro's EncodeJSON accepts "2025-03-19T12:00:00Z" for timestamp-millis/micros fields. goavro required integer values only. 5. Decimal decode: goavro returned *big.Rat, twmb/avro returns json.Number. In JSON output, goavro's TextualFromNative produced escaped raw bytes (e.g. "\u0001:" for 3.14) — a goavro bug where legacy decimal encoding routed two's-complement bytes through the Avro bytes textual encoder. twmb/avro produces the proper number (3.14). This affects both the avro processor to_json and the schema registry decoder default mode. 6. Fixed/bytes in preserve_logical_types structured output: now base64 strings instead of integer arrays. Affects Bloblang expressions accessing fixed field values as arrays. 7. EncodeJSON field ordering: twmb/avro produces schema-ordered fields. goavro used Go map iteration order. Semantically identical JSON. 8. Encoder is more permissive: accepts json.Number for numeric fields, tagged union maps in raw_json mode, RFC 3339 strings for timestamps. Inputs that previously errored now work. Subsumes PR #4124 (Fix avro timestamp encoding): twmb/avro's Encode natively accepts RFC 3339 strings for timestamp fields. Error messages differ in wording but all invalid inputs are still rejected.
1 parent f4e70f3 commit 453c95f

21 files changed

Lines changed: 362 additions & 1814 deletions

docs/modules/components/pages/processors/schema_registry_decode.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ For example, the union schema `["null","string","Foo"]`, where `Foo` is a record
134134
- the string `"a"` as `{"string": "a"}`; and
135135
- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
136136
137-
However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field <<avro_raw_json, `avro_raw_json`>> to `true`.
137+
However, it is possible to instead create documents in standard/raw JSON format by setting the field <<avro_raw_json, `avro_raw_json`>> to `true`.
138138
139139
== Protobuf format
140140

docs/modules/components/pages/processors/schema_registry_encode.adoc

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,10 @@ For example, the union schema `["null","string","Foo"]`, where `Foo` is a record
116116
- the string `"a"` as `\{"string": "a"}`; and
117117
- a `Foo` instance as `\{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
118118
119-
However, it is possible to instead consume documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting `avro.raw_json` to `true`. This is strongly recommended when using `schema_metadata` mode, as CDC sources emit standard JSON rather than Avro JSON.
119+
However, it is possible to instead consume documents in standard/raw JSON format by setting `avro.raw_json` to `true`. This is strongly recommended when using `schema_metadata` mode, as CDC sources emit standard JSON rather than Avro JSON.
120120
121121
NOTE: The top-level `avro_raw_json` field is deprecated in favor of `avro.raw_json`.
122122
123-
=== Known issues
124-
125-
Important! There is an outstanding issue in the https://github.com/linkedin/goavro[avro serializing library^] that Redpanda Connect uses which means it https://github.com/linkedin/goavro/issues/252[doesn't encode logical types correctly^]. It's still possible to encode logical types that are in-line with the spec if `avro.raw_json` is set to true, though now of course non-logical types will not be in-line with the spec.
126-
127123
== Protobuf format
128124
129125
This processor encodes protobuf messages either from any format parsed within Redpanda Connect (encoded as JSON by default), or from raw JSON documents, you can read more about JSON mapping of protobuf messages here: https://developers.google.com/protocol-buffers/docs/proto3#json

docs/modules/components/pages/scanners/avro.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ For example, the union schema `["null","string","Foo"]`, where `Foo` is a record
6363
- the string `"a"` as `{"string": "a"}`; and
6464
- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
6565
66-
However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field <<avro_raw_json,`avro_raw_json`>> to `true`.
66+
However, it is possible to instead create documents in standard/raw JSON format by setting the field <<avro_raw_json,`avro_raw_json`>> to `true`.
6767
6868
This scanner also emits the canonical Avro schema as `@avro_schema` metadata, along with the schema's fingerprint available via `@avro_schema_fingerprint`.
6969
@@ -72,7 +72,7 @@ This scanner also emits the canonical Avro schema as `@avro_schema` metadata, al
7272
7373
=== `raw_json`
7474
75-
Whether messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.
75+
Whether messages should be decoded into normal JSON rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. When true, union values are unwrapped (bare values instead of {"type": value} wrappers).
7676
7777
7878
*Type*: `bool`

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ require (
113113
github.com/jackc/pgx/v5 v5.9.1
114114
github.com/jhump/protoreflect v1.18.0
115115
github.com/lib/pq v1.12.0
116-
github.com/linkedin/goavro/v2 v2.15.0
117116
github.com/matoous/go-nanoid/v2 v2.1.0
118117
github.com/microcosm-cc/bluemonday v1.0.27
119118
github.com/microsoft/go-mssqldb v1.9.8
@@ -170,6 +169,7 @@ require (
170169
github.com/timeplus-io/proton-go-driver/v2 v2.1.4
171170
github.com/tmc/langchaingo v0.1.14
172171
github.com/trinodb/trino-go-client v0.333.0
172+
github.com/twmb/avro v1.3.4
173173
github.com/twmb/franz-go v1.20.7
174174
github.com/twmb/franz-go/pkg/kadm v1.17.2
175175
github.com/twmb/franz-go/pkg/kmsg v1.12.0
@@ -279,6 +279,7 @@ require (
279279
github.com/knadh/koanf/providers/file v1.2.1 // indirect
280280
github.com/knadh/koanf/providers/rawbytes v1.0.0 // indirect
281281
github.com/knadh/koanf/v2 v2.3.3 // indirect
282+
github.com/linkedin/goavro/v2 v2.15.0 // indirect
282283
github.com/lithammer/fuzzysearch v1.1.8 // indirect
283284
github.com/mattn/go-runewidth v0.0.21 // indirect
284285
github.com/mdelapenya/tlscert v0.2.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1735,6 +1735,8 @@ github.com/trivago/grok v1.0.0/go.mod h1:9t59xLInhrncYq9a3J7488NgiBZi5y5yC7bss+w
17351735
github.com/trivago/tgo v1.0.7 h1:uaWH/XIy9aWYWpjm2CU3RpcqZXmX2ysQ9/Go+d9gyrM=
17361736
github.com/trivago/tgo v1.0.7/go.mod h1:w4dpD+3tzNIIiIfkWWa85w5/B77tlvdZckQ+6PkFnhc=
17371737
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
1738+
github.com/twmb/avro v1.3.4 h1:4tTV207HOUHKTdAMv6fGPyqgwmGfMLUfnFg5R4cfIX0=
1739+
github.com/twmb/avro v1.3.4/go.mod h1:TUQS96Ptl8tDRyK0Jw91FXIVCoPKz4sXxvUSShiG5FA=
17381740
github.com/twmb/franz-go v1.20.7 h1:P4MGSXJjjAPP3NRGPCks/Lrq+j+twWMVl1qYCVgNmWY=
17391741
github.com/twmb/franz-go v1.20.7/go.mod h1:0bRX9HZVaoueqFWhPZNi2ODnJL7DNa6mK0HeCrC2bNU=
17401742
github.com/twmb/franz-go/pkg/kadm v1.17.2 h1:g5f1sAxnTkYC6G96pV5u715HWhxd66hWaDZUAQ8xHY8=

internal/impl/avro/processor.go

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"net/http"
2323
"strings"
2424

25-
"github.com/linkedin/goavro/v2"
25+
"github.com/twmb/avro"
2626

2727
"github.com/redpanda-data/benthos/v4/public/service"
2828
)
@@ -65,19 +65,22 @@ func init() {
6565

6666
type avroOperator func(part *service.Message) error
6767

68-
func newAvroToJSONOperator(encoding string, codec *goavro.Codec) (avroOperator, error) {
68+
func newAvroToJSONOperator(encoding string, schema *avro.Schema) (avroOperator, error) {
69+
decodeOpts := []avro.Opt{avro.TaggedUnions(), avro.TagLogicalTypes()}
6970
switch encoding {
7071
case "textual":
72+
// Input is Avro JSON bytes. Decode validates against the schema
73+
// and produces structured data with tagged unions.
7174
return func(part *service.Message) error {
7275
pBytes, err := part.AsBytes()
7376
if err != nil {
7477
return err
7578
}
76-
jObj, _, err := codec.NativeFromTextual(pBytes)
77-
if err != nil {
79+
var native any
80+
if err := schema.DecodeJSON(pBytes, &native, decodeOpts...); err != nil {
7881
return fmt.Errorf("converting Avro document to JSON: %v", err)
7982
}
80-
part.SetStructuredMut(jObj)
83+
part.SetStructuredMut(native)
8184
return nil
8285
}, nil
8386
case "binary":
@@ -86,11 +89,11 @@ func newAvroToJSONOperator(encoding string, codec *goavro.Codec) (avroOperator,
8689
if err != nil {
8790
return err
8891
}
89-
jObj, _, err := codec.NativeFromBinary(pBytes)
90-
if err != nil {
92+
var native any
93+
if _, err := schema.Decode(pBytes, &native, decodeOpts...); err != nil {
9194
return fmt.Errorf("converting Avro document to JSON: %v", err)
9295
}
93-
part.SetStructuredMut(jObj)
96+
part.SetStructuredMut(native)
9497
return nil
9598
}, nil
9699
case "single":
@@ -99,53 +102,55 @@ func newAvroToJSONOperator(encoding string, codec *goavro.Codec) (avroOperator,
99102
if err != nil {
100103
return err
101104
}
102-
jObj, _, err := codec.NativeFromSingle(pBytes)
103-
if err != nil {
105+
var native any
106+
if _, err := schema.DecodeSingleObject(pBytes, &native, decodeOpts...); err != nil {
104107
return fmt.Errorf("converting Avro document to JSON: %v", err)
105108
}
106-
part.SetStructuredMut(jObj)
109+
part.SetStructuredMut(native)
107110
return nil
108111
}, nil
109112
}
110113
return nil, fmt.Errorf("encoding '%v' not recognised", encoding)
111114
}
112115

113-
func newAvroFromJSONOperator(encoding string, codec *goavro.Codec) (avroOperator, error) {
116+
func newAvroFromJSONOperator(encoding string, schema *avro.Schema) (avroOperator, error) {
114117
switch encoding {
115118
case "textual":
116119
return func(part *service.Message) error {
117-
jObj, err := part.AsStructured()
120+
data, err := part.AsStructured()
118121
if err != nil {
119122
return fmt.Errorf("parsing message as JSON: %v", err)
120123
}
121-
var textual []byte
122-
if textual, err = codec.TextualFromNative(nil, jObj); err != nil {
124+
textual, err := schema.EncodeJSON(data, avro.TaggedUnions(), avro.TagLogicalTypes(), avro.LinkedinFloats())
125+
if err != nil {
123126
return fmt.Errorf("converting JSON to Avro schema: %v", err)
124127
}
125128
part.SetBytes(textual)
126129
return nil
127130
}, nil
128131
case "binary":
132+
// Encode accepts both bare and tagged union maps, so
133+
// structured data from JSON input encodes directly.
129134
return func(part *service.Message) error {
130-
jObj, err := part.AsStructured()
135+
data, err := part.AsStructured()
131136
if err != nil {
132-
return fmt.Errorf("parsing message as JSON: %v", err)
137+
return fmt.Errorf("parsing message as structured: %v", err)
133138
}
134-
var binary []byte
135-
if binary, err = codec.BinaryFromNative(nil, jObj); err != nil {
139+
binary, err := schema.Encode(data)
140+
if err != nil {
136141
return fmt.Errorf("converting JSON to Avro schema: %v", err)
137142
}
138143
part.SetBytes(binary)
139144
return nil
140145
}, nil
141146
case "single":
142147
return func(part *service.Message) error {
143-
jObj, err := part.AsStructured()
148+
data, err := part.AsStructured()
144149
if err != nil {
145-
return fmt.Errorf("parsing message as JSON: %v", err)
150+
return fmt.Errorf("parsing message as structured: %v", err)
146151
}
147-
var single []byte
148-
if single, err = codec.SingleFromNative(nil, jObj); err != nil {
152+
single, err := schema.AppendSingleObject(nil, data)
153+
if err != nil {
149154
return fmt.Errorf("converting JSON to Avro schema: %v", err)
150155
}
151156
part.SetBytes(single)
@@ -155,12 +160,12 @@ func newAvroFromJSONOperator(encoding string, codec *goavro.Codec) (avroOperator
155160
return nil, fmt.Errorf("encoding '%v' not recognised", encoding)
156161
}
157162

158-
func strToAvroOperator(opStr, encoding string, codec *goavro.Codec) (avroOperator, error) {
163+
func strToAvroOperator(opStr, encoding string, schema *avro.Schema) (avroOperator, error) {
159164
switch opStr {
160165
case "to_json":
161-
return newAvroToJSONOperator(encoding, codec)
166+
return newAvroToJSONOperator(encoding, schema)
162167
case "from_json":
163-
return newAvroFromJSONOperator(encoding, codec)
168+
return newAvroFromJSONOperator(encoding, schema)
164169
}
165170
return nil, fmt.Errorf("operator not recognised: %v", opStr)
166171
}
@@ -192,13 +197,13 @@ func loadSchema(schemaPath string) (string, error) {
192197

193198
//------------------------------------------------------------------------------
194199

195-
type avro struct {
200+
type avroProcessor struct {
196201
operator avroOperator
197202
log *service.Logger
198203
}
199204

200205
func newAvroFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
201-
a := &avro{log: mgr.Logger()}
206+
a := &avroProcessor{log: mgr.Logger()}
202207

203208
var operator, encoding, schema, schemaPath string
204209
var err error
@@ -227,20 +232,20 @@ func newAvroFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (serv
227232
return nil, errors.New("a schema must be specified with either the `schema` or `schema_path` fields")
228233
}
229234

230-
codec, err := goavro.NewCodec(schema)
235+
parsed, err := avro.Parse(schema)
231236
if err != nil {
232237
return nil, fmt.Errorf("parsing schema: %v", err)
233238
}
234239

235-
if a.operator, err = strToAvroOperator(operator, encoding, codec); err != nil {
240+
if a.operator, err = strToAvroOperator(operator, encoding, parsed); err != nil {
236241
return nil, err
237242
}
238243
return a, nil
239244
}
240245

241246
//------------------------------------------------------------------------------
242247

243-
func (p *avro) Process(_ context.Context, msg *service.Message) (service.MessageBatch, error) {
248+
func (p *avroProcessor) Process(_ context.Context, msg *service.Message) (service.MessageBatch, error) {
244249
err := p.operator(msg)
245250
if err != nil {
246251
p.log.Debugf("Operator failed: %v\n", err)
@@ -249,6 +254,6 @@ func (p *avro) Process(_ context.Context, msg *service.Message) (service.Message
249254
return service.MessageBatch{msg}, nil
250255
}
251256

252-
func (*avro) Close(context.Context) error {
257+
func (*avroProcessor) Close(context.Context) error {
253258
return nil
254259
}

internal/impl/avro/scanner.go

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
package avro
1616

1717
import (
18-
"bufio"
1918
"context"
19+
"encoding/binary"
20+
"errors"
2021
"fmt"
2122
"io"
2223

23-
"github.com/linkedin/goavro/v2"
24+
"github.com/twmb/avro"
25+
"github.com/twmb/avro/ocf"
2426

2527
"github.com/redpanda-data/benthos/v4/public/service"
2628
)
@@ -47,13 +49,13 @@ For example, the union schema ` + "`[\"null\",\"string\",\"Foo\"]`, where `Foo`"
4749
- the string ` + "`\"a\"` as `{\"string\": \"a\"}`" + `; and
4850
- a ` + "`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance.
4951
50-
However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field ` + "<<avro_raw_json,`avro_raw_json`>> to `true`" + `.
52+
However, it is possible to instead create documents in standard/raw JSON format by setting the field ` + "<<avro_raw_json,`avro_raw_json`>> to `true`" + `.
5153
5254
This scanner also emits the canonical Avro schema as ` + "`@avro_schema`" + ` metadata, along with the schema's fingerprint available via ` + "`@avro_schema_fingerprint`" + `.
5355
`).
5456
Fields(
5557
service.NewBoolField(sFieldRawJSON).
56-
Description("Whether messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.").
58+
Description("Whether messages should be decoded into normal JSON rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. When true, union values are unwrapped (bare values instead of {\"type\": value} wrappers).").
5759
Advanced().
5860
Default(false),
5961
)
@@ -79,24 +81,22 @@ type avroScannerCreator struct {
7981
}
8082

8183
func (c *avroScannerCreator) Create(rdr io.ReadCloser, aFn service.AckFunc, _ *service.ScannerSourceDetails) (service.BatchScanner, error) {
82-
br := bufio.NewReader(rdr)
83-
ocf, err := goavro.NewOCFReader(br)
84+
reader, err := ocf.NewReader(rdr)
8485
if err != nil {
8586
return nil, err
8687
}
8788

88-
ocfCodec := ocf.Codec()
89-
ocfSchema := ocfCodec.Schema()
90-
if c.rawJSON {
91-
if ocfCodec, err = goavro.NewCodecForStandardJSONFull(ocfSchema); err != nil {
92-
return nil, err
93-
}
94-
}
89+
schema := reader.Schema()
90+
canonical := string(schema.Canonical())
91+
fp := binary.BigEndian.Uint64(schema.Fingerprint(avro.NewRabin()))
9592

9693
return service.AutoAggregateBatchScannerAcks(&avroScanner{
9794
r: rdr,
98-
ocf: ocf,
99-
avroCodec: ocfCodec,
95+
reader: reader,
96+
schema: schema,
97+
rawJSON: c.rawJSON,
98+
canonical: canonical,
99+
fp: fp,
100100
}, aFn), nil
101101
}
102102

@@ -106,41 +106,48 @@ func (*avroScannerCreator) Close(context.Context) error {
106106

107107
type avroScanner struct {
108108
r io.ReadCloser
109-
ocf *goavro.OCFReader
110-
avroCodec *goavro.Codec
109+
reader *ocf.Reader
110+
schema *avro.Schema
111+
rawJSON bool
112+
canonical string
113+
fp uint64
111114
}
112115

113116
func (c *avroScanner) NextBatch(context.Context) (service.MessageBatch, error) {
114117
if c.r == nil {
115118
return nil, io.EOF
116119
}
117120

118-
if !c.ocf.Scan() {
119-
err := c.ocf.Err()
120-
if err != nil {
121-
return nil, fmt.Errorf("scanning OCF file: %s", err)
121+
var native any
122+
if err := c.reader.Decode(&native); err != nil {
123+
if err == io.EOF {
124+
return nil, io.EOF
122125
}
123-
return nil, io.EOF
124-
}
125-
126-
datum, err := c.ocf.Read()
127-
if err != nil {
128126
return nil, fmt.Errorf("reading OCF datum: %s", err)
129127
}
130128

131-
jb, err := c.avroCodec.TextualFromNative(nil, datum)
129+
var jb []byte
130+
var err error
131+
if c.rawJSON {
132+
jb, err = c.schema.EncodeJSON(native, avro.LinkedinFloats())
133+
} else {
134+
jb, err = c.schema.EncodeJSON(native, avro.TaggedUnions(), avro.TagLogicalTypes(), avro.LinkedinFloats())
135+
}
132136
if err != nil {
133-
return nil, fmt.Errorf("decoding OCF datum to JSON: %s", err)
137+
return nil, fmt.Errorf("encoding OCF datum to JSON: %s", err)
134138
}
139+
135140
msg := service.NewMessage(jb)
136-
msg.MetaSetMut("avro_schema", c.avroCodec.CanonicalSchema())
137-
msg.MetaSetMut("avro_schema_fingerprint", c.avroCodec.Rabin)
141+
msg.MetaSetMut("avro_schema", c.canonical)
142+
msg.MetaSetMut("avro_schema_fingerprint", c.fp)
138143
return service.MessageBatch{msg}, nil
139144
}
140145

141146
func (c *avroScanner) Close(context.Context) error {
142147
if c.r == nil {
143148
return nil
144149
}
145-
return c.r.Close()
150+
readerErr := c.reader.Close() // closes codec only, not the underlying reader
151+
rErr := c.r.Close()
152+
return errors.Join(readerErr, rErr)
146153
}

0 commit comments

Comments
 (0)