Skip to content

Commit 67c5796

Browse files
ImDoubD-datazip, vishalm0509, and hash-data
authored
chore: staging -> master v0.1.10 (#455)
Co-authored-by: Vishal-datazip <vishal@datazip.io> Co-authored-by: Ankit Sharma <111491139+hash-data@users.noreply.github.com> Co-authored-by: hash-data <ankit@datazip.io>
1 parent 0864a50 commit 67c5796

19 files changed

Lines changed: 724 additions & 163 deletions

File tree

.github/workflows/integration-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ jobs:
8787
run: mvn clean package -DskipTests
8888

8989
- name: Run Integration Tests
90-
run: go test -v -p 2 ./drivers/mysql/internal/... ./drivers/postgres/internal/... -tags=integration --timeout 0
90+
run: go test -v -p 2 ./drivers/mysql/internal/... ./drivers/postgres/internal/... -timeout 0 -run 'Integration'
9191

9292
- name: Cleanup
9393
if: always()
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Runs the driver performance test suites against remote databases reached
# through an OpenVPN tunnel, then removes the Glue/S3 artifacts they created.
name: Performance Tests

on:
  push:
    branches:
      - "staging"
    paths:
      - '**/*.go'
      - '**/*.java'

permissions:
  id-token: write
  contents: read

jobs:
  performance-tests:
    environment: Performance Testing
    runs-on: ubuntu-latest
    strategy:
      matrix:
        include:
          - driver: mysql
            driver_upper: MYSQL
          - driver: postgres
            driver_upper: POSTGRES
          # TODO: add benchmark tests for the below databases
          # - driver: mongodb
          #   driver_upper: MONGODB
          # - driver: oracle
          #   driver_upper: ORACLE
      fail-fast: false

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: '1.23.2'

      - name: Set up Java for Maven
        uses: actions/setup-java@v3
        with:
          distribution: 'temurin'
          java-version: '17'

      - name: Setup VPN Client
        if: matrix.driver != 'oracle'
        # Secrets are handed to the script through environment variables
        # instead of being expanded into the script text, so shell
        # metacharacters inside a secret are never interpreted.
        env:
          OPENVPN_CONFIG_B64: ${{ secrets[format('{0}_OPENVPN_CONFIG', matrix.driver_upper)] }}
          OPENVPN_USERNAME: ${{ secrets.OPENVPN_USERNAME }}
          VPN_DNS_SERVER: ${{ secrets.VPN_DNS_SERVER }}
        run: |
          sudo apt-get update -qq && sudo apt-get install -y openvpn iproute2 iputils-ping netcat-openbsd telnet
          sudo mkdir -p /etc/openvpn/client && sudo chmod 700 /etc/openvpn/client
          echo "$OPENVPN_CONFIG_B64" | base64 --decode | sudo tee /etc/openvpn/client/client.ovpn > /dev/null
          echo "$OPENVPN_USERNAME" | sudo tee /etc/openvpn/client/auth.txt > /dev/null
          sudo chmod 600 /etc/openvpn/client/client.ovpn /etc/openvpn/client/auth.txt
          sudo chown root:root /etc/openvpn/client/client.ovpn /etc/openvpn/client/auth.txt
          sudo openvpn --config /etc/openvpn/client/client.ovpn --daemon ovpn-client --log /var/log/openvpn-client.log --verb 3
          echo "Establishing VPN connection..."
          # Poll for up to 30 attempts, 2 seconds apart, for a tunnel interface.
          for i in {1..30}; do
            if ip addr show | grep -q "tun0\|tap0"; then echo "✅ VPN connected"; break; fi
            [ "$i" -eq 30 ] && { echo "❌ VPN timeout"; sudo cat /var/log/openvpn-client.log; exit 1; }
            sleep 2
          done
          # Left unquoted on purpose: word splitting matches the previous
          # inline expansion in case the secret lists several servers.
          sudo resolvectl dns tun0 $VPN_DNS_SERVER
          sudo resolvectl domain tun0 ~private.postgres.database.azure.com

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-session-name: performance_test_gh
          role-to-assume: ${{ secrets.AWS_GITHUB_ROLE }}
          aws-region: ${{ secrets.AWS_REGION }}

      - name: Install Go Dependencies
        run: go mod download

      - name: Build Project
        run: go build -v ./...

      - name: Build Iceberg Sink
        working-directory: ./destination/iceberg/olake-iceberg-java-writer
        run: mvn clean package -DskipTests

      - name: Create source
        env:
          SOURCE_JSON_B64: ${{ secrets[format('{0}_SOURCE_JSON', matrix.driver_upper)] }}
        run: |
          echo "$SOURCE_JSON_B64" | base64 --decode > ./drivers/${{ matrix.driver }}/internal/testdata/source.json

      - name: Create destination
        env:
          DESTINATION_JSON_B64: ${{ secrets[format('{0}_DESTINATION_JSON', matrix.driver_upper)] }}
        run: |
          echo "$DESTINATION_JSON_B64" | base64 --decode > ./drivers/${{ matrix.driver }}/internal/testdata/destination.json

      - name: Run Performance Tests
        run: go test -v ./drivers/${{ matrix.driver }}/internal/... -timeout 0 -run 'Performance'

      - name: Cleanup
        # Always runs so the catalog and VPN state are removed even when an
        # earlier step failed; every command is best-effort.
        if: always()
        run: |
          aws glue delete-database --name performance_${{ matrix.driver }} || { echo "failed to delete glue database: performance_${{ matrix.driver }}"; aws glue get-database --name performance_${{ matrix.driver }} || true; }
          aws s3 rm s3://dz-stag-github-actions/performance_${{ matrix.driver }} --recursive || { echo "failed to delete s3 bucket: performance_${{ matrix.driver }}"; aws s3 ls s3://dz-stag-github-actions/performance_${{ matrix.driver }} || true; }
          echo "Catalog cleanup completed"

          if [[ "${{ matrix.driver }}" != "oracle" ]]; then
            sudo pkill -f "openvpn.*client.ovpn" || true
            sudo rm -rf /etc/openvpn/client/ /var/log/openvpn-client.log || true
            echo "VPN cleanup completed"
          fi

destination/iceberg/iceberg_utils.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@ func determineMaxBatchSize() int64 {
3030
var batchSize int64
3131

3232
switch {
33-
case ramGB <= 8:
34-
batchSize = 100 * 1024 * 1024 // 100MB
3533
case ramGB <= 16:
36-
batchSize = 200 * 1024 * 1024 // 200MB
34+
batchSize = 100 * 1024 * 1024 // 100MB
3735
case ramGB <= 32:
3836
batchSize = 400 * 1024 * 1024 // 400MB
3937
default:

destination/iceberg/olake-iceberg-java-writer/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,17 @@ private void applyFieldAddition(Table icebergTable, Schema newSchema) {
160160
// @NOTE avoid committing when there is no schema change. commit creates new
161161
// commit even when there is no change!
162162
if (!icebergTable.schema().sameSchema(newSchemaCombined)) {
163-
LOGGER.warn("Extending schema of {}", icebergTable.name());
164-
us.commit();
163+
synchronized (commitLock) {
164+
icebergTable.refresh();
165+
UpdateSchema usFinal = icebergTable.updateSchema().unionByNameWith(newSchema);
166+
if (createIdentifierFields) {
167+
usFinal.setIdentifierFields(newSchema.identifierFieldNames());
168+
}
169+
if (!icebergTable.schema().sameSchema(newSchemaCombined)) {
170+
LOGGER.warn("Extending schema of {}", icebergTable.name());
171+
usFinal.commit();
172+
}
173+
}
165174
}
166175
}
167176

drivers/abstract/abstract.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,18 @@ func (a *AbstractDriver) Discover(ctx context.Context) ([]*types.Stream, error)
9292
convStream.WithSyncMode(types.FULLREFRESH, types.INCREMENTAL)
9393
convStream.SyncMode = types.FULLREFRESH
9494

95+
// add default columns
96+
for column, typ := range DefaultColumns {
97+
convStream.UpsertField(column, typ, true)
98+
}
99+
95100
// Add CDC columns if supported
96101
if a.driver.CDCSupported() {
97-
for column, typ := range DefaultColumns {
98-
convStream.UpsertField(column, typ, true)
99-
}
100102
convStream.WithSyncMode(types.CDC, types.STRICTCDC)
101103
convStream.SyncMode = types.CDC
104+
} else {
105+
// remove cdc column as it is not supported
106+
convStream.Schema.Properties.Delete(constants.CdcTimestamp)
102107
}
103108
finalStreams = append(finalStreams, convStream)
104109
return true

drivers/mongodb/go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,4 @@ require (
8181
golang.org/x/text v0.24.0 // indirect
8282
golang.org/x/tools v0.30.0 // indirect
8383
sigs.k8s.io/yaml v1.4.0 // indirect
84-
)
85-
86-
replace github.com/datazip-inc/olake => ../../
84+
)

drivers/mongodb/internal/backfill.go

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,6 @@ func (m *Mongo) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo
7171
logger.Infof("Total expected count for stream %s: %d", stream.ID(), recordCount)
7272
pool.AddRecordsToSync(recordCount)
7373

74-
// build filter
75-
filter, err := buildFilter(stream)
76-
if err != nil {
77-
return nil, fmt.Errorf("failed to parse filter during chunk splitting: %s", err)
78-
}
79-
8074
// check for _id type
8175
isObjID, err := isObjectID(ctx, collection)
8276
if err != nil {
@@ -88,7 +82,7 @@ func (m *Mongo) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo
8882
var retryErr error
8983
var chunksArray []types.Chunk
9084
err = abstract.RetryOnBackoff(m.config.RetryCount, 1*time.Minute, func() error {
91-
chunksArray, retryErr = m.splitChunks(ctx, collection, stream, filter, isObjID, storageSize)
85+
chunksArray, retryErr = m.splitChunks(ctx, collection, stream, isObjID, storageSize)
9286
return retryErr
9387
})
9488
if err != nil {
@@ -97,11 +91,11 @@ func (m *Mongo) GetOrSplitChunks(ctx context.Context, pool *destination.WriterPo
9791
return types.NewSet(chunksArray...), nil
9892
}
9993

100-
func (m *Mongo) splitChunks(ctx context.Context, collection *mongo.Collection, stream types.StreamInterface, filter bson.D, isObjID bool, storageSize float64) ([]types.Chunk, error) {
94+
func (m *Mongo) splitChunks(ctx context.Context, collection *mongo.Collection, stream types.StreamInterface, isObjID bool, storageSize float64) ([]types.Chunk, error) {
10195
splitVectorStrategy := func() ([]types.Chunk, error) {
10296
getID := func(order int) (primitive.ObjectID, error) {
10397
var doc bson.M
104-
err := collection.FindOne(ctx, filter, options.FindOne().SetSort(bson.D{{Key: "_id", Value: order}})).Decode(&doc)
98+
err := collection.FindOne(ctx, bson.D{}, options.FindOne().SetSort(bson.D{{Key: "_id", Value: order}})).Decode(&doc)
10599
if err == mongo.ErrNoDocuments {
106100
return primitive.NilObjectID, nil
107101
}
@@ -131,9 +125,6 @@ func (m *Mongo) splitChunks(ctx context.Context, collection *mongo.Collection, s
131125
{Key: "maxChunkSize", Value: 1024},
132126
}
133127

134-
if len(filter) > 0 {
135-
cmd = append(cmd, bson.E{Key: "filter", Value: filter})
136-
}
137128
if err := collection.Database().RunCommand(ctx, cmd).Decode(&result); err != nil {
138129
return nil, fmt.Errorf("failed to run splitVector command: %s", err)
139130
}
@@ -169,19 +160,14 @@ func (m *Mongo) splitChunks(ctx context.Context, collection *mongo.Collection, s
169160
bucketAutoStrategy := func(storageSize float64) ([]types.Chunk, error) {
170161
logger.Infof("using bucket auto strategy for stream: %s", stream.ID())
171162
// Use $bucketAuto for chunking
172-
pipeline := mongo.Pipeline{}
173-
if len(filter) > 0 {
174-
pipeline = append(pipeline, bson.D{{Key: "$match", Value: filter}})
175-
}
176-
177163
numberOfBuckets := int(math.Ceil(storageSize / float64(constants.EffectiveParquetSize)))
178-
pipeline = append(pipeline,
179-
bson.D{{Key: "$sort", Value: bson.D{{Key: "_id", Value: 1}}}},
180-
bson.D{{Key: "$bucketAuto", Value: bson.D{
164+
pipeline := mongo.Pipeline{
165+
{{Key: "$sort", Value: bson.D{{Key: "_id", Value: 1}}}},
166+
{{Key: "$bucketAuto", Value: bson.D{
181167
{Key: "groupBy", Value: "$_id"},
182168
{Key: "buckets", Value: numberOfBuckets},
183169
}}},
184-
)
170+
}
185171

186172
cursor, err := collection.Aggregate(ctx, pipeline)
187173
if err != nil {
@@ -227,7 +213,7 @@ func (m *Mongo) splitChunks(ctx context.Context, collection *mongo.Collection, s
227213

228214
timestampStrategy := func() ([]types.Chunk, error) {
229215
// Time-based strategy implementation
230-
first, last, err := m.fetchExtremes(ctx, collection, filter)
216+
first, last, err := m.fetchExtremes(ctx, collection)
231217
if err != nil {
232218
return nil, err
233219
}
@@ -307,12 +293,12 @@ func (m *Mongo) totalCountAndStorageSizeInCollection(ctx context.Context, collec
307293
return count, storageSize, nil
308294
}
309295

310-
func (m *Mongo) fetchExtremes(ctx context.Context, collection *mongo.Collection, filter bson.D) (time.Time, time.Time, error) {
296+
func (m *Mongo) fetchExtremes(ctx context.Context, collection *mongo.Collection) (time.Time, time.Time, error) {
311297
extreme := func(sortby int) (time.Time, error) {
312298
// Find the first document
313299
var result bson.M
314300
// Sort by _id ascending to get the first document
315-
err := collection.FindOne(ctx, filter, options.FindOne().SetSort(bson.D{{Key: "_id", Value: sortby}})).Decode(&result)
301+
err := collection.FindOne(ctx, bson.D{}, options.FindOne().SetSort(bson.D{{Key: "_id", Value: sortby}})).Decode(&result)
316302
if err != nil {
317303
return time.Time{}, err
318304
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package driver
2+
3+
import (
4+
"testing"
5+
6+
"github.com/datazip-inc/olake/constants"
7+
"github.com/datazip-inc/olake/utils/testutils"
8+
)
9+
10+
func TestMongodbPerformance(t *testing.T) {
11+
config := &testutils.PerformanceTest{
12+
TestConfig: testutils.GetTestConfig(string(constants.MongoDB)),
13+
Namespace: "twitter_data",
14+
BackfillStreams: []string{"tweets"},
15+
CDCStreams: []string{"tweets_cdc"},
16+
ExecuteQuery: ExecuteQuery,
17+
}
18+
19+
config.TestPerformance(t)
20+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package driver
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"testing"
8+
9+
"github.com/datazip-inc/olake/utils"
10+
"github.com/datazip-inc/olake/utils/testutils"
11+
"github.com/stretchr/testify/require"
12+
"go.mongodb.org/mongo-driver/bson"
13+
"go.mongodb.org/mongo-driver/mongo"
14+
"go.mongodb.org/mongo-driver/mongo/options"
15+
)
16+
17+
func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool) {
18+
t.Helper()
19+
20+
var connStr string
21+
var config Config
22+
if fileConfig {
23+
utils.UnmarshalFile("./testdata/source.json", &config, false)
24+
connStr = fmt.Sprintf(
25+
"mongodb://%s:%s@%s/?authSource=%s&readPreference=%s",
26+
config.Username,
27+
config.Password,
28+
strings.Join(config.Hosts, ","),
29+
config.AuthDB,
30+
config.ReadPreference,
31+
)
32+
} else {
33+
connStr = "mongodb://localhost:27017"
34+
}
35+
db, ok := mongo.Connect(ctx, options.Client().ApplyURI(connStr))
36+
require.NoError(t, ok, "failed to connect to mongodb")
37+
38+
switch operation {
39+
case "setup_cdc":
40+
// truncate the cdc tables
41+
for _, cdcStream := range streams {
42+
_, err := db.Database(config.Database).Collection(cdcStream).DeleteMany(ctx, bson.D{})
43+
require.NoError(t, err, fmt.Sprintf("failed to execute %s operation", operation), err)
44+
}
45+
return
46+
47+
case "bulk_cdc_data_insert":
48+
backfillStreams := testutils.GetBackfillStreamsFromCDC(streams)
49+
totalRows := 15000000
50+
51+
// TODO: insert data in batch
52+
// insert the data into the cdc tables concurrently
53+
err := utils.Concurrent(ctx, streams, len(streams), func(ctx context.Context, cdcStream string, executionNumber int) error {
54+
srcColl := db.Database(config.Database).Collection(backfillStreams[executionNumber-1])
55+
destColl := db.Database(config.Database).Collection(cdcStream)
56+
57+
cursor, err := srcColl.Find(ctx, bson.D{}, options.Find().SetLimit(int64(totalRows)))
58+
if err != nil {
59+
return fmt.Errorf("stream: %s, error: %s", cdcStream, err)
60+
}
61+
defer cursor.Close(ctx)
62+
63+
var docs []interface{}
64+
for cursor.Next(ctx) {
65+
var doc bson.M
66+
if err := cursor.Decode(&doc); err != nil {
67+
return err
68+
}
69+
docs = append(docs, doc)
70+
}
71+
if err := cursor.Err(); err != nil {
72+
return err
73+
}
74+
if len(docs) == 0 {
75+
return nil
76+
}
77+
_, err = destColl.InsertMany(ctx, docs)
78+
if err != nil {
79+
return fmt.Errorf("stream: %s, error: %s", cdcStream, err)
80+
}
81+
return nil
82+
})
83+
require.NoError(t, err, fmt.Sprintf("failed to execute %s operation", operation), err)
84+
return
85+
}
86+
}

0 commit comments

Comments
 (0)