Skip to content

Commit feb0aca

Browse files
authored
chore: staging -> master v0.5.0 (#858)
Co-authored-by: Ankit Sharma <111491139+hash-data@users.noreply.github.com>
2 parents 9b1770e + 77a4c79 commit feb0aca

36 files changed

Lines changed: 1404 additions & 456 deletions

File tree

destination/iceberg/arrow-writer/writer.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package arrowwriter
33
import (
44
"bytes"
55
"context"
6+
"encoding/json"
67
"fmt"
78
"strings"
89
"time"
@@ -323,7 +324,7 @@ func (w *ArrowWriter) EvolveSchema(ctx context.Context, newSchema map[string]str
323324
}
324325

325326
// Close flushes all writers and commits files to Iceberg.
326-
func (w *ArrowWriter) Close(ctx context.Context) error {
327+
func (w *ArrowWriter) Close(ctx context.Context, finalMetadataState any) error {
327328
if err := w.completeWriters(ctx); err != nil {
328329
return fmt.Errorf("failed to close arrow writers: %s", err)
329330
}
@@ -345,6 +346,12 @@ func (w *ArrowWriter) Close(ctx context.Context) error {
345346
},
346347
}
347348

349+
// Commit payload from CDC/driver only: e.g. {"captured_cdc_pos":"0/123ABC"}
350+
if finalMetadataState != nil {
351+
payloadBytes, _ := json.Marshal(finalMetadataState)
352+
commitRequest.Metadata.Payload = string(payloadBytes)
353+
}
354+
348355
commitCtx, cancel := context.WithTimeout(ctx, constants.GRPCRequestTimeout)
349356
defer cancel()
350357

destination/iceberg/iceberg.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package iceberg
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"maps"
78
"regexp"
@@ -31,6 +32,7 @@ type Iceberg struct {
3132
server *serverInstance // Java server instance
3233
schema map[string]string // schema for current thread associated with Java writer (col -> type)
3334
writer Writer // writer instance
35+
olake2PCState *types.MetadataState // olake_2pc_state for current stream
3436

3537
// Why Schema On Thread Level?
3638
// Schema on thread level is identical to the writer instance available in the Java server.
@@ -72,7 +74,7 @@ func (i *Iceberg) NewWriter(ctx context.Context) (Writer, error) {
7274
return legacywriter.New(i.options, i.schema, i.stream, i.server), nil
7375
}
7476

75-
func (i *Iceberg) Setup(ctx context.Context, stream types.StreamInterface, globalSchema any, options *destination.Options) (any, error) {
77+
func (i *Iceberg) Setup(ctx context.Context, stream types.StreamInterface, globalSchema any, options *destination.Options) (any, *types.MetadataState, error) {
7678
i.options = options
7779
i.stream = stream
7880
i.partitionInfo = make([]internal.PartitionInfo, 0)
@@ -82,13 +84,13 @@ func (i *Iceberg) Setup(ctx context.Context, stream types.StreamInterface, globa
8284
if partitionRegex != "" {
8385
err := i.parsePartitionRegex(partitionRegex)
8486
if err != nil {
85-
return nil, fmt.Errorf("failed to parse partition regex: %s", err)
87+
return nil, nil, fmt.Errorf("failed to parse partition regex: %s", err)
8688
}
8789
}
8890

8991
server, err := newIcebergClient(i.config, i.partitionInfo, options.ThreadID, false, isUpsertMode(stream, options.Backfill), i.stream.GetDestinationDatabase(&i.config.IcebergDatabase))
9092
if err != nil {
91-
return nil, fmt.Errorf("failed to start iceberg server: %s", err)
93+
return nil, nil, fmt.Errorf("failed to start iceberg server: %s", err)
9294
}
9395

9496
// persist server details
@@ -115,20 +117,30 @@ func (i *Iceberg) Setup(ctx context.Context, stream types.StreamInterface, globa
115117

116118
response, err := i.server.SendClientRequest(ctx, &requestPayload)
117119
if err != nil {
118-
return nil, fmt.Errorf("failed to load or create table: %s", err)
120+
return nil, nil, fmt.Errorf("failed to load or create table: %s", err)
119121
}
120122

121123
ingestResponse := response.(*proto.RecordIngestResponse)
122124
schema, err = parseSchema(ingestResponse.GetResult())
123125
if err != nil {
124-
return nil, fmt.Errorf("failed to parse schema from resp[%s]: %s", ingestResponse.GetResult(), err)
126+
return nil, nil, fmt.Errorf("failed to parse schema from resp[%s]: %s", ingestResponse.GetResult(), err)
127+
}
128+
129+
// Capture optional olake_2pc state from table metadata without returning early,
130+
// so we fall through to create the writer for this thread.
131+
if olake2PCState := ingestResponse.GetOlake_2PcState(); olake2PCState != "" {
132+
var metadataState types.MetadataState
133+
if err := json.Unmarshal([]byte(olake2PCState), &metadataState); err != nil {
134+
return schema, nil, fmt.Errorf("failed to unmarshal 2pc metadata state: %s", err)
135+
}
136+
i.olake2PCState = &metadataState
125137
}
126138
} else {
127139
// set global schema for current thread
128140
var ok bool
129141
schema, ok = globalSchema.(map[string]string)
130142
if !ok {
131-
return nil, fmt.Errorf("failed to convert globalSchema of type[%T] to map[string]string", globalSchema)
143+
return nil, nil, fmt.Errorf("failed to convert globalSchema of type[%T] to map[string]string", globalSchema)
132144
}
133145
}
134146

@@ -137,19 +149,19 @@ func (i *Iceberg) Setup(ctx context.Context, stream types.StreamInterface, globa
137149

138150
writer, err := i.NewWriter(ctx)
139151
if err != nil {
140-
return nil, fmt.Errorf("failed to create iceberg writer: %v", err)
152+
return nil, nil, fmt.Errorf("failed to create iceberg writer: %v", err)
141153
}
142154
i.writer = writer
143155

144-
return schema, nil
156+
return schema, i.olake2PCState, nil
145157
}
146158

147159
// note: the java server parses time from a long value, which will be in milliseconds
148160
func (i *Iceberg) Write(ctx context.Context, records []types.RawRecord) error {
149161
return i.writer.Write(ctx, records)
150162
}
151163

152-
func (i *Iceberg) Close(ctx context.Context) error {
164+
func (i *Iceberg) Close(ctx context.Context, finalMetadataState any) error {
153165
// skip flushing on error
154166
defer func() {
155167
if i.server == nil {
@@ -171,7 +183,7 @@ func (i *Iceberg) Close(ctx context.Context) error {
171183
// skip commit in case of context cancellation
172184
return ctx.Err()
173185
default:
174-
return i.writer.Close(ctx)
186+
return i.writer.Close(ctx, finalMetadataState)
175187
}
176188
}
177189

@@ -193,7 +205,7 @@ func (i *Iceberg) Check(ctx context.Context) error {
193205
// to close client properly
194206
i.server = server
195207
defer func() {
196-
i.Close(ctx)
208+
i.Close(ctx, nil)
197209
}()
198210

199211
ctx, cancel := context.WithTimeout(ctx, 300*time.Second)
@@ -504,7 +516,7 @@ func (i *Iceberg) DropStreams(ctx context.Context, dropStreams []types.StreamInt
504516
// to close client properly
505517
i.server = server
506518
defer func() {
507-
i.Close(ctx)
519+
i.Close(ctx, nil)
508520
}()
509521

510522
logger.Infof("Starting Clear Iceberg destination for %d selected streams", len(dropStreams))

destination/iceberg/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ import (
99
type Writer interface {
1010
Write(ctx context.Context, records []types.RawRecord) error
1111
EvolveSchema(ctx context.Context, newSchema map[string]string) error
12-
Close(ctx context.Context) error
12+
Close(ctx context.Context, finalMetadataState any) error
1313
}

destination/iceberg/legacy-writer/writer.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,20 @@ func (w *LegacyWriter) EvolveSchema(_ context.Context, newSchema map[string]stri
114114
return nil
115115
}
116116

117-
func (w *LegacyWriter) Close(ctx context.Context) error {
117+
func (w *LegacyWriter) Close(ctx context.Context, finalMetadataState any) error {
118+
// Commit payload from CDC/driver only: e.g. {"captured_cdc_pos":"0/123ABC"}
119+
var payloadStr string
120+
if finalMetadataState != nil {
121+
payloadBytes, _ := json.Marshal(finalMetadataState)
122+
payloadStr = string(payloadBytes)
123+
}
124+
118125
request := &proto.IcebergPayload{
119126
Type: proto.IcebergPayload_COMMIT,
120127
Metadata: &proto.IcebergPayload_Metadata{
121128
ThreadId: w.server.ServerID(),
122129
DestTableName: w.stream.GetDestinationTable(),
130+
Payload: payloadStr,
123131
},
124132
}
125133

destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/rpc/OlakeArrowIngester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void icebergAPI(ArrowPayload request, StreamObserver<RecordIngest.ArrowIn
149149
}
150150
}
151151

152-
icebergTableOperator.commitThread(threadId, this.icebergTable);
152+
icebergTableOperator.commitThread(threadId, request.getMetadata().getPayload(), icebergTable);
153153
sendResponse(responseObserver,
154154
String.format(
155155
"Successfully committed %d data files, %d equality delete files, and %d positional delete files for thread %s",

destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
5353
String identifierField = metadata.getIdentifierField();
5454
List<IcebergPayload.SchemaField> schemaMetadata = metadata.getSchemaList();
5555

56-
if (threadId == null || threadId.isEmpty()) {
56+
if ((threadId == null || threadId.isEmpty())) {
5757
// file references are being stored through thread id
5858
throw new Exception("Thread id not present in metadata");
5959
}
@@ -73,8 +73,7 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
7373

7474
switch (request.getType()) {
7575
case COMMIT:
76-
LOGGER.info("{} Received commit request for thread: {}", requestId, threadId);
77-
icebergTableOperator.commitThread(threadId, this.icebergTable);
76+
icebergTableOperator.commitThread(threadId, metadata.getPayload(), this.icebergTable);
7877
sendResponse(responseObserver, requestId + " Successfully committed data for thread " + threadId);
7978
LOGGER.debug("{} Successfully committed data for thread: {}", requestId, threadId);
8079
break;
@@ -97,8 +96,11 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
9796
break;
9897

9998
case GET_OR_CREATE_TABLE:
100-
sendResponse(responseObserver, this.icebergTable.schema().toString());
101-
LOGGER.info("{} Successfully returned iceberg table {}", requestId, destTableName);
99+
String schemaStr = this.icebergTable.schema().toString();
100+
this.icebergTable.refresh();
101+
String getOrCreateCommitState = icebergTableOperator.getCommitState(this.icebergTable);
102+
String olake2pcResult = getOrCreateCommitState != null ? getOrCreateCommitState : "";
103+
sendResponse(responseObserver, schemaStr, olake2pcResult);
102104
break;
103105

104106
case RECORDS:
@@ -109,7 +111,7 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
109111
sendResponse(responseObserver, "successfully pushed records: " + request.getRecordsCount());
110112
LOGGER.debug("{} Successfully wrote {} records to table {}", requestId, request.getRecordsCount(), destTableName);
111113
break;
112-
114+
113115
case DROP_TABLE:
114116
String dropTable = metadata.getDestTableName();
115117
String[] parts = dropTable.split("\\.", 2);
@@ -142,9 +144,15 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
142144
}
143145

144146
private void sendResponse(StreamObserver<RecordIngest.RecordIngestResponse> responseObserver, String message) {
145-
RecordIngest.RecordIngestResponse response = RecordIngest.RecordIngestResponse.newBuilder()
146-
.setResult(message)
147-
.build();
147+
sendResponse(responseObserver, message, null);
148+
}
149+
150+
private void sendResponse(StreamObserver<RecordIngest.RecordIngestResponse> responseObserver, String message, String olake2pcState) {
151+
RecordIngest.RecordIngestResponse.Builder builder = RecordIngest.RecordIngestResponse.newBuilder().setResult(message);
152+
if (olake2pcState != null) {
153+
builder.setOlake2PcState(olake2pcState);
154+
}
155+
RecordIngest.RecordIngestResponse response = builder.build();
148156
responseObserver.onNext(response);
149157
responseObserver.onCompleted();
150158
}

0 commit comments

Comments
 (0)