Skip to content

Commit e558a59

Browse files
fix(iceberg): apply schema_metadata for nil fields during table creation (#4383)
1 parent a4bad81 commit e558a59

3 files changed

Lines changed: 119 additions & 8 deletions

File tree

internal/impl/iceberg/router.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,9 @@ func (r *Router) buildSchemaWithResolver(record map[string]any, msg *service.Mes
416416
for _, metaName := range orderedMetaNames {
417417
recordKey, ok := matchRecordKey(record, metaName, r.caseSensitive)
418418
if !ok {
419+
// Field declared in schema metadata but absent from this record.
420+
// Include it so resolveTypeForCreateTable can apply the metadata type.
421+
finalOrder = append(finalOrder, orderedField{emitName: metaName, metadataOnly: true})
419422
continue
420423
}
421424
if _, seen := used[recordKey]; seen {
@@ -432,7 +435,10 @@ func (r *Router) buildSchemaWithResolver(record map[string]any, msg *service.Mes
432435
}
433436

434437
for _, f := range finalOrder {
435-
value := record[f.recordKey]
438+
var value any
439+
if !f.metadataOnly {
440+
value = record[f.recordKey]
441+
}
436442
fieldType, err := r.resolver.resolveTypeForCreateTable(f.emitName, value, msg, common, key.namespace, key.table, ti)
437443
if err != nil {
438444
return nil, fmt.Errorf("resolving type for field %q: %w", f.emitName, err)
@@ -455,10 +461,13 @@ func (r *Router) buildSchemaWithResolver(record map[string]any, msg *service.Mes
455461
// name that should land on the iceberg column. They differ in case-insensitive
456462
// mode when a metadata field matches a record key with different casing — the
457463
// metadata casing wins for the column, the record casing is needed to find the
458-
// value.
464+
// value. When metadataOnly is true the field was declared in schema metadata but
465+
// is absent from the record; value is treated as nil and the column type comes
466+
// entirely from the metadata.
459467
type orderedField struct {
460-
recordKey string
461-
emitName string
468+
recordKey string
469+
emitName string
470+
metadataOnly bool
462471
}
463472

464473
// matchRecordKey returns the actual key from record matching name, preserving

internal/impl/iceberg/router_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,3 +325,104 @@ func TestBuildSchemaWithResolverNewColumnTypeMappingSeesMetadataCasing(t *testin
325325
assert.Equal(t, "long", fields[0].Type.Type(),
326326
"mapping returns long only when name+path match metadata casing; any other casing means the rename path is broken")
327327
}
328+
329+
// TestBuildSchemaWithResolverUsesMetadataTypeForNilValue verifies that
330+
// table-creation type resolution still applies schema metadata when the
331+
// observed record value is nil. Without this, nil values would be skipped
332+
// before metadata lookup and columns would appear later only after a non-nil
333+
// value is seen.
334+
func TestBuildSchemaWithResolverUsesMetadataTypeForNilValue(t *testing.T) {
335+
router := &Router{
336+
caseSensitive: true,
337+
resolver: newTypeResolver("schema_key", nil, true, nil),
338+
}
339+
340+
schemaMeta := schema.Common{
341+
Type: schema.Object,
342+
Children: []schema.Common{
343+
{Name: "count", Type: schema.Int32},
344+
},
345+
}
346+
347+
msg := service.NewMessage(nil)
348+
msg.MetaSetMut("schema_key", schemaMeta.ToAny())
349+
350+
record := map[string]any{
351+
"count": nil,
352+
}
353+
354+
icebergSchema, err := router.buildSchemaWithResolver(record, msg, tableKey{namespace: "ns", table: "t"})
355+
require.NoError(t, err)
356+
357+
fields := icebergSchema.Fields()
358+
require.Len(t, fields, 1)
359+
assert.Equal(t, "count", fields[0].Name)
360+
assert.Equal(t, "int", fields[0].Type.Type())
361+
}
362+
363+
// TestBuildSchemaWithResolverUsesMetadataTypeForAbsentField verifies that a
364+
// field declared in schema metadata but entirely absent from the record (not
365+
// even present as nil) is still created as an Iceberg column using the metadata
366+
// type.
367+
func TestBuildSchemaWithResolverUsesMetadataTypeForAbsentField(t *testing.T) {
368+
router := &Router{
369+
caseSensitive: true,
370+
resolver: newTypeResolver("schema_key", nil, true, nil),
371+
}
372+
373+
schemaMeta := schema.Common{
374+
Type: schema.Object,
375+
Children: []schema.Common{
376+
{Name: "count", Type: schema.Int32},
377+
},
378+
}
379+
380+
msg := service.NewMessage(nil)
381+
msg.MetaSetMut("schema_key", schemaMeta.ToAny())
382+
383+
record := map[string]any{} // "count" is entirely absent
384+
385+
icebergSchema, err := router.buildSchemaWithResolver(record, msg, tableKey{namespace: "ns", table: "t"})
386+
require.NoError(t, err)
387+
388+
fields := icebergSchema.Fields()
389+
require.Len(t, fields, 1)
390+
assert.Equal(t, "count", fields[0].Name)
391+
assert.Equal(t, "int", fields[0].Type.Type())
392+
}
393+
394+
// TestBuildSchemaWithResolverMetadataOnlyFieldOrdering verifies that
395+
// metadata-only fields (absent from the record) appear first in the schema in
396+
// metadata declaration order, followed by record-only fields in sorted order.
397+
func TestBuildSchemaWithResolverMetadataOnlyFieldOrdering(t *testing.T) {
398+
router := &Router{
399+
caseSensitive: true,
400+
resolver: newTypeResolver("schema_key", nil, true, nil),
401+
}
402+
403+
schemaMeta := schema.Common{
404+
Type: schema.Object,
405+
Children: []schema.Common{
406+
{Name: "b", Type: schema.Int32},
407+
{Name: "a", Type: schema.String},
408+
},
409+
}
410+
411+
msg := service.NewMessage(nil)
412+
msg.MetaSetMut("schema_key", schemaMeta.ToAny())
413+
414+
// "extra" is in the record but not in metadata; "b" and "a" are in metadata
415+
// but absent from the record.
416+
record := map[string]any{
417+
"extra": "hello",
418+
}
419+
420+
icebergSchema, err := router.buildSchemaWithResolver(record, msg, tableKey{namespace: "ns", table: "t"})
421+
require.NoError(t, err)
422+
423+
fields := icebergSchema.Fields()
424+
require.Len(t, fields, 3)
425+
assert.Equal(t, "b", fields[0].Name) // metadata order first
426+
assert.Equal(t, "a", fields[1].Name)
427+
assert.Equal(t, "extra", fields[2].Name) // record-only field last
428+
}

internal/impl/iceberg/type_resolver.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,19 @@ func (r *typeResolver) resolveTypeForCreateTable(
103103
if err != nil {
104104
return nil, err
105105
}
106-
if inferredType == nil {
107-
return nil, nil // nil value, skip
108-
}
109106

110107
// Stage 2: schema_metadata override (uses shared ti so override structs share the ID space)
111108
path := icebergx.Path{{Kind: icebergx.PathField, Name: fieldName}}
112109
if metaType, err := r.resolveFromCommon(common, path, ti); err != nil {
113110
return nil, fmt.Errorf("resolving type from schema metadata for field %q: %w", fieldName, err)
114111
} else if metaType != nil {
115-
// If the metatype was not found then just stick with the default inferred type
112+
// Metadata takes precedence over runtime inference. This also ensures
113+
// metadata-declared columns are created even when the observed value is nil.
116114
inferredType = metaType
117115
}
116+
if inferredType == nil {
117+
return nil, nil // nil value with no metadata type, skip
118+
}
118119

119120
// Stage 3: Bloblang mapping override (only for primitive/leaf types)
120121
if r.newColumnTypeMapping != nil && isPrimitiveType(inferredType) {

0 commit comments

Comments
 (0)