
Commit 5d37ef3

schema: adopt Decimal and BigDecimal common types across sources and converters (#4358)
* schema: adopt Decimal and BigDecimal common types across CDC sources and converters

Threads benthos's new Decimal and BigDecimal common-schema types end-to-end:

- Five CDC sources (postgres, mysql, mssqlserver, oracledb, mongodb) now emit Decimal(p, s) when precision and scale are declared, and BigDecimal when they are not, with values normalised to canonical decimal strings via a new internal/sqlutil canonicaliser.
- Four format converters (iceberg, parquet, avro, json schema) honour Decimal natively. BigDecimal is rejected by the bounded-format encoders with an actionable error, and accepted by JSON Schema as a permissive string-with-pattern.
- ecs_avro detects logicalType: decimal in Avro specs, and the schema_registry_decode store_schema_metadata path normalises decoded big.Rat values to canonical strings.
- Shared Parquet decimal-byte helpers are extracted into internal/impl/parquet/parquetdecimal so the parquet encoder and the iceberg shredder no longer carry duplicate implementations.

The adoption is wired through a temporary go.mod replace directive pointing at the local benthos checkout while a tagged release is prepared; that directive is the one remaining follow-up before merge.

* oracledb, mssqlserver: fix decimal value-shape regressions surfaced by integration tests

Two integration-test failures were pinned down during PR-readiness verification:

1. Oracle bare NUMBER columns (no declared precision and scale) were routed through the Decimal canonicaliser because go-ora's *sql.ColumnType.DecimalSize() reports (precision=38, scale=255, ok=true) for them; 255 is the driver's "any-scale" sentinel. The snapshot mapper treated that as a real (p, s) and called Decimal(38, 255), producing "decimal value has 255 significant digits" errors. The oracleNumberToCommon schema mapping had the same hole. Both now treat scale > precision as undeclared and fall back to BigDecimal, so the schema cache and the value mapper agree and the source stays lossless.

2. MSSQL CDC streaming scanned DECIMAL/NUMERIC columns into *any, which go-mssqldb coerced to a lossy float64. The streaming iterator now pre-allocates *sql.NullString scan targets for DECIMAL/NUMERIC and MONEY/SMALLMONEY so the driver hands back the lossless text representation. The stream-snapshot code path in replication/snapshot.go was also still wrapping DECIMAL/NUMERIC values in json.Number from the pre-Decimal era; it now routes through sqlutil.CanonicaliseDecimal / CanonicaliseBigDecimal in line with the regular snapshot and streaming paths.

Also improves the oracledb snapshot mapper's error message to include the column name and input text so future driver quirks are easier to spot, and updates the streaming-block fixture for NOLEADINGZERO_COL in the oracledb all-types integration test (previously asserted as a float64, now a canonical BigDecimal string).

* chore: clear task lint and task test gates

Three small fixes to bring the repo to a clean lint/test state:

- internal/impl/confluent/ecs_avro.go: drop the unused ecsAvroFromBytes wrapper (callers were migrated to ecsAvroParseFromBytes during the decoder normalisation work) and replace a perfsprint-flagged fmt.Errorf("missing") with errors.New.
- internal/impl/postgresql/pglogicalstream/schema_test.go: simplify the redundant "((1 << 16) | 0) + 4" atttypmod fixture to "(1 << 16) + 4" per staticcheck's SA4016.
- internal/impl/tigerbeetle/integration_test.go: switch the docker container types import from "github.com/docker/docker/api/types/container" to "github.com/moby/moby/api/types/container" to match the signature testcontainers-go now expects (used elsewhere in this repo). The docker module relocated these types upstream; the existing import was a pre-existing typecheck failure that golangci-lint surfaces.

* Pin main for benthos (temporary)

* review: address PR feedback on ecs_avro scale default and decimal canonicaliser rounding

Three review fixes:

- internal/impl/confluent/ecs_avro.go: per the Avro spec, scale is optional in the decimal logical type and defaults to 0 when absent. The reverse-direction reader was treating a missing scale as an error; it now returns Decimal(precision, 0) for those fields. (joseph.woodward)
- internal/sqlutil/decimal.go: replace the big.Float fallback with a big.Rat parse and an exact "fits at the declared scale" check. Previously an input like "1.56789" against a NUMBER(10, 2) column silently rounded to "1.57" because the big.Float path added 0.5 and truncated; rationals represent decimals exactly, so the check is now a real precision-loss test. Inputs that lose precision at the declared scale return an error. Scientific notation, leading +, and fewer-than-scale fractional digits continue to canonicalise as before. (joseph.woodward)
- License URLs in internal/sqlutil/decimal.go and decimal_test.go: drop the erroneous "/v4" segment to match the rest of the RCL headers in the repo. (claude[bot])

Adds tests for both the Avro scale default and the precision-loss rejection.

* address unsigned decimal in mysql

* address special NUMERIC values in PostgreSQL

* oracledb: update streaming integration fixtures for BigDecimal value shape

TestIntegrationOracleDBCDCStreaming uses tables with bare NUMBER columns (id NUMBER GENERATED ALWAYS AS IDENTITY and val NUMBER), both of which now fall through to BigDecimal under the new schema mapping. Their values are emitted as canonical decimal strings rather than json.Number integers, so the per-subtest content assertions move from {"ID":1,"VAL":1} / {"ID":1,"VAL":2} to {"ID":"1","VAL":"1"} / {"ID":"1","VAL":"2"}.

* update failing test

---------

Co-authored-by: Joseph Woodward <joseph.woodward@xeuse.com>
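The big.Rat technique described in the review fix above can be sketched as follows. This is an illustrative stand-in, not the code from internal/sqlutil/decimal.go: the helper name, signature, and error strings are invented for the example. The idea is that scaling the parsed rational by 10^scale must land exactly on an integer, otherwise the input genuinely loses precision at the declared scale.

```go
package main

import (
	"fmt"
	"math/big"
	"strings"
)

// canonicaliseDecimal sketches the exact "fits at the declared scale" check:
// parse the input as a rational, scale it by 10^scale, and require the result
// to be an integer with at most `precision` significant digits. Unlike a
// big.Float round-trip, big.Rat represents decimal inputs exactly, so any
// remainder is genuine precision loss rather than binary rounding noise.
func canonicaliseDecimal(s string, precision, scale int) (string, error) {
	r, ok := new(big.Rat).SetString(s)
	if !ok {
		return "", fmt.Errorf("not a decimal: %q", s)
	}
	// Multiply by 10^scale; the value fits iff the result is an integer.
	pow := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
	scaled := new(big.Rat).Mul(r, new(big.Rat).SetInt(pow))
	if !scaled.IsInt() {
		return "", fmt.Errorf("%q loses precision at scale %d", s, scale)
	}
	unscaled := scaled.Num() // denominator is 1, so Num() is the unscaled value
	abs := strings.TrimPrefix(unscaled.String(), "-")
	if len(strings.TrimLeft(abs, "0")) > precision {
		return "", fmt.Errorf("%q has more than %d significant digits", s, precision)
	}
	// Re-render with exactly `scale` fractional digits (right-padded).
	for len(abs) <= scale {
		abs = "0" + abs
	}
	out := abs
	if scale > 0 {
		out = abs[:len(abs)-scale] + "." + abs[len(abs)-scale:]
	}
	if unscaled.Sign() < 0 {
		out = "-" + out
	}
	return out, nil
}

func main() {
	fmt.Println(canonicaliseDecimal("1.5", 10, 2))     // 1.50 <nil>
	fmt.Println(canonicaliseDecimal("1.56789", 10, 2)) // precision-loss error
}
```

Note how "1.5" right-pads to "1.50" while "1.56789" is rejected outright rather than rounded, matching the behaviour change described in the review fix.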
1 parent 6872dd7 commit 5d37ef3

41 files changed: 2031 additions & 252 deletions


CHANGELOG.md

Lines changed: 10 additions & 0 deletions
@@ -23,6 +23,16 @@ All notable changes to this project will be documented in this file.
 ### Added
 
 - parquet_encode: Added `default_timestamp_unit` field (values `NANOSECOND`, `MICROSECOND`, `MILLISECOND`) controlling the precision of TIMESTAMP logical types. Default remains `NANOSECOND` for backwards compatibility. Use `MICROSECOND` when writing files for Apache Spark/Databricks, AWS Athena or DuckDB, which do not support `TIMESTAMP(NANOS)`. ([#3570](https://github.com/redpanda-data/connect/issues/3570))
+- iceberg, parquet_encode, schema_registry_encode: Added support for the new `Decimal` and `BigDecimal` benthos common-schema types in metadata-driven encoding. Iceberg / Parquet / Avro encoders emit native fixed-precision decimal types for `Decimal`; JSON Schema emits a regex-validated string. `BigDecimal` is rejected by the bounded-format encoders with a clear error and accepted by JSON Schema as a permissive string pattern. ([@Jeffail](https://github.com/Jeffail))
+- schema_registry_decode: When `store_schema_metadata` is set, Avro decimal logical-type values are now normalised to canonical decimal strings to match the schema metadata's value contract. ([@Jeffail](https://github.com/Jeffail))
+
+### Changed
+
+- postgresql: NUMERIC and DECIMAL columns now emit `Decimal(p, s)` schema metadata when precision/scale is declared, or `BigDecimal` for unparameterised `numeric` columns. Values are emitted as canonical decimal strings (right-padded to the declared scale for `Decimal`). Previously these columns surfaced as `String` with the raw Postgres text. ([@Jeffail](https://github.com/Jeffail))
+- mysql_cdc: DECIMAL and NUMERIC columns now emit `Decimal(p, s)` schema metadata parsed from the column's raw type, and values are normalised to canonical decimal strings. Previously these columns surfaced as `String` with the driver's native form. ([@Jeffail](https://github.com/Jeffail))
+- microsoft_sql_server_cdc: DECIMAL and NUMERIC columns now emit `Decimal(p, s)` schema metadata sourced from `sql.ColumnType.DecimalSize()`, and values are normalised to canonical decimal strings. MONEY and SMALLMONEY remain typed as `String`, but their wire form is now a quoted canonical decimal string instead of a raw `json.Number`. Previously DECIMAL/NUMERIC were `json.Number` typed as `String`. ([@Jeffail](https://github.com/Jeffail))
+- oracledb_cdc: NUMBER columns with declared precision and scale > 0 now emit `Decimal(p, s)` schema metadata; NUMBER without `DATA_PRECISION` emits `BigDecimal`. Decimal values flow through as canonical decimal strings; integer-width NUMBER (precision ≤ 18, scale 0) continues to emit `int64`. The previous `json.Number` wrapping for NUMBER-as-String columns is gone. ([@Jeffail](https://github.com/Jeffail))
+- mongodb_cdc: `bson.Decimal128` and `bsonType: "decimal"` validator fields now emit `BigDecimal` schema metadata. Decimal values in document bodies are emitted as canonical decimal strings instead of the previous `{"$numberDecimal": "..."}` ExtJSON wrapper. ([@Jeffail](https://github.com/Jeffail))
 
 ## 4.88.0 - 2026-04-16

go.mod

Lines changed: 1 addition & 1 deletion
@@ -144,7 +144,7 @@ require (
 	github.com/rabbitmq/amqp091-go v1.10.0
 	github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9
 	github.com/redis/go-redis/v9 v9.18.0
-	github.com/redpanda-data/benthos/v4 v4.71.0
+	github.com/redpanda-data/benthos/v4 v4.71.1-0.20260428154615-ddb2b041e022
 	github.com/redpanda-data/common-go/authz v0.2.1-0.20260319205134-242ab3c168b8
 	github.com/redpanda-data/common-go/license v0.0.0-20260318014216-2bbd72bde0a0
 	github.com/redpanda-data/common-go/redpanda-otel-exporter v0.4.0

go.sum

Lines changed: 2 additions & 2 deletions
@@ -1553,8 +1553,8 @@ github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8A
 github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
 github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
-github.com/redpanda-data/benthos/v4 v4.71.0 h1:X6U5e6rYC2XDjU6PN2KBVs5ZkmofkMpURXnKj9hLw7s=
-github.com/redpanda-data/benthos/v4 v4.71.0/go.mod h1:if/3gnj/gIz3mKIiz2MGF7gNag/gv7ak0snVxP81BM4=
+github.com/redpanda-data/benthos/v4 v4.71.1-0.20260428154615-ddb2b041e022 h1:VCBdg5AShnNZaON2LuGqdR5WkIbw7DrBhkSDq5leKV4=
+github.com/redpanda-data/benthos/v4 v4.71.1-0.20260428154615-ddb2b041e022/go.mod h1:if/3gnj/gIz3mKIiz2MGF7gNag/gv7ak0snVxP81BM4=
 github.com/redpanda-data/common-go/authz v0.2.1-0.20260319205134-242ab3c168b8 h1:hZTIp81OUDNOTCTD0gM01b1t821pDbToU9jWnZRnd/E=
 github.com/redpanda-data/common-go/authz v0.2.1-0.20260319205134-242ab3c168b8/go.mod h1:sHhzCYf64ZYUBi7snbopQl+wQaKySbFsKCvGhmSckhk=
 github.com/redpanda-data/common-go/license v0.0.0-20260318014216-2bbd72bde0a0 h1:xL2THs63tUTZmTiBfBm/mrjFMrwQaHKduvgQ6gIizXg=

internal/impl/confluent/common_to_avro.go

Lines changed: 12 additions & 0 deletions
@@ -76,6 +76,18 @@ func commonToAvroInner(c schema.Common, recordName, namespace string, isRoot boo
 			"type":        "long",
 			"logicalType": "timestamp-millis",
 		}, nil
+	case schema.Decimal:
+		if c.Logical == nil || c.Logical.Decimal == nil {
+			return nil, fmt.Errorf("decimal field %q missing precision/scale", c.Name)
+		}
+		return map[string]any{
+			"type":        "bytes",
+			"logicalType": "decimal",
+			"precision":   c.Logical.Decimal.Precision,
+			"scale":       c.Logical.Decimal.Scale,
+		}, nil
+	case schema.BigDecimal:
+		return nil, fmt.Errorf("field %q is BigDecimal which has no fixed precision/scale; cast or coerce upstream before schema_registry_encode (avro)", c.Name)
 	case schema.Array:
 		return commonToAvroArray(c)
 	case schema.Map:
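For context on the `bytes` + `logicalType: decimal` mapping in the hunk above: the Avro specification stores a decimal value as the two's-complement big-endian bytes of the unscaled integer. A hedged sketch of that value-level encoding follows; the helper name and signature are invented for illustration and are not part of this commit (the repo's shared helpers live in internal/impl/parquet/parquetdecimal).

```go
package main

import (
	"fmt"
	"math/big"
	"strings"
)

// avroDecimalBytes encodes a canonical decimal string at the given scale as
// the two's-complement big-endian unscaled integer that Avro's decimal
// logical type expects. Assumes the input already carries exactly `scale`
// fractional digits (the canonical form described above). The encoding may
// include a redundant sign-extension byte; that still decodes to the same
// value.
func avroDecimalBytes(s string, scale int) ([]byte, error) {
	// Dropping the dot from the canonical form yields the unscaled integer.
	unscaled, ok := new(big.Int).SetString(strings.Replace(s, ".", "", 1), 10)
	if !ok {
		return nil, fmt.Errorf("not a canonical decimal: %q", s)
	}
	if unscaled.Sign() >= 0 {
		b := unscaled.Bytes()
		if len(b) == 0 || b[0]&0x80 != 0 {
			// Prepend a zero byte so the sign bit reads as non-negative.
			b = append([]byte{0}, b...)
		}
		return b, nil
	}
	// Two's complement for negatives: bytes of 2^(8n) + v, where n bytes are
	// enough to hold |v| plus a sign bit.
	n := (unscaled.BitLen() / 8) + 1
	tc := new(big.Int).Add(unscaled, new(big.Int).Lsh(big.NewInt(1), uint(8*n)))
	return tc.Bytes(), nil
}

func main() {
	b, _ := avroDecimalBytes("123.45", 2) // unscaled 12345 = 0x3039
	fmt.Printf("% x\n", b)                // 30 39
}
```

The zero-byte prefix for values like 200 (0xC8, high bit set) is what keeps a positive unscaled value from being misread as negative on the decode side.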

internal/impl/confluent/common_to_avro_test.go

Lines changed: 42 additions & 0 deletions
@@ -158,6 +158,48 @@ func TestCommonToAvroUnion(t *testing.T) {
 	assert.Equal(t, []any{"string", "int", "null"}, got)
 }
 
+func TestCommonToAvroDecimal(t *testing.T) {
+	c := schema.Common{
+		Name:    "amount",
+		Type:    schema.Decimal,
+		Logical: &schema.LogicalParams{Decimal: &schema.DecimalParams{Precision: 18, Scale: 4}},
+	}
+	got := avroUnmarshal(t, c, "", "").(map[string]any)
+	assert.Equal(t, "bytes", got["type"])
+	assert.Equal(t, "decimal", got["logicalType"])
+	// JSON numbers come back as float64; both fields are present.
+	assert.Equal(t, float64(18), got["precision"])
+	assert.Equal(t, float64(4), got["scale"])
+}
+
+func TestCommonToAvroDecimalOptionalUnion(t *testing.T) {
+	c := schema.Common{
+		Name:     "amount",
+		Type:     schema.Decimal,
+		Optional: true,
+		Logical:  &schema.LogicalParams{Decimal: &schema.DecimalParams{Precision: 9, Scale: 2}},
+	}
+	got := avroUnmarshal(t, c, "", "").([]any)
+	require.Len(t, got, 2)
+	assert.Equal(t, "null", got[0])
+	inner := got[1].(map[string]any)
+	assert.Equal(t, "decimal", inner["logicalType"])
+	assert.Equal(t, float64(9), inner["precision"])
+	assert.Equal(t, float64(2), inner["scale"])
+}
+
+func TestCommonToAvroDecimalMissingLogical(t *testing.T) {
+	_, err := commonToAvroSchema(schema.Common{Name: "amount", Type: schema.Decimal}, "", "")
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "missing precision/scale")
+}
+
+func TestCommonToAvroBigDecimalRejected(t *testing.T) {
+	_, err := commonToAvroSchema(schema.Common{Name: "amount", Type: schema.BigDecimal}, "", "")
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "BigDecimal")
+}
+
 func TestSanitizeAvroName(t *testing.T) {
 	tests := []struct {
 		input, want string

internal/impl/confluent/common_to_json_schema.go

Lines changed: 31 additions & 0 deletions
@@ -59,6 +59,19 @@ func commonToJSONSchemaNode(c schema.Common) (map[string]any, error) {
 		return commonToJSONSchemaUnion(c)
 	case schema.Timestamp:
 		return map[string]any{"type": "string", "format": "date-time"}, nil
+	case schema.Decimal:
+		if c.Logical == nil || c.Logical.Decimal == nil {
+			return nil, fmt.Errorf("decimal field %q missing precision/scale", c.Name)
+		}
+		return map[string]any{
+			"type":    "string",
+			"pattern": decimalPattern(c.Logical.Decimal.Precision, c.Logical.Decimal.Scale),
+		}, nil
+	case schema.BigDecimal:
+		return map[string]any{
+			"type":    "string",
+			"pattern": `^-?(0|[1-9][0-9]*)(\.[0-9]+)?$`,
+		}, nil
 	case schema.Any:
 		return map[string]any{}, nil
 	default:
@@ -128,3 +141,21 @@ func commonToJSONSchemaUnion(c schema.Common) (map[string]any, error) {
 	}
 	return map[string]any{"oneOf": oneOf}, nil
 }
+
+// decimalPattern returns a JSON Schema regex matching the canonical decimal
+// string form for Decimal(p, s). The pattern enforces no leading zeros (except
+// a single 0 before the decimal point), no scientific notation, optional
+// leading minus, and exactly s fractional digits when s > 0.
+func decimalPattern(precision, scale int32) string {
+	m := precision - scale
+	switch {
+	case scale == 0:
+		// Integer-only: up to precision digits.
+		return fmt.Sprintf(`^-?(0|[1-9][0-9]{0,%d})$`, m-1)
+	case m == 0:
+		// Fractional-only (e.g. Decimal(4,4)): integer part can only be 0.
+		return fmt.Sprintf(`^-?0\.[0-9]{%d}$`, scale)
+	default:
+		return fmt.Sprintf(`^-?(0|[1-9][0-9]{0,%d})\.[0-9]{%d}$`, m-1, scale)
+	}
+}

internal/impl/confluent/common_to_json_schema_test.go

Lines changed: 76 additions & 0 deletions
@@ -16,6 +16,7 @@ package confluent
 
 import (
 	"encoding/json"
+	"regexp"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -132,3 +133,78 @@ func TestCommonToJSONSchemaUnion(t *testing.T) {
 	assert.Equal(t, "string", oneOf[0].(map[string]any)["type"])
 	assert.Equal(t, "integer", oneOf[1].(map[string]any)["type"])
 }
+
+func TestCommonToJSONSchemaDecimal(t *testing.T) {
+	cases := []struct {
+		name      string
+		precision int32
+		scale     int32
+		matches   []string
+		rejects   []string
+	}{
+		{
+			name:      "scale 0 integer-only",
+			precision: 5,
+			scale:     0,
+			matches:   []string{"0", "1", "12345", "-12345"},
+			rejects:   []string{"123456", "1.5", "01", "+1", "1e2", ""},
+		},
+		{
+			name:      "scale 4 mixed",
+			precision: 18,
+			scale:     4,
+			matches:   []string{"0.0000", "1.5000", "-12345.6789", "12345678901234.5678"},
+			rejects:   []string{"1.5", "1.50000", "1", "01.5000", "+1.5000", "1.5e2", ""},
+		},
+		{
+			name:      "scale equals precision fractional only",
+			precision: 4,
+			scale:     4,
+			matches:   []string{"0.0000", "0.1234", "-0.9999"},
+			rejects:   []string{"1.0000", "0.123", "0.12345", "1"},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			c := schema.Common{
+				Name:    "amount",
+				Type:    schema.Decimal,
+				Logical: &schema.LogicalParams{Decimal: &schema.DecimalParams{Precision: tc.precision, Scale: tc.scale}},
+			}
+			got := jsonSchemaUnmarshal(t, c)
+			assert.Equal(t, "string", got["type"])
+			pattern, ok := got["pattern"].(string)
+			require.True(t, ok, "pattern field must be a string")
+
+			re := regexp.MustCompile(pattern)
+			for _, good := range tc.matches {
+				assert.True(t, re.MatchString(good), "pattern %q should match %q", pattern, good)
+			}
+			for _, bad := range tc.rejects {
+				assert.False(t, re.MatchString(bad), "pattern %q should NOT match %q", pattern, bad)
+			}
+		})
+	}
+}
+
+func TestCommonToJSONSchemaDecimalMissingLogical(t *testing.T) {
+	_, err := commonToJSONSchema(schema.Common{Name: "amount", Type: schema.Decimal})
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "missing precision/scale")
+}
+
+func TestCommonToJSONSchemaBigDecimal(t *testing.T) {
+	c := schema.Common{Name: "amount", Type: schema.BigDecimal}
+	got := jsonSchemaUnmarshal(t, c)
+	assert.Equal(t, "string", got["type"])
+	pattern, ok := got["pattern"].(string)
+	require.True(t, ok)
+	re := regexp.MustCompile(pattern)
+	for _, s := range []string{"0", "1.5", "-1.5", "12345.67890", "1"} {
+		assert.True(t, re.MatchString(s), "permissive pattern should match %q", s)
+	}
+	for _, s := range []string{"1.5e2", "+1.5", "01", "", "1.2.3"} {
+		assert.False(t, re.MatchString(s), "permissive pattern should NOT match %q", s)
+	}
+}
