Skip to content

Commit 36090c3

Browse files
oracledb_cdc: add source_ts_ms metadata (#4250)
1 parent 692d35a commit 36090c3

5 files changed

Lines changed: 26 additions & 8 deletions

File tree

docs/modules/components/pages/inputs/oracledb_cdc.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ This input adds the following metadata fields to each message:
120120
- table_name: Name of the table that the message originated from.
121121
- operation: Type of operation that generated the message: "read", "delete", "insert", or "update". "read" is from messages that are read in the initial snapshot phase.
122122
- scn: the System Change Number in Oracle.
123+
- source_ts_ms: The timestamp of when Oracle wrote the change record into the redo log, expressed as milliseconds since the Unix epoch. This reflects the database server's wall-clock time at the moment the DML executed, not the transaction commit time.
123124
- schema: The table schema, for use with schema-aware downstream processors such as `schema_registry_encode`. When new columns are detected in CDC events, the schema is automatically refreshed from the Oracle catalog. Dropped columns are reflected after a connector restart.
124125
125126
== Permissions

internal/impl/oracledb/batcher.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"context"
1313
"encoding/json"
1414
"fmt"
15+
"strconv"
1516
"sync"
1617
"time"
1718

@@ -166,6 +167,12 @@ func (b *batchPublisher) Publish(ctx context.Context, m *replication.MessageEven
166167
if m.SCN.IsValid() {
167168
msg.MetaSet("scn", m.SCN.String())
168169
}
170+
if !m.Timestamp.IsZero() {
171+
// upcon connection go-ora automatically queries the server's timezone and stores
172+
// it in conn.dbServerTimeZone so it can convert the redo log timestamp
173+
// from database-local time to UTC
174+
msg.MetaSet("source_ts_ms", strconv.FormatInt(m.Timestamp.UnixMilli(), 10))
175+
}
169176
if m.CheckpointSCN.IsValid() {
170177
msg.MetaSet("checkpoint_scn", m.CheckpointSCN.String())
171178
}

internal/impl/oracledb/input_oracledb_cdc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ This input adds the following metadata fields to each message:
7676
- table_name: Name of the table that the message originated from.
7777
- operation: Type of operation that generated the message: "read", "delete", "insert", or "update". "read" is from messages that are read in the initial snapshot phase.
7878
- scn: the System Change Number in Oracle.
79+
- source_ts_ms: The timestamp of when Oracle wrote the change record into the redo log, expressed as milliseconds since the Unix epoch. This reflects the database server's wall-clock time at the moment the DML executed, not the transaction commit time.
7980
- schema: The table schema, for use with schema-aware downstream processors such as ` + "`schema_registry_encode`" + `. When new columns are detected in CDC events, the schema is automatically refreshed from the Oracle catalog. Dropped columns are reflected after a connector restart.
8081
8182
== Permissions

internal/impl/oracledb/integration_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"encoding/json"
1414
"errors"
1515
"fmt"
16+
"strconv"
1617
"strings"
1718
"sync"
1819
"testing"
@@ -541,6 +542,14 @@ oracledb_cdc:
541542
op, ok := msg.MetaGet("operation")
542543
require.Truef(t, ok, "message %d missing 'operation' metadata", i)
543544
assert.Equalf(t, operation, op, "message %d: expected operation '%s', got %q", i, operation, op)
545+
546+
tsStr, ok := msg.MetaGet("source_ts_ms")
547+
require.Truef(t, ok, "message %d missing 'source_ts_ms' metadata", i)
548+
tsMs, err := strconv.ParseInt(tsStr, 10, 64)
549+
require.NoErrorf(t, err, "message %d: source_ts_ms %q is not a valid int64", i, tsStr)
550+
tsTime := time.UnixMilli(tsMs)
551+
assert.Truef(t, tsTime.After(time.Now().Add(-5*time.Minute)), "message %d: source_ts_ms %d is too far in the past", i, tsMs)
552+
assert.Truef(t, tsTime.Before(time.Now().Add(time.Minute)), "message %d: source_ts_ms %d is in the future", i, tsMs)
544553
}
545554

546555
for _, expectedKey := range []string{"TESTDB.FOO", "TESTDB.FOO2", "TESTDB2.BAR"} {

internal/impl/oracledb/replication/stream_message.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,12 @@ type ColumnMeta struct {
117117

118118
// MessageEvent represents a single change from Table's change table in the database.
119119
type MessageEvent struct {
120-
SCN SCN `json:"start_scn"`
121-
CheckpointSCN SCN `json:"-"`
122-
Operation OpType `json:"operation"`
123-
Schema string `json:"schema"`
124-
Table string `json:"table"`
125-
Data any `json:"data"`
126-
Timestamp time.Time `json:"timestamp"`
127-
ColumnMeta []ColumnMeta `json:"-"`
120+
SCN SCN
121+
CheckpointSCN SCN
122+
Operation OpType
123+
Schema string
124+
Table string
125+
Data any
126+
Timestamp time.Time
127+
ColumnMeta []ColumnMeta
128128
}

0 commit comments

Comments
 (0)