Skip to content

Commit 60799a9

Browse files
authored
Merge pull request #606 from datazip-inc/staging
chore: release version v0.2.9
2 parents 45ef2d1 + 0ce8c42 commit 60799a9

File tree

44 files changed

+2668
-252
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+2668
-252
lines changed

.github/workflows/security-ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929
- uses: actions/setup-go@v3
3030
with:
3131
check-latest: "true"
32-
go-version: "1.23.x"
32+
go-version: "1.24.x"
3333
- name: Install govulncheck
3434
run: go install golang.org/x/vuln/cmd/govulncheck@latest
3535
- name: Run vulnerability checks

.github/workflows/unit-tests.yml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
name: Unit Tests
2+
3+
on:
4+
push:
5+
pull_request:
6+
7+
jobs:
8+
unit-tests:
9+
name: Run Unit Tests
10+
runs-on: ubuntu-latest
11+
steps:
12+
- uses: actions/checkout@v3
13+
14+
- uses: actions/setup-go@v3
15+
with:
16+
check-latest: true
17+
go-version: 1.24.x
18+
19+
- name: Install Dependencies
20+
run: go mod download
21+
22+
- name: Run All Unit Tests
23+
run: go test -v ./...

constants/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const (
2525
ConfigFolder = "CONFIG_FOLDER"
2626
StatePath = "STATE_PATH"
2727
StreamsPath = "STREAMS_PATH"
28+
DifferencePath = "DIFFERENCE_STREAMS_PATH"
2829
// DestinationDatabasePrefix is used as prefix for destination database name
2930
DestinationDatabasePrefix = "DESTINATION_DATABASE_PREFIX"
3031
// EffectiveParquetSize is the effective size in bytes considering 256mb targeted parquet size, compression ratio as 8

destination/iceberg/iceberg.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -613,17 +613,52 @@ func (i *Iceberg) parsePartitionRegex(pattern string) error {
613613
return nil
614614
}
615615

616-
func (i *Iceberg) DropStreams(_ context.Context, _ []string) error {
617-
logger.Info("iceberg destination not support clear destination, skipping clear operation")
616+
// drop streams required for clear destination
617+
func (i *Iceberg) DropStreams(ctx context.Context, dropStreams []types.StreamInterface) error {
618+
i.options = &destination.Options{
619+
ThreadID: "iceberg_destination_drop",
620+
}
621+
if len(dropStreams) == 0 {
622+
logger.Info("No streams selected for clearing Iceberg destination, skipping operation")
623+
return nil
624+
}
618625

619-
// logger.Infof("Clearing Iceberg destination for %d selected streams: %v", len(selectedStreams), selectedStreams)
626+
// server setup for dropping tables
627+
server, err := newIcebergClient(i.config, []PartitionInfo{}, i.options.ThreadID, false, false, "")
628+
if err != nil {
629+
return fmt.Errorf("failed to setup iceberg server for dropping streams: %s", err)
630+
}
620631

621-
// TODO: Implement Iceberg table clearing logic
622-
// 1. Connect to the Iceberg catalog
623-
// 2. Use Iceberg's delete API or drop/recreate the table
624-
// 3. Handle any Iceberg-specific cleanup
632+
// to close client properly
633+
i.server = server
634+
defer func() {
635+
i.Close(ctx)
636+
}()
637+
638+
logger.Infof("Starting Clear Iceberg destination for %d selected streams", len(dropStreams))
639+
640+
// process each stream
641+
for _, stream := range dropStreams {
642+
destDB := stream.GetDestinationDatabase(&i.config.IcebergDatabase)
643+
destTable := stream.GetDestinationTable()
644+
dropTable := fmt.Sprintf("%s.%s", destDB, destTable)
645+
646+
logger.Infof("Dropping Iceberg table: %s", dropTable)
647+
648+
request := proto.IcebergPayload{
649+
Type: proto.IcebergPayload_DROP_TABLE,
650+
Metadata: &proto.IcebergPayload_Metadata{
651+
DestTableName: dropTable,
652+
ThreadId: i.server.serverID,
653+
},
654+
}
655+
_, err := i.server.sendClientRequest(ctx, &request)
656+
if err != nil {
657+
return fmt.Errorf("failed to drop table %s: %s", dropTable, err)
658+
}
659+
}
625660

626-
// logger.Info("Successfully cleared Iceberg destination for selected streams")
661+
logger.Info("Successfully cleared Iceberg destination for selected streams")
627662
return nil
628663
}
629664

destination/iceberg/java_client.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,11 @@ func getServerConfigJSON(config *Config, partitionInfo []PartitionInfo, port int
114114
logger.Warnf("No region explicitly provided for Glue catalog, the Java process will attempt to use region from AWS environment")
115115
}
116116

117-
// Configure custom endpoint for S3-compatible services (like MinIO)
118117
if config.S3Endpoint != "" {
119118
serverConfig["s3.endpoint"] = config.S3Endpoint
120-
serverConfig["io-impl"] = "org.apache.iceberg.io.ResolvingFileIO"
121-
// Set SSL/TLS configuration
122-
serverConfig["s3.ssl-enabled"] = utils.Ternary(config.S3UseSSL, "true", "false").(string)
123119
}
124-
125-
// Configure S3 or GCP file IO
126-
serverConfig["io-impl"] = utils.Ternary(strings.HasPrefix(config.IcebergS3Path, "gs://"), "org.apache.iceberg.gcp.gcs.GCSFileIO", "org.apache.iceberg.aws.s3.S3FileIO")
120+
serverConfig["io-impl"] = "org.apache.iceberg.io.ResolvingFileIO"
121+
serverConfig["s3.ssl-enabled"] = utils.Ternary(config.S3UseSSL, "true", "false").(string)
127122

128123
// Marshal the config to JSON
129124
return json.Marshal(serverConfig)

destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/IcebergUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,4 +276,19 @@ public static int partitionId() {
276276
return Integer.parseInt(dtFormater.format(Instant.now()));
277277
}
278278

279+
public static boolean dropIcebergTable(String namespace, String tableName, Catalog icebergCatalog) {
280+
try{
281+
TableIdentifier tableID = TableIdentifier.of(namespace, tableName);
282+
// Check if table exists
283+
if (!icebergCatalog.tableExists(tableID)) {
284+
LOGGER.warn("Table not found: {}", tableID.toString());
285+
return false;
286+
}
287+
return icebergCatalog.dropTable(tableID, false);
288+
} catch(Exception e){
289+
LOGGER.error("Failed to drop table {}.{}: {}", namespace, tableName, e.getMessage());
290+
throw new RuntimeException("Failed to drop table: " + namespace + "." + tableName, e);
291+
}
292+
}
293+
279294
}

destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/rpc/OlakeRowsIngester.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
6262
throw new Exception("Destination table name not present in metadata");
6363
}
6464

65-
if (this.icebergTable == null) {
65+
if (this.icebergTable == null && request.getType() != IcebergPayload.PayloadType.DROP_TABLE) {
6666
SchemaConvertor schemaConvertor = new SchemaConvertor(identifierField, schemaMetadata);
6767
this.icebergTable = loadIcebergTable(TableIdentifier.of(icebergNamespace, destTableName),
6868
schemaConvertor.convertToIcebergSchema());
@@ -111,10 +111,24 @@ public void sendRecords(IcebergPayload request, StreamObserver<RecordIngest.Reco
111111
break;
112112

113113
case DROP_TABLE:
114-
LOGGER.warn("{} Table {} not dropped, drop table not implemented", requestId, destTableName);
115-
sendResponse(responseObserver, "Drop table not implemented");
114+
String dropTable = metadata.getDestTableName();
115+
String[] parts = dropTable.split("\\.", 2);
116+
if (parts.length != 2) {
117+
throw new IllegalArgumentException("Invalid destination table name: " + dropTable);
118+
}
119+
String namespace = parts[0], tableName = parts[1];
120+
121+
LOGGER.warn("{} Dropping table {}.{}", requestId, namespace, tableName);
122+
123+
boolean dropped = IcebergUtil.dropIcebergTable(namespace, tableName, icebergCatalog);
124+
if (dropped) {
125+
sendResponse(responseObserver, "Successfully dropped table " + tableName);
126+
LOGGER.info("{} Table {} dropped", requestId, tableName);
127+
} else {
128+
sendResponse(responseObserver, "Table " + tableName + " does not exist");
129+
LOGGER.warn("{} Table {} not dropped, table does not exist", requestId, tableName);
130+
}
116131
break;
117-
118132
default:
119133
throw new IllegalArgumentException("Unknown payload type: " + request.getType());
120134
}

destination/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,6 @@ type Writer interface {
3232
// Need to pass olakeTimestamp as end argument to get the correct partition path based on record ingestion time.
3333
EvolveSchema(ctx context.Context, globalSchema, recordsSchema any) (any, error)
3434
// DropStreams is used to clear the destination before re-writing the stream
35-
DropStreams(ctx context.Context, selectedStream []string) error
35+
DropStreams(ctx context.Context, dropStreams []types.StreamInterface) error
3636
Close(ctx context.Context) error
3737
}

destination/parquet/parquet.go

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -456,57 +456,61 @@ func (p *Parquet) getPartitionedFilePath(values map[string]any, olakeTimestamp t
456456
return filepath.Join(p.basePath, strings.TrimSuffix(result, "/"))
457457
}
458458

459-
func (p *Parquet) DropStreams(ctx context.Context, selectedStreams []string) error {
459+
func (p *Parquet) DropStreams(ctx context.Context, selectedStreams []types.StreamInterface) error {
460+
// check for s3 writer configuration
461+
err := p.initS3Writer()
462+
if err != nil {
463+
return err
464+
}
465+
460466
if len(selectedStreams) == 0 {
461-
logger.Infof("Thread[%s]: no streams selected for clearing, skipping clear operation", p.options.ThreadID)
467+
logger.Infof("no streams selected for clearing, skipping clear operation")
462468
return nil
463469
}
464470

465-
logger.Infof("Thread[%s]: clearing destination for %d selected streams: %v", p.options.ThreadID, len(selectedStreams), selectedStreams)
471+
paths := make([]string, 0, len(selectedStreams))
472+
for _, stream := range selectedStreams {
473+
paths = append(paths, stream.GetDestinationDatabase(nil)+"."+stream.GetDestinationTable())
474+
}
466475

467476
if p.s3Client == nil {
468-
if err := p.clearLocalFiles(selectedStreams); err != nil {
477+
if err := p.clearLocalFiles(paths); err != nil {
469478
return fmt.Errorf("failed to clear local files: %s", err)
470479
}
471480
} else {
472-
if err := p.clearS3Files(ctx, selectedStreams); err != nil {
481+
if err := p.clearS3Files(ctx, paths); err != nil {
473482
return fmt.Errorf("failed to clear S3 files: %s", err)
474483
}
475484
}
476-
477-
logger.Infof("Thread[%s]: successfully cleared destination for selected streams", p.options.ThreadID)
478485
return nil
479486
}
480487

481-
func (p *Parquet) clearLocalFiles(selectedStreams []string) error {
482-
for _, streamID := range selectedStreams {
488+
func (p *Parquet) clearLocalFiles(paths []string) error {
489+
for _, streamID := range paths {
483490
parts := strings.SplitN(streamID, ".", 2)
484491
if len(parts) != 2 {
485-
logger.Warnf("Thread[%s]: invalid stream ID format: %s, skipping", p.options.ThreadID, streamID)
492+
logger.Warnf("invalid stream ID format: %s, skipping", streamID)
486493
continue
487494
}
495+
namespace, tableName := parts[0], parts[1]
496+
streamPath := filepath.Join(p.config.Path, namespace, tableName)
488497

489-
namespace, streamName := parts[0], parts[1]
490-
streamPath := filepath.Join(p.config.Path, namespace, streamName)
491-
492-
logger.Infof("Thread[%s]: clearing local path: %s", p.options.ThreadID, streamPath)
498+
logger.Infof("clearing local path: %s", streamPath)
493499

494500
if _, err := os.Stat(streamPath); os.IsNotExist(err) {
495-
logger.Debugf("Thread[%s]: local path does not exist, skipping: %s", p.options.ThreadID, streamPath)
501+
logger.Debugf("local path does not exist, skipping: %s", streamPath)
496502
continue
497503
}
498504

499505
if err := os.RemoveAll(streamPath); err != nil {
500506
return fmt.Errorf("failed to remove local path %s: %s", streamPath, err)
501507
}
502-
503-
logger.Debugf("Thread[%s]: successfully cleared local path: %s", p.options.ThreadID, streamPath)
504508
}
505509

506510
return nil
507511
}
508512

509-
func (p *Parquet) clearS3Files(ctx context.Context, selectedStreams []string) error {
513+
func (p *Parquet) clearS3Files(ctx context.Context, paths []string) error {
510514
deleteS3PrefixStandard := func(filtPath string) error {
511515
iter := s3manager.NewDeleteListIterator(p.s3Client, &s3.ListObjectsInput{
512516
Bucket: aws.String(p.config.Bucket),
@@ -519,21 +523,21 @@ func (p *Parquet) clearS3Files(ctx context.Context, selectedStreams []string) er
519523
return nil
520524
}
521525

522-
for _, streamID := range selectedStreams {
526+
for _, streamID := range paths {
523527
parts := strings.SplitN(streamID, ".", 2)
524528
if len(parts) != 2 {
525-
logger.Warnf("Thread[%s]: invalid stream ID format: %s, skipping", p.options.ThreadID, streamID)
529+
logger.Warnf("invalid stream ID format: %s, skipping", streamID)
526530
continue
527531
}
532+
namespace, tableName := parts[0], parts[1]
533+
s3TablePath := filepath.Join(p.config.Prefix, namespace, tableName, "/")
528534

529-
namespace, streamName := parts[0], parts[1]
530-
s3TablePath := filepath.Join(p.config.Prefix, namespace, streamName, "/")
531-
logger.Debugf("Thread[%s]: clearing S3 prefix: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3TablePath)
535+
logger.Debugf("clearing S3 prefix: s3://%s/%s", p.config.Bucket, s3TablePath)
532536
if err := deleteS3PrefixStandard(s3TablePath); err != nil {
533537
return fmt.Errorf("failed to clear S3 prefix %s: %s", s3TablePath, err)
534538
}
535539

536-
logger.Debugf("Thread[%s]: successfully cleared S3 prefix: s3://%s/%s", p.options.ThreadID, p.config.Bucket, s3TablePath)
540+
logger.Debugf("successfully cleared S3 prefix: s3://%s/%s", p.config.Bucket, s3TablePath)
537541
}
538542
return nil
539543
}

destination/writers.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func WithThreadID(threadID string) ThreadOptions {
8383
}
8484
}
8585

86-
func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams, dropStreams []string, batchSize int64) (*WriterPool, error) {
86+
func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams []string, batchSize int64) (*WriterPool, error) {
8787
newfunc, found := RegisteredWriters[config.Type]
8888
if !found {
8989
return nil, fmt.Errorf("invalid destination type has been passed [%s]", config.Type)
@@ -99,12 +99,6 @@ func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams,
9999
return nil, fmt.Errorf("failed to test destination: %s", err)
100100
}
101101

102-
if dropStreams != nil {
103-
if err := adapter.DropStreams(ctx, dropStreams); err != nil {
104-
return nil, fmt.Errorf("failed to clear destination: %s", err)
105-
}
106-
}
107-
108102
pool := &WriterPool{
109103
stats: &Stats{
110104
TotalRecordsToSync: atomic.Int64{},
@@ -279,3 +273,22 @@ func (wt *WriterThread) Close(ctx context.Context) error {
279273
return wt.writer.Close(ctx)
280274
}
281275
}
276+
277+
func ClearDestination(ctx context.Context, config *types.WriterConfig, dropStreams []types.StreamInterface) error {
278+
newfunc, found := RegisteredWriters[config.Type]
279+
if !found {
280+
return fmt.Errorf("invalid destination type has been passed [%s]", config.Type)
281+
}
282+
283+
adapter := newfunc()
284+
if err := utils.Unmarshal(config.WriterConfig, adapter.GetConfigRef()); err != nil {
285+
return err
286+
}
287+
288+
if dropStreams != nil {
289+
if err := adapter.DropStreams(ctx, dropStreams); err != nil {
290+
return fmt.Errorf("failed to drop the streams: %s", err)
291+
}
292+
}
293+
return nil
294+
}

0 commit comments

Comments
 (0)