Skip to content

Commit 757dbcd

Browse files
josephwoodward authored and Jeffail committed
iceberg: fix schema evolution producing non-lowercase column names
1 parent 071d3e5 commit 757dbcd

2 files changed

Lines changed: 40 additions & 0 deletions

File tree

internal/impl/iceberg/writer.go

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -359,6 +359,9 @@ func (s *parquetSink) EmitValue(sv shredder.ShreddedValue) error {
359359
}
360360

361361
func (s *parquetSink) OnNewField(parentPath icebergx.Path, name string, value any) {
362+
if !s.caseSensitive {
363+
name = strings.ToLower(name)
364+
}
362365
fe := NewUnknownFieldError(parentPath, name, value)
363366
key := dedupKey(fe.FullPath().String(), s.caseSensitive)
364367
if _, ok := s.seenFields[key]; ok {
@@ -443,6 +446,9 @@ func (s *bufferingSink) EmitValue(sv shredder.ShreddedValue) error {
443446
}
444447

445448
func (s *bufferingSink) OnNewField(parentPath icebergx.Path, name string, value any) {
449+
if !s.caseSensitive {
450+
name = strings.ToLower(name)
451+
}
446452
fe := NewUnknownFieldError(parentPath, name, value)
447453
key := dedupKey(fe.FullPath().String(), s.caseSensitive)
448454
if _, ok := s.seenFields[key]; ok {

internal/impl/iceberg/writer_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,40 @@ func TestBufferingSinkNewFieldDedup(t *testing.T) {
5555
})
5656
}
5757

58+
// TestSchemaEvolutionColumnNameNormalization verifies that when
59+
// case_sensitive_columns=false, new column names are normalised to lowercase
60+
// before being stored in the UnknownFieldError that drives AddColumn. Without
61+
// this, a message keyed "EMAIL" would add an uppercase column to an otherwise
62+
// lowercase schema, violating iceberg's recommended convention.
63+
func TestSchemaEvolutionColumnNameNormalization(t *testing.T) {
64+
t.Run("buffering sink normalises to lowercase", func(t *testing.T) {
65+
sink := newBufferingSink(nil, 0, false)
66+
sink.OnNewField(icebergx.Path{}, "EMAIL", "a@x.z")
67+
68+
errs := sink.newFieldErrors()
69+
require.Len(t, errs, 1)
70+
assert.Equal(t, "email", errs[0].FieldName())
71+
})
72+
73+
t.Run("parquet sink normalises to lowercase", func(t *testing.T) {
74+
sink := &parquetSink{caseSensitive: false}
75+
sink.OnNewField(icebergx.Path{}, "EMAIL", "a@x.z")
76+
77+
errs := sink.newFieldErrors()
78+
require.Len(t, errs, 1)
79+
assert.Equal(t, "email", errs[0].FieldName())
80+
})
81+
82+
t.Run("case-sensitive mode preserves original capitalisation", func(t *testing.T) {
83+
sink := newBufferingSink(nil, 0, true)
84+
sink.OnNewField(icebergx.Path{}, "EMAIL", "a@x.z")
85+
86+
errs := sink.newFieldErrors()
87+
require.Len(t, errs, 1)
88+
assert.Equal(t, "EMAIL", errs[0].FieldName())
89+
})
90+
}
91+
5892
// TestParquetSinkNewFieldDedup mirrors TestBufferingSinkNewFieldDedup for the
5993
// non-buffering parquet sink, which is used for unpartitioned writes.
6094
func TestParquetSinkNewFieldDedup(t *testing.T) {

0 commit comments

Comments
 (0)