Skip to content
Open
2 changes: 1 addition & 1 deletion cmd/tools/integration/packages.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
{"path": "./internal/impl/nats"},
{"path": "./internal/impl/nsq"},
{"path": "./internal/impl/opensearch"},
{"path": "./internal/impl/oracledb", "timeout": "10m"},
{"path": "./internal/impl/oracledb", "timeout": "30m"},
{"path": "./internal/impl/postgresql"},
{"path": "./internal/impl/pulsar", "timeout": "10m"},
{"path": "./internal/impl/qdrant"},
Expand Down
26 changes: 21 additions & 5 deletions internal/impl/oracledb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,11 @@ oracledb_cdc:

content, err := msgs[0].AsBytes()
assert.NoError(t, err)
assert.Equal(t, `{"ID":1,"VAL":1}`, string(content))
var row map[string]any
require.NoError(t, json.Unmarshal(content, &row))
assert.Len(t, row, 2)
assert.Contains(t, row, "ID")
assert.EqualValues(t, 1, row["VAL"])
})

t.Run("Streaming update changes...", func(t *testing.T) {
Expand All @@ -584,7 +588,11 @@ oracledb_cdc:

content, err := msgs[0].AsBytes()
assert.NoError(t, err)
assert.Equal(t, `{"ID":1,"VAL":2}`, string(content))
var row map[string]any
require.NoError(t, json.Unmarshal(content, &row))
assert.Len(t, row, 2)
assert.Contains(t, row, "ID")
assert.EqualValues(t, 2, row["VAL"])
})

t.Run("Streaming delete changes...", func(t *testing.T) {
Expand All @@ -597,7 +605,11 @@ oracledb_cdc:

content, err := msgs[0].AsBytes()
assert.NoError(t, err)
assert.Equal(t, `{"ID":1,"VAL":2}`, string(content))
var row map[string]any
require.NoError(t, json.Unmarshal(content, &row))
assert.Len(t, row, 2)
assert.Contains(t, row, "ID")
assert.EqualValues(t, 2, row["VAL"])
})

require.NoError(t, stream.StopWithin(time.Second*10))
Expand Down Expand Up @@ -708,7 +720,9 @@ oracledb_cdc:
require.NoError(t, stream.StopWithin(time.Second*10))
})

db.MustExec(`TRUNCATE TABLE RPCN.CDC_CHECKPOINT_CACHE`)
// The checkpoint cache table is created lazily during Connect(), so it may
// not exist if the first subtest failed before the stream was launched.
_, _ = db.Exec(`TRUNCATE TABLE RPCN.CDC_CHECKPOINT_CACHE`)

t.Run("lob_enabled=true", func(t *testing.T) {
for range snapshotRows {
Expand Down Expand Up @@ -786,7 +800,9 @@ oracledb_cdc:
}
})

require.NoError(t, stream.StopWithin(time.Second*10))
if stream != nil {
require.NoError(t, stream.StopWithin(time.Second*10))
}
}

func TestIntegrationOracleDBCDCSnapshotAndStreamingAllTypes(t *testing.T) {
Expand Down
134 changes: 124 additions & 10 deletions internal/impl/oracledb/logminer/logminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewMiner(db *sql.DB, userTables []replication.UserTable, publisher replicat
if len(userTables) > 0 {
opCodes := "6, 7, 36"
if cfg.LOBEnabled {
opCodes += ", 9, 10"
opCodes += ", 9, 10, 11"
}
buf.WriteString(" AND (OPERATION_CODE IN (" + opCodes + ")")
// DML carries the real table name — filter by configured tables.
Expand Down Expand Up @@ -145,10 +145,8 @@ func (lm *LogMiner) ReadChanges(ctx context.Context, startPos replication.SCN) (

defer func() {
if lm.sessionMgr.IsActive() {
if err := lm.sessionMgr.EndSession(ctx, conn); err != nil {
if ctx.Err() == nil && !errors.Is(err, context.Canceled) {
lm.log.Errorf("ending logminer session on exit: %v", err)
}
if err := lm.sessionMgr.EndSession(context.Background(), conn); err != nil {
lm.log.Errorf("ending logminer session on exit: %v", err)
}
}
}()
Expand Down Expand Up @@ -274,17 +272,19 @@ func (lm *LogMiner) processRedoEvent(ctx context.Context, redoEvent *sqlredo.Red

lm.txnCache.AddEvent(redoEvent.TransactionID, redoEvent.SCN, &event)

case sqlredo.OpSelectLobLocator:
case sqlredo.OpSelectLobLocator, sqlredo.OpLobTrim:
if !lm.cfg.LOBEnabled {
return nil
}
if !redoEvent.SQLRedo.Valid || redoEvent.SQLRedo.String == "" {
lm.log.Warnf("Skipping SELECT_LOB_LOCATOR with no SQL_REDO (scn=%d, txn=%s)", redoEvent.SCN, redoEvent.TransactionID)
lm.log.Warnf("Skipping %s with no SQL_REDO (scn=%d, txn=%s)", redoEvent.Operation, redoEvent.SCN, redoEvent.TransactionID)
return nil
}
// LOB_TRIM SQL has the same SELECT "COL" INTO ... FROM "SCHEMA"."TABLE" WHERE ...
// structure as SELECT_LOB_LOCATOR, so the same parser works for both.
info, err := sqlredo.ParseSelectLobLocator(redoEvent.SQLRedo.String)
if err != nil {
lm.log.Warnf("Failed to parse SELECT_LOB_LOCATOR SQL (scn=%d, txn=%s): %v\nSQL: %.500s", redoEvent.SCN, redoEvent.TransactionID, err, redoEvent.SQLRedo.String)
lm.log.Warnf("Failed to parse %s SQL (scn=%d, txn=%s): %v\nSQL: %.500s", redoEvent.Operation, redoEvent.SCN, redoEvent.TransactionID, err, redoEvent.SQLRedo.String)
return nil
}
// Resolve LOB type from the schema cache populated at startup.
Expand Down Expand Up @@ -315,8 +315,11 @@ func (lm *LogMiner) processRedoEvent(ctx context.Context, redoEvent *sqlredo.Red
}
state, exists := lm.lobStates[redoEvent.TransactionID]
if !exists || state.ActiveKey == nil {
lm.log.Warnf("Received LOB_WRITE without active LOB locator (scn=%d, txn=%s)", redoEvent.SCN, redoEvent.TransactionID)
return nil
if !lm.inferLOBLocator(redoEvent) {
lm.log.Warnf("Received LOB_WRITE without active LOB locator (scn=%d, txn=%s)", redoEvent.SCN, redoEvent.TransactionID)
return nil
}
state = lm.lobStates[redoEvent.TransactionID]
}
acc := state.Accumulators[*state.ActiveKey]
if acc == nil {
Expand Down Expand Up @@ -520,6 +523,117 @@ func (lm *LogMiner) isLOBOnlyEvent(ev *sqlredo.DMLEvent) bool {
return true
}

// inferLOBLocator attempts to create a LOB locator for a LOB_WRITE event that
// arrived without a preceding SELECT_LOB_LOCATOR. This happens with BASICFILE
// out-of-line LOBs where Oracle does not emit locator events in LogMiner.
//
// The method searches the transaction's buffered DML events for either a
// LOB-init UPDATE (inline-LOB path) or an INSERT (out-of-line LOB path) whose
// Data carries a LOB column with an EMPTY_CLOB()/EMPTY_BLOB() placeholder that
// doesn't yet have an accumulator. For INSERTs on BASICFILE columns with
// DISABLE STORAGE IN ROW, Oracle emits NULL in SQL_REDO instead of an empty
// placeholder; in that case the LOB column is absent from Data, so known LOB
// columns for the table are also considered as inference candidates.
// Returns true if a locator was successfully created.
func (lm *LogMiner) inferLOBLocator(event *sqlredo.RedoEvent) bool {
if !event.SchemaName.Valid || !event.TableName.Valid {
return false
}
schema := event.SchemaName.String
table := event.TableName.String
if schema == "" || table == "" {
return false
}

txn := lm.txnCache.GetTransaction(event.TransactionID)
if txn == nil {
return false
}

prefix := strings.ToUpper(schema + "." + table + ".")

// Search backwards for the most recent event that can carry a LOB-init
// placeholder for this table: LOB-only UPDATE (inline-LOB path) or INSERT
// (BASICFILE out-of-line LOB path, where no LOB-init UPDATE is emitted).
for i := len(txn.Events) - 1; i >= 0; i-- {
ev := txn.Events[i]
if ev.Schema != schema || ev.Table != table {
continue
}

var pkValues map[string]any
switch {
case ev.Operation == sqlredo.OpUpdate && lm.isLOBOnlyEvent(ev):
pkValues = ev.OldValues
case ev.Operation == sqlredo.OpInsert:
// Use the INSERT's non-LOB columns as the PK identifier so that
// MergeLOBsIntoDMLEvents can still match this INSERT after one of
// its LOB columns has been overwritten with the assembled value
// (important when an INSERT has multiple out-of-line LOBs).
pkValues = make(map[string]any, len(ev.Data))
for col, val := range ev.Data {
if _, isLOB := lm.lobColTypes[prefix+strings.ToUpper(col)]; isLOB {
continue
}
pkValues[col] = val
}
default:
continue
}

pkString := sqlredo.FormatPKString(pkValues)

// Candidate LOB columns are those with an EMPTY_CLOB()/EMPTY_BLOB()
// placeholder (parsed as empty []byte). For INSERTs, BASICFILE columns
// with DISABLE STORAGE IN ROW emit NULL in SQL_REDO instead — the column
// is absent from Data — so iterate every known LOB column for the table.
for k, lobType := range lm.lobColTypes {
if !strings.HasPrefix(k, prefix) {
continue
}
col := k[len(prefix):]
val, present := ev.Data[col]
switch {
case present:
if b, ok := val.([]byte); !ok || len(b) != 0 {
continue
}
case ev.Operation != sqlredo.OpInsert:
continue
}

key := sqlredo.LobKey{
Schema: schema,
Table: table,
Column: col,
PKString: pkString,
}

// Defer state creation until we have a match to avoid leaking
// empty TxnLOBState entries when inference fails.
state := lm.getOrCreateLOBState(event.TransactionID)
if _, exists := state.Accumulators[key]; exists {
continue
}

state.Accumulators[key] = &sqlredo.LobAccumulator{
Schema: schema,
Table: table,
Column: col,
PKValues: pkValues,
IsBinary: lobType == "BLOB",
}
state.ActiveKey = &key

lm.log.Debugf("Inferred LOB locator for %s.%s.%s from %s (txn=%s)",
schema, table, col, ev.Operation, event.TransactionID)
return true
}
}

return false
}

func (lm *LogMiner) queryLogMinerContents(ctx context.Context, conn *sql.Conn, startSCN, endSCN uint64, processEvent func(context.Context, *sqlredo.RedoEvent) error) error {
if len(lm.tables) == 0 {
return nil
Expand Down
6 changes: 6 additions & 0 deletions internal/impl/oracledb/logminer/sqlredo/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
OpSelectLobLocator Operation = 9
// OpLobWrite represents a LOB_WRITE operation (op 10)
OpLobWrite Operation = 10
// OpLobTrim represents a LOB_TRIM operation (op 11)
OpLobTrim Operation = 11
)

// String converts the operation type to a string equivalent.
Expand All @@ -59,6 +61,8 @@ func (op Operation) String() string {
return "select_lob_locator"
case OpLobWrite:
return "lob_write"
case OpLobTrim:
return "lob_trim"
default:
return fmt.Sprintf("unknown operation (%d)", int64(op))
}
Expand Down Expand Up @@ -104,6 +108,8 @@ func operationFromCode(code int64) Operation {
return OpSelectLobLocator
case 10:
return OpLobWrite
case 11:
return OpLobTrim
default:
return OpUnknown
}
Expand Down
11 changes: 9 additions & 2 deletions internal/impl/oracledb/replication/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,19 @@ func (s *Snapshot) snapshotTable(ctx context.Context, table UserTable, maxBatchS
s.log.Errorf("Switching session back to root container: %v", err)
}
}()
if tx, err = conn.BeginTx(ctx, nil); err != nil {
// Use context.Background() to prevent database/sql from spawning an
// awaitDone goroutine that races with our explicit Rollback below.
// The go-ora v2 driver has an unsynchronized field in Session that
// causes a data race between BreakConnection (from awaitDone) and
// IsBreak (from our Rollback). Transaction lifetime is managed
// manually via the defer and explicit Rollback at the end.
if tx, err = conn.BeginTx(context.Background(), nil); err != nil {
return fmt.Errorf("beginning snapshot transaction: %w", err)
}
default:
// Non-CDB mode: use db.BeginTx directly — no *Conn needed.
if tx, err = s.dbPool.BeginTx(ctx, nil); err != nil {
// See CDB path comment above for why context.Background() is used.
if tx, err = s.dbPool.BeginTx(context.Background(), nil); err != nil {
return fmt.Errorf("beginning snapshot transaction: %w", err)
}
}
Expand Down
Loading