Skip to content

Commit 82adce4

Browse files
feat: removal of heavy produce schema operation during sync for kafka and mongo
1 parent a65b47e commit 82adce4

5 files changed

Lines changed: 78 additions & 51 deletions

File tree

constants/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,7 @@ var SkipCDCDrivers = []DriverType{Oracle, DB2}
5959

6060
// DriversRequiringIncrementalFormatter are drivers that require special formatting for incremental value
6161
var DriversRequiringIncrementalFormatter = []DriverType{Oracle, DB2, MSSQL}
62+
63+
// SyncContext is used as a context key to signal that ProduceSchema is being called during
64+
// sync. Drivers that perform expensive sampling (e.g. MongoDB, Kafka) can check for it to skip schema inference.
65+
type SyncContext struct{}

drivers/kafka/internal/kafka.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,15 @@ func (k *Kafka) ProduceSchema(ctx context.Context, streamName string) (*types.St
153153
stream := types.NewStream(streamName, "topics", nil)
154154
stream.WithSyncMode(types.STRICTCDC)
155155
stream.SyncMode = types.STRICTCDC
156+
stream.SourceDefinedPrimaryKey = types.NewSet(Offset, Partition)
157+
158+
// During sync, skip expensive message reading for schema inference.
159+
// The full schema is already available in the catalog (streams.json) from a prior discover run.
160+
// Only structural metadata (sync modes, primary keys) is needed for stream validation.
161+
if ctx.Value(constants.SyncContext{}) != nil {
162+
logger.Infof("sync context detected, skipping schema inference for topic [%s]", streamName)
163+
return stream, nil
164+
}
156165

157166
// create reader manager for schema discovery
158167
readerManager := kafkapkg.NewReaderManager(kafkapkg.ReaderConfig{
@@ -230,7 +239,6 @@ func (k *Kafka) ProduceSchema(ctx context.Context, streamName string) (*types.St
230239
return nil, fmt.Errorf("failed to fetch schema for topic %s: %s", streamName, err)
231240
}
232241

233-
stream.SourceDefinedPrimaryKey = types.NewSet(Offset, Partition)
234242
return stream, nil
235243
}
236244

drivers/mongodb/internal/mon.go

Lines changed: 53 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -230,69 +230,76 @@ func (m *Mongo) GetStreamNames(ctx context.Context) ([]string, error) {
230230
return streamNames, collections.Err()
231231
}
232232

233-
// TODO: omit usage of ProduceSchema when sync command is used
234233
func (m *Mongo) ProduceSchema(ctx context.Context, streamName string) (*types.Stream, error) {
235-
produceCollectionSchema := func(ctx context.Context, db *mongo.Database, streamName string) (*types.Stream, error) {
236-
logger.Infof("producing type schema for stream [%s]", streamName)
237-
238-
// initialize stream
239-
collection := db.Collection(streamName)
240-
stream := types.NewStream(streamName, db.Name(), nil)
241-
// find primary keys
242-
indexesCursor, err := collection.Indexes().List(ctx, options.ListIndexes())
243-
if err != nil {
234+
database := m.client.Database(m.config.Database)
235+
collection := database.Collection(streamName)
236+
stream := types.NewStream(streamName, database.Name(), nil)
237+
238+
// Query indexes for primary keys (lightweight, always needed for validation)
239+
indexesCursor, err := collection.Indexes().List(ctx, options.ListIndexes())
240+
if err != nil {
241+
return nil, err
242+
}
243+
defer indexesCursor.Close(ctx)
244+
245+
for indexesCursor.Next(ctx) {
246+
var indexes bson.M
247+
if err := indexesCursor.Decode(&indexes); err != nil {
244248
return nil, err
245249
}
246-
defer indexesCursor.Close(ctx)
250+
for key := range indexes["key"].(bson.M) {
251+
stream.WithPrimaryKey(key)
252+
}
253+
}
247254

248-
for indexesCursor.Next(ctx) {
249-
var indexes bson.M
250-
if err := indexesCursor.Decode(&indexes); err != nil {
251-
return nil, err
252-
}
253-
for key := range indexes["key"].(bson.M) {
254-
stream.WithPrimaryKey(key)
255-
}
255+
// During sync, skip expensive document sampling for schema inference.
256+
// The full schema is already available in the catalog (streams.json) from a prior discover run.
257+
// Only structural metadata (primary keys, sync modes) is needed for stream validation.
258+
if ctx.Value(constants.SyncContext{}) != nil {
259+
logger.Infof("sync context detected, skipping schema inference for stream [%s]", streamName)
260+
stream.WithSyncMode(types.FULLREFRESH, types.INCREMENTAL)
261+
if m.CDCSupported() {
262+
stream.UpsertField(CDCResumeToken, types.String, true, true)
263+
stream.WithSyncMode(types.CDC, types.STRICTCDC)
256264
}
265+
return stream, nil
266+
}
267+
268+
logger.Infof("producing type schema for stream [%s]", streamName)
269+
270+
// Define find options for fetching documents in ascending and descending order.
271+
findOpts := []*options.FindOptions{
272+
options.Find().SetLimit(10000).SetSort(bson.D{{Key: "$natural", Value: 1}}),
273+
options.Find().SetLimit(10000).SetSort(bson.D{{Key: "$natural", Value: -1}}),
274+
}
257275

258-
// Define find options for fetching documents in ascending and descending order.
259-
findOpts := []*options.FindOptions{
260-
options.Find().SetLimit(10000).SetSort(bson.D{{Key: "$natural", Value: 1}}),
261-
options.Find().SetLimit(10000).SetSort(bson.D{{Key: "$natural", Value: -1}}),
276+
if err := utils.Concurrent(ctx, findOpts, len(findOpts), func(ctx context.Context, findOpt *options.FindOptions, execNumber int) error {
277+
cursor, err := collection.Find(ctx, bson.D{}, findOpt)
278+
if err != nil {
279+
return err
262280
}
281+
defer cursor.Close(ctx)
263282

264-
return stream, utils.Concurrent(ctx, findOpts, len(findOpts), func(ctx context.Context, findOpt *options.FindOptions, execNumber int) error {
265-
cursor, err := collection.Find(ctx, bson.D{}, findOpt)
266-
if err != nil {
283+
for cursor.Next(ctx) {
284+
var row bson.M
285+
if err := cursor.Decode(&row); err != nil {
267286
return err
268287
}
269-
defer cursor.Close(ctx)
270288

271-
for cursor.Next(ctx) {
272-
var row bson.M
273-
if err := cursor.Decode(&row); err != nil {
274-
return err
275-
}
276-
277-
filterMongoObject(row)
278-
if err := typeutils.Resolve(stream, row); err != nil {
279-
return err
280-
}
289+
filterMongoObject(row)
290+
if err := typeutils.Resolve(stream, row); err != nil {
291+
return err
281292
}
293+
}
282294

283-
return cursor.Err()
284-
})
285-
}
286-
database := m.client.Database(m.config.Database)
287-
// Either wait for covering up to 10k records from each side for all streams
288-
// Or wait till discoverCtx exits
289-
stream, err := produceCollectionSchema(ctx, database, streamName)
290-
if err != nil {
295+
return cursor.Err()
296+
}); err != nil {
291297
if ctx.Err() != nil {
292298
return nil, fmt.Errorf("failed to produce schema context deadline exceeded: %s", ctx.Err())
293299
}
294300
return nil, fmt.Errorf("failed to process collection[%s]: %s", streamName, err)
295301
}
302+
296303
// Add all discovered fields as potential cursor fields
297304
stream.Schema.Properties.Range(func(key, value interface{}) bool {
298305
if fieldName, ok := key.(string); ok {
@@ -307,7 +314,7 @@ func (m *Mongo) ProduceSchema(ctx context.Context, streamName string) (*types.St
307314
stream.WithSyncMode(types.CDC, types.STRICTCDC)
308315
}
309316

310-
return stream, err
317+
return stream, nil
311318
}
312319

313320
func filterMongoObject(doc bson.M) {

protocol/sync.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package protocol
22

33
import (
4+
"context"
45
"fmt"
56
"strings"
67
"sync"
@@ -82,13 +83,18 @@ var syncCmd = &cobra.Command{
8283
return nil
8384
},
8485
RunE: func(cmd *cobra.Command, _ []string) error {
86+
// Mark context as sync command so drivers can skip expensive schema inference
87+
// (e.g., MongoDB document sampling, Kafka message reading) since the schema
88+
// is already available in the catalog (streams.json) from a prior discover run.
89+
ctx := context.WithValue(cmd.Context(), constants.SyncContext{}, true)
90+
8591
// setup connector first
86-
err := connector.Setup(cmd.Context())
92+
err := connector.Setup(ctx)
8793
if err != nil {
8894
return err
8995
}
9096
// Get Source Streams, sending 0 max discover threads to discover
91-
streams, err := connector.Discover(cmd.Context(), 0)
97+
streams, err := connector.Discover(ctx, 0)
9298
if err != nil {
9399
return err
94100
}

types/stream_configured.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,10 @@ func (s *ConfiguredStream) Validate(source *Stream) error {
261261
return fmt.Errorf("invalid sync mode[%s]; valid are %v", s.Stream.SyncMode, source.SupportedSyncModes)
262262
}
263263

264-
// no cursor validation in cdc and backfill sync
265-
if s.Stream.SyncMode == INCREMENTAL {
264+
// no cursor validation in cdc and backfill sync;
265+
// also skip when AvailableCursorFields is empty — the source produced a
266+
// lightweight schema (e.g. sync fast-path) and we trust the streams.json.
267+
if s.Stream.SyncMode == INCREMENTAL && len(source.AvailableCursorFields.Array()) > 0 {
266268
primaryCursor, secondaryCursor := s.Cursor()
267269
if !source.AvailableCursorFields.Exists(primaryCursor) {
268270
return fmt.Errorf("invalid cursor field [%s]; valid are %v", primaryCursor, source.AvailableCursorFields)

0 commit comments

Comments
 (0)