Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protocol/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func compareStreams() error {
return fmt.Errorf("failed to read new catalog: %s", derr)
}

diffCatalog := types.GetStreamsDelta(&oldStreams, &newStreams, connector.Type())
diffCatalog := types.GetStreamsDelta(&oldStreams, &newStreams)
if err := logger.FileLoggerWithPath(diffCatalog, viper.GetString(constants.DifferencePath)); err != nil {
return fmt.Errorf("failed to write difference streams: %s", err)
}
Expand Down
32 changes: 17 additions & 15 deletions types/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,15 @@ func getDestDBPrefix(streams []*ConfiguredStream) (constantValue bool, prefix st
// Only selected streams are compared.
// 1. Compares properties from selected_streams: normalization, partition_regex, filter, append_mode
// 2. Compares properties from streams: destination_database, cursor_field, sync_mode
// 3. For new streams: Only adds them if connector is Postgres/MySQL AND sync_mode is CDC
// 3. For now, any new stream present in new catalog is added to the difference. Later collision detection will happen.
//
// Parameters:
// - oldStreams: The previous catalog to compare against
// - newStreams: The current catalog with potential changes
//
// Returns:
// - A catalog containing only the streams that have differences
func GetStreamsDelta(oldStreams, newStreams *Catalog, connectorType string) *Catalog {
func GetStreamsDelta(oldStreams, newStreams *Catalog) *Catalog {
diffStreams := &Catalog{
Streams: []*ConfiguredStream{},
SelectedStreams: make(map[string][]StreamMetadata),
Expand All @@ -206,10 +206,6 @@ func GetStreamsDelta(oldStreams, newStreams *Catalog, connectorType string) *Cat
}
}

// flag for connector which have global state support
// TODO: create an array of global state supported connectors in constants
globalStateSupportedConnector := connectorType == string(constants.Postgres) || connectorType == string(constants.MySQL)

for namespace, newMetadatas := range newStreams.SelectedStreams {
for _, newMetadata := range newMetadatas {
streamID := fmt.Sprintf("%s.%s", namespace, newMetadata.StreamName)
Expand All @@ -227,14 +223,11 @@ func GetStreamsDelta(oldStreams, newStreams *Catalog, connectorType string) *Cat
// if new stream in selected_streams
if !oldMetadataExists || !oldStreamExists {
// addition of new streams
if globalStateSupportedConnector && newStream.GetStream().SyncMode == CDC {
diffStreams.Streams = append(diffStreams.Streams, newStream)
diffStreams.SelectedStreams[namespace] = append(
diffStreams.SelectedStreams[namespace],
newMetadata,
)
}
// skip new selected streams for mongo and sync mode != cdc
diffStreams.Streams = append(diffStreams.Streams, newStream)
diffStreams.SelectedStreams[namespace] = append(
diffStreams.SelectedStreams[namespace],
newMetadata,
)
continue
}

Expand All @@ -259,7 +252,16 @@ func GetStreamsDelta(oldStreams, newStreams *Catalog, connectorType string) *Cat

// if any difference, add stream to diff streams
if isDifferent {
diffStreams.Streams = append(diffStreams.Streams, newStream)
// copy of the new stream to modify it for the difference
newStreamCopy := *newStream.Stream
deltaStream := &ConfiguredStream{
Stream: &newStreamCopy,
}

// safely change for destination database if difference present
deltaStream.Stream.DestinationDatabase = utils.Ternary(oldStream.Stream.DestinationDatabase != newStream.Stream.DestinationDatabase, oldStream.Stream.DestinationDatabase, newStream.Stream.DestinationDatabase).(string)

diffStreams.Streams = append(diffStreams.Streams, deltaStream)
diffStreams.SelectedStreams[namespace] = append(
diffStreams.SelectedStreams[namespace],
newMetadata,
Expand Down
Loading