Skip to content

Commit 71b2945

Browse files
hash-data, shubham19may, vikaxsh, vaibhav-datazip, vishalm0509
authored
chore: staging -> master v0.3.17 (#818)
Co-authored-by: Shubham Baldava <shubham@datazip.io> Co-authored-by: vikash choudhary <vikash@datazip.io> Co-authored-by: Ankit Sharma <111491139+hash-data@users.noreply.github.com> Co-authored-by: Vaibhav <vaibhav@datazip.io> Co-authored-by: vishal-datazip <vishal@datazip.io>
2 parents a12a155 + d43b249 commit 71b2945

60 files changed

Lines changed: 1070 additions & 353 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

constants/constants.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ const (
2020
OlakeTimestamp = "_olake_timestamp"
2121
OpType = "_op_type"
2222
CdcTimestamp = "_cdc_timestamp"
23-
DBName = "_db"
2423
StringifiedData = "data"
2524
DefaultReadPreference = "secondaryPreferred"
2625
EncryptionKey = "OLAKE_ENCRYPTION_KEY"

constants/state_version.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@ package constants
1313
// * When a string cannot be parsed as a timestamp, it returns epoch time (1970-01-01)
1414
// * Used for state files created before version 1 was introduced
1515
//
16-
// - Version 1: Current format (introduced stricter validation)
16+
// - Version 1: Introduced stricter validation
1717
// * Stricter date/timestamp parsing validation
1818
// * When a string cannot be parsed as a timestamp, it is returned as a string; previously it was converted to epoch time (1970-01-01)
1919
// * This prevents data corruption by failing fast on invalid date strings
20+
//
21+
// - Version 2: Current version (introduces consistent timezone handling between MySQL Full Refresh and CDC)
22+
// * Binlog CDC now uses TimestampStringLocation to align with the connection's timezone configuration.
23+
// * This prevents discrepancies where CDC timestamps could differ from Full Refresh data.
2024

2125
const (
22-
LatestStateVersion = 1
26+
LatestStateVersion = 2
2327
)
2428

2529
// Used as the current version of the state when the program is running
26-
var LoadedStateVersion = 1
30+
var LoadedStateVersion = LatestStateVersion

destination/iceberg/arrow-writer/utils.go

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"io"
88
"sort"
99
"strconv"
10-
"time"
1110

1211
"github.com/apache/arrow-go/v18/arrow"
1312
"github.com/apache/arrow-go/v18/arrow/array"
@@ -246,30 +245,22 @@ func createPositionalDeleteArrowRecord(posDeletes []PositionalDelete, allocator
246245
return recordBuilder.NewRecord()
247246
}
248247

249-
func createArrowRecord(records []types.RawRecord, allocator memory.Allocator, schema *arrow.Schema, normalization bool, olakeTimestamp time.Time) (arrow.Record, error) {
248+
func createArrowRecord(records []types.RawRecord, allocator memory.Allocator, schema *arrow.Schema, normalization bool) (arrow.Record, error) {
250249
recordBuilder := array.NewRecordBuilder(allocator, schema)
251250
defer recordBuilder.Release()
252-
253251
for _, record := range records {
254252
for idx, field := range schema.Fields() {
255253
var val any
256-
switch field.Name {
257-
case constants.OlakeID:
258-
val = record.OlakeID
259-
case constants.OlakeTimestamp:
260-
val = olakeTimestamp
261-
case constants.OpType:
262-
val = record.OperationType
263-
case constants.CdcTimestamp:
264-
if record.CdcTimestamp != nil {
265-
val = record.CdcTimestamp
266-
}
267-
default:
268-
if normalization {
269-
val = record.Data[field.Name]
270-
} else {
271-
val = record.Data
272-
}
254+
255+
// Check OlakeColumns first (CDC columns, _olake_id, _olake_timestamp, etc.)
256+
if olakeVal, exists := record.OlakeColumns[field.Name]; exists {
257+
val = olakeVal
258+
} else if normalization {
259+
// For normalized tables, get field from Data
260+
val = record.Data[field.Name]
261+
} else if field.Name == constants.StringifiedData {
262+
// For non-normalized tables, the "data" column contains entire record.Data as JSON
263+
val = record.Data
273264
}
274265

275266
if val == nil {

destination/iceberg/arrow-writer/writer.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,9 @@ func (w *ArrowWriter) getOrCreateWriter(ctx context.Context, pKey string, values
150150
}
151151

152152
// extract partitions records and tracks deletes for upsert mode.
153-
func (w *ArrowWriter) extract(ctx context.Context, records []types.RawRecord, olakeTimestamp time.Time) error {
153+
func (w *ArrowWriter) extract(ctx context.Context, records []types.RawRecord) error {
154154
for _, rec := range records {
155-
pKey, values, err := w.getRecordPartition(rec, olakeTimestamp)
155+
pKey, values, err := w.getRecordPartition(rec, rec.OlakeColumns[constants.OlakeTimestamp].(time.Time))
156156
if err != nil {
157157
return err
158158
}
@@ -163,27 +163,28 @@ func (w *ArrowWriter) extract(ctx context.Context, records []types.RawRecord, ol
163163
}
164164

165165
writer.data = append(writer.data, rec)
166-
166+
recordOpType := rec.OlakeColumns[constants.OpType].(string)
167+
recordOlakeID := rec.OlakeColumns[constants.OlakeID].(string)
167168
// Track deletes for upsert operations (d, u, c all need delete handling)
168-
if w.upsertMode && (rec.OperationType == "d" || rec.OperationType == "u" || rec.OperationType == "c") {
169+
if w.upsertMode && (recordOpType == "d" || recordOpType == "u" || recordOpType == "c") {
169170
filePosition := writer.dataWriter.currentRowCount + int64(len(writer.data)-1)
170171

171-
if _, exists := writer.olakeIDPosition[rec.OlakeID]; !exists {
172+
if _, exists := writer.olakeIDPosition[recordOlakeID]; !exists {
172173
// first time, add to equality deletes and track position
173-
writer.equalityDeletes = append(writer.equalityDeletes, rec.OlakeID)
174-
writer.olakeIDPosition[rec.OlakeID] = PositionalDelete{
174+
writer.equalityDeletes = append(writer.equalityDeletes, recordOlakeID)
175+
writer.olakeIDPosition[recordOlakeID] = PositionalDelete{
175176
FilePath: writer.dataWriter.filePath,
176177
Position: filePosition,
177178
}
178179
} else {
179180
// duplicates, add prev position to positional deletes (n-1 logic)
180181
// the latest (nth) occurrence is kept in the map but not added to deletes
181-
prev := writer.olakeIDPosition[rec.OlakeID]
182+
prev := writer.olakeIDPosition[recordOlakeID]
182183
writer.positionalDeletes = append(writer.positionalDeletes, PositionalDelete{
183184
FilePath: prev.FilePath,
184185
Position: prev.Position,
185186
})
186-
writer.olakeIDPosition[rec.OlakeID] = PositionalDelete{
187+
writer.olakeIDPosition[recordOlakeID] = PositionalDelete{
187188
FilePath: writer.dataWriter.filePath,
188189
Position: filePosition,
189190
}
@@ -195,10 +196,9 @@ func (w *ArrowWriter) extract(ctx context.Context, records []types.RawRecord, ol
195196
}
196197

197198
func (w *ArrowWriter) Write(ctx context.Context, records []types.RawRecord) error {
198-
olakeTimestamp := time.Now().UTC() // for olake timestamp, set current timestamp
199199
var err error
200200

201-
if err := w.extract(ctx, records, olakeTimestamp); err != nil {
201+
if err := w.extract(ctx, records); err != nil {
202202
return fmt.Errorf("failed to partition data: %s", err)
203203
}
204204

@@ -237,7 +237,7 @@ func (w *ArrowWriter) Write(ctx context.Context, records []types.RawRecord) erro
237237
}
238238
}
239239

240-
record, err := createArrowRecord(writer.data, w.allocator, w.arrowSchema[fileTypeData], w.stream.NormalizationEnabled(), olakeTimestamp)
240+
record, err := createArrowRecord(writer.data, w.allocator, w.arrowSchema[fileTypeData], w.stream.NormalizationEnabled())
241241
if err != nil {
242242
return fmt.Errorf("failed to create arrow record: %s", err)
243243
}

destination/iceberg/iceberg.go

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package iceberg
33
import (
44
"context"
55
"fmt"
6+
"maps"
67
"regexp"
78
"runtime"
89
"strings"
@@ -101,7 +102,7 @@ func (i *Iceberg) Setup(ctx context.Context, stream types.StreamInterface, globa
101102
logger.Infof("Creating destination table [%s] in Iceberg database [%s] for stream [%s]", i.stream.GetDestinationTable(), i.stream.GetDestinationDatabase(&i.config.IcebergDatabase), i.stream.Name())
102103

103104
var requestPayload proto.IcebergPayload
104-
iceSchema := utils.Ternary(stream.NormalizationEnabled(), stream.Schema().ToIceberg(), icebergRawSchema()).([]*proto.IcebergPayload_SchemaField)
105+
iceSchema := stream.Schema().ToIceberg(!stream.NormalizationEnabled())
105106
requestPayload = proto.IcebergPayload{
106107
Type: proto.IcebergPayload_GET_OR_CREATE_TABLE,
107108
Metadata: &proto.IcebergPayload_Metadata{
@@ -204,7 +205,7 @@ func (i *Iceberg) Check(ctx context.Context) error {
204205
Metadata: &proto.IcebergPayload_Metadata{
205206
ThreadId: server.serverID,
206207
DestTableName: destinationDB,
207-
Schema: icebergRawSchema(),
208+
Schema: types.GetIcebergRawSchema(),
208209
},
209210
}
210211

@@ -218,8 +219,8 @@ func (i *Iceberg) Check(ctx context.Context) error {
218219

219220
// try writing record in dest table
220221
currentTime := time.Now().UTC()
221-
protoSchema := icebergRawSchema()
222-
record := types.CreateRawRecord(destinationDB, map[string]any{"name": "olake"}, "r", &currentTime)
222+
protoSchema := types.GetIcebergRawSchema()
223+
record := types.CreateRawRecord(map[string]any{"name": "olake"}, map[string]any{constants.OlakeID: "olake", constants.OpType: "r", constants.CdcTimestamp: &currentTime})
223224
protoColumns, err := legacywriter.RawDataColumnBuffer(record, protoSchema)
224225
if err != nil {
225226
return fmt.Errorf("failed to create raw data column buffer: %s", err)
@@ -307,20 +308,12 @@ func (i *Iceberg) FlattenAndCleanData(ctx context.Context, records []types.RawRe
307308
// parallel flatten data and detect schema difference
308309
diffThreadSchema := atomic.Bool{}
309310
err := utils.Concurrent(ctx, records, runtime.GOMAXPROCS(0)*16, func(_ context.Context, record types.RawRecord, idx int) error {
310-
// set pre configured fields
311-
records[idx].Data[constants.OlakeID] = record.OlakeID
312-
records[idx].Data[constants.OlakeTimestamp] = time.Now().UTC()
313-
records[idx].Data[constants.OpType] = record.OperationType
314-
if record.CdcTimestamp != nil {
315-
records[idx].Data[constants.CdcTimestamp] = *record.CdcTimestamp
316-
}
317-
318-
flattenedRecord, err := typeutils.NewFlattener().Flatten(record.Data)
311+
flattenRecord, err := typeutils.NewFlattener().Flatten(record.Data)
319312
if err != nil {
320313
return fmt.Errorf("failed to flatten record, iceberg writer: %s", err)
321314
}
322-
records[idx].Data = flattenedRecord
323-
315+
records[idx].Data = flattenRecord
316+
maps.Copy(records[idx].Data, record.OlakeColumns)
324317
// if schema difference is not detected, detect schema difference
325318
if !diffThreadSchema.Load() {
326319
// when detectChange is true, the function does not modify schema parameter
@@ -368,7 +361,6 @@ func (i *Iceberg) EvolveSchema(ctx context.Context, globalSchema, recordsRawSche
368361
if !i.stream.NormalizationEnabled() {
369362
return i.schema, nil
370363
}
371-
372364
// cases as local thread schema has detected changes w.r.t. batch records schema
373365
// i. iceberg table already have changes (i.e. no difference with global schema), in this case
374366
// only refresh table in iceberg for this thread.
@@ -582,18 +574,6 @@ func parseSchema(schemaStr string) (map[string]string, error) {
582574
return fields, nil
583575
}
584576

585-
// returns raw schema in iceberg format
586-
func icebergRawSchema() []*proto.IcebergPayload_SchemaField {
587-
var icebergFields []*proto.IcebergPayload_SchemaField
588-
for key, typ := range types.RawSchema {
589-
icebergFields = append(icebergFields, &proto.IcebergPayload_SchemaField{
590-
IceType: typ.ToIceberg(),
591-
Key: key,
592-
})
593-
}
594-
return icebergFields
595-
}
596-
597577
func getCommonAncestorType(d1, d2 string) string {
598578
// check for cases:
599579
// d1: string d2: int -> return string

0 commit comments

Comments
 (0)