Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions internal/impl/mssqlserver/checkpoint_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,16 @@ func (c *checkpointCache) Close(ctx context.Context) error {
}

func createCacheTable(ctx context.Context, db *sql.DB, tbl cacheTable) (bool, error) {
// cache_key length is based on default (fixed) cache key
// LSNs are varbinary(10); storing them as varchar would trigger an implicit
// binary-to-hex-string conversion by the driver and corrupt the value on
// read-back.
q := `
DECLARE @created BIT = 0;
IF NOT EXISTS (SELECT 1 FROM sys.tables WHERE schema_id = SCHEMA_ID('%s') AND name = '%s')
BEGIN
CREATE TABLE %s (
cache_key varchar(7) NOT NULL PRIMARY KEY,
cache_val varchar(100)
cache_val varbinary(10)
);
SET @created = 1;
END;
Expand All @@ -186,7 +188,7 @@ func createUpsertStoredProc(ctx context.Context, db *sql.DB, cacheTable cacheTab
q := `
CREATE OR ALTER PROCEDURE %s
@Key varchar(7),
@Value varchar(100)
@Value varbinary(10)
AS
BEGIN
SET NOCOUNT ON;
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/mssqlserver/checkpoint_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func TestIntegration_MicrosoftSQLServerCDC_CheckpointCache(t *testing.T) {
cache, err := newCheckpointCache(context.Background(), connStr, cacheTableToCreate, nil)
require.NoError(t, err)

// verify set
// verify set: LSN is a 10-byte varbinary value
var wanted replication.LSN
require.NoError(t, wanted.Scan([]byte("0x0000002d000004b00003")))
require.NoError(t, wanted.Scan([]byte{0x00, 0x00, 0x00, 0x2d, 0x00, 0x00, 0x04, 0xb0, 0x00, 0x03}))
require.NoError(t, cache.Set(t.Context(), "", wanted, nil))

// verify get
Expand Down
14 changes: 14 additions & 0 deletions internal/impl/mssqlserver/input_mssqlserver_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"errors"
"fmt"
"regexp"
"sync"
"time"

"github.com/Jeffail/checkpoint"
Expand Down Expand Up @@ -152,6 +153,7 @@ type sqlServerCDCInput struct {
publisher *batchPublisher
metrics *service.Metrics

connMu sync.Mutex
stopSig *shutdown.Signaller
log *service.Logger
cpCache service.Cache
Expand Down Expand Up @@ -283,6 +285,18 @@ func newMSSQLServerCDCInput(conf *service.ParsedConfig, resources *service.Resou
}

func (i *sqlServerCDCInput) Connect(ctx context.Context) error {
i.connMu.Lock()
defer i.connMu.Unlock()

// If the background goroutine from a previous Connect is still running,
// skip reconnection. HasStoppedChan is closed initially (constructor) and
// when the goroutine exits, so a blocking default means "still active".
select {
case <-i.stopSig.HasStoppedChan():
default:
return nil
}

var (
err error
userTables []replication.UserDefinedTable
Expand Down
90 changes: 55 additions & 35 deletions internal/impl/mssqlserver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ func TestIntegration_MicrosoftSQLServerCDC_SnapshotAndStreaming(t *testing.T) {
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

// wait for changes to propagate to change tables
time.Sleep(5 * time.Second)
db.WaitForCDCChanges(t.Context(), 1000, "test.foo", "dbo.foo", "dbo.bar")

var (
outBatches []string
Expand Down Expand Up @@ -108,13 +107,17 @@ microsoft_sql_server_cdc:

t.Log("Verifying streaming changes...")
{
// insert 3000 more for streaming changes
for range 1000 {
// insert streaming changes (reduced count to avoid CDC agent timeout under emulation)
streamingRowsPerTable := 10
streamingWant := streamingRowsPerTable * 3
for range streamingRowsPerTable {
db.MustExec("INSERT INTO test.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

db.WaitForCDCChanges(t.Context(), 1000+streamingRowsPerTable, "test.foo", "dbo.foo", "dbo.bar")

outBatchesMu.Lock()
outBatches = nil
outBatchesMu.Unlock()
Expand All @@ -123,10 +126,10 @@ microsoft_sql_server_cdc:
defer outBatchesMu.Unlock()

got := len(outBatches)
if got > want {
t.Fatalf("Wanted %d streaming changes but got %d", want, got)
if got > streamingWant {
t.Fatalf("Wanted %d streaming changes but got %d", streamingWant, got)
}
return got == want
return got == streamingWant
}, time.Minute*5, time.Second*1)

}
Expand All @@ -151,8 +154,7 @@ microsoft_sql_server_cdc:
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

// wait for changes to propagate to change tables
time.Sleep(5 * time.Second)
db.WaitForCDCChanges(t.Context(), 1000, "test.foo", "dbo.foo", "dbo.bar")

var (
outBatches []string
Expand Down Expand Up @@ -210,13 +212,17 @@ microsoft_sql_server_cdc:

t.Log("Verifying streaming changes...")
{
// insert 3000 more for streaming changes
for range 1000 {
// insert streaming changes (reduced count to avoid CDC agent timeout under emulation)
streamingRowsPerTable := 10
streamingWant := streamingRowsPerTable * 3
for range streamingRowsPerTable {
db.MustExec("INSERT INTO test.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

db.WaitForCDCChanges(t.Context(), 1000+streamingRowsPerTable, "test.foo", "dbo.foo", "dbo.bar")

outBatchesMu.Lock()
outBatches = nil
outBatchesMu.Unlock()
Expand All @@ -225,10 +231,10 @@ microsoft_sql_server_cdc:
defer outBatchesMu.Unlock()

got := len(outBatches)
if got > want {
t.Fatalf("Wanted %d streaming changes but got %d", want, got)
if got > streamingWant {
t.Fatalf("Wanted %d streaming changes but got %d", streamingWant, got)
}
return got == want
return got == streamingWant
}, time.Minute*5, time.Second*1)

}
Expand All @@ -253,8 +259,7 @@ microsoft_sql_server_cdc:
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

// wait for changes to propagate to change tables
time.Sleep(5 * time.Second)
db.WaitForCDCChanges(t.Context(), 1000, "test.foo", "dbo.foo", "dbo.bar")

var (
outBatches []string
Expand Down Expand Up @@ -317,13 +322,17 @@ file:

t.Log("Verifying streaming changes...")
{
// insert 3000 more for streaming changes
for range 1000 {
// insert streaming changes (reduced count to avoid CDC agent timeout under emulation)
streamingRowsPerTable := 10
streamingWant := streamingRowsPerTable * 3
for range streamingRowsPerTable {
db.MustExec("INSERT INTO test.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

db.WaitForCDCChanges(t.Context(), 1000+streamingRowsPerTable, "test.foo", "dbo.foo", "dbo.bar")

outBatchesMu.Lock()
outBatches = nil
outBatchesMu.Unlock()
Expand All @@ -332,10 +341,10 @@ file:
defer outBatchesMu.Unlock()

got := len(outBatches)
if got > want {
t.Fatalf("Wanted %d streaming changes but got %d", want, got)
if got > streamingWant {
t.Fatalf("Wanted %d streaming changes but got %d", streamingWant, got)
}
return got == want
return got == streamingWant
}, time.Minute*5, time.Second*1)

}
Expand Down Expand Up @@ -442,6 +451,7 @@ microsoft_sql_server_cdc:
outBatchesMu sync.Mutex
)

rowsPerPhase := 100
t.Log("Launching component to stream initial data...")
{
require.NoError(t, streamBuilder.AddBatchConsumerFunc(func(_ context.Context, mb service.MessageBatch) error {
Expand All @@ -457,47 +467,51 @@ microsoft_sql_server_cdc:
require.NoError(t, err)
license.InjectTestService(stream.Resources())

// --- launch input and insert initial rows for consumption
for range 1000 {
// --- insert initial rows and wait for CDC to process them
for range rowsPerPhase {
db.MustExec("INSERT INTO test.foo DEFAULT VALUES")
}
db.WaitForCDCChanges(t.Context(), rowsPerPhase, "test.foo")

go func() {
if err := stream.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) {
t.Error(err)
}
}()

time.Sleep(time.Second * 5)

assert.Eventually(t, func() bool {
outBatchesMu.Lock()
defer outBatchesMu.Unlock()
return len(outBatches) == 1000
return len(outBatches) == rowsPerPhase
}, time.Minute*5, time.Millisecond*100)
require.NoError(t, stream.StopWithin(time.Second*10))
}

t.Log("Relaunching component to resume from checkpoint...")
{
// --- now stopped, insert more rows
for range 1000 {
// --- now stopped, insert more rows and wait for CDC
for range rowsPerPhase {
db.MustExec("INSERT INTO test.foo DEFAULT VALUES")
}
db.WaitForCDCChanges(t.Context(), rowsPerPhase*2, "test.foo")

streamResume, err := streamBuilder.Build()
require.NoError(t, err)
license.InjectTestService(streamResume.Resources())
go func() {
require.NoError(t, streamResume.Run(t.Context()))
if err := streamResume.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) {
t.Error(err)
}
}()

totalWant := rowsPerPhase * 2
assert.Eventually(t, func() bool {
outBatchesMu.Lock()
defer outBatchesMu.Unlock()
return len(outBatches) == 2000
return len(outBatches) == totalWant
}, time.Minute*5, time.Millisecond*100)

require.Contains(t, outBatches[len(outBatches)-1], "2000")
require.Contains(t, outBatches[len(outBatches)-1], fmt.Sprintf("%d", totalWant))
require.NoError(t, streamResume.StopWithin(time.Second*10))
}
}
Expand Down Expand Up @@ -548,11 +562,18 @@ microsoft_sql_server_cdc:
require.NoError(t, err)
license.InjectTestService(stream.Resources())

// Run the stream in a cleanup-synchronised goroutine so a t.Error from a
// late Run error can't fire after the test function has returned.
streamErr := make(chan error, 1)
go func() {
if err := stream.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) {
t.Error(err)
}
streamErr <- stream.Run(t.Context())
}()
t.Cleanup(func() {
_ = stream.StopWithin(time.Second * 10)
if err := <-streamErr; err != nil && !errors.Is(err, context.Canceled) {
t.Errorf("stream.Run: %v", err)
}
})

assert.Eventually(t, func() bool {
outBatchesMu.Lock()
Expand All @@ -566,7 +587,6 @@ microsoft_sql_server_cdc:
want = append(want, fmt.Sprintf(`{"b":%d}`, i))
}
require.Equal(t, want, outBatches, "Order of output does not match expected")
require.NoError(t, stream.StopWithin(time.Second*10))
}

func TestIntegration_MicrosoftSQLServerCDC_SnapshotAndStreaming_AllTypes(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions internal/impl/mssqlserver/mssqlservertest/mssqlservertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,32 @@ end:
db.T.Logf("Change Data Capture enabled for table %q", fullTableName)
}

// WaitForCDCChanges waits until the CDC change table for each given source
// table has accumulated at least minRows entries. Under x86 emulation on Apple
// Silicon the CDC capture agent can be very slow, so tests must poll the
// change tables rather than sleep for a fixed duration.
func (db *TestDB) WaitForCDCChanges(ctx context.Context, minRows int, tables ...string) {
	db.T.Helper()
	for _, fullTableName := range tables {
		// Resolve "schema.table"; a bare table name defaults to the dbo schema.
		parts := strings.Split(fullTableName, ".")
		schema, name := "dbo", parts[0]
		if len(parts) == 2 {
			schema, name = parts[0], parts[1]
		}
		// CDC mirrors each tracked table into [cdc].[<schema>_<table>_CT].
		countQuery := "SELECT COUNT(*) FROM [cdc].[" + schema + "_" + name + "_CT]"
		var observed int
		reached := assert.Eventually(db.T, func() bool {
			// Stop retrying once the caller's context is done; query errors
			// are treated as "not yet" so transient failures keep polling.
			if ctx.Err() != nil {
				return false
			}
			if err := db.QueryRowContext(ctx, countQuery).Scan(&observed); err != nil {
				return false
			}
			return observed >= minRows
		}, 5*time.Minute, time.Second)
		if !reached {
			db.T.Fatalf("WaitForCDCChanges(%q): expected >= %d rows, got %d", fullTableName, minRows, observed)
		}
	}
}

// MustDisableCDC disables Change Data Capture on the specified table.
// The fullTableName should be in format "schema.table" (e.g., "dbo.all_data_types").
// If only a table name is provided, defaults to "dbo" schema.
Expand Down
Loading