Skip to content

Commit 33d2963

Browse files
Merge branch 'staging' into test/integration-test-kafka
2 parents 9c1cc28 + a65b47e commit 33d2963

3 files changed

Lines changed: 68 additions & 10 deletions

File tree

.trivyignore

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,10 @@ CVE-2025-30065
22
CVE-2018-1320
33
CVE-2019-0205
44
CVE-2020-13949
5-
CVE-2025-46762
5+
CVE-2025-46762
6+
# CVE-2026-34040: Moby AuthZ plugin bypass via oversized request bodies.
7+
# TODO: Upgrade testcontainers-go once this PR is merged and released:
8+
# https://github.com/testcontainers/testcontainers-go/pull/3617
9+
# This vulnerability only affects Docker AuthZ plugins (not used by Olake)
10+
# and is only present in test-time dependencies.
11+
CVE-2026-34040

constants/state_version.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,15 @@ package constants
2626
// * Earlier, if the session or global timezone was set in offset format, it was not parsed correctly and fell back to UTC.
2727
// * Now it parses the offset correctly and uses the timezone offset to set the timezone for the connection.
2828
//
29-
// - Version 4: (Current Version) Unsigned int/integer/bigint map to Int64.
29+
// - Version 4: Unsigned int/integer/bigint map to Int64.
3030
// * Earlier unsigned int/integer/bigint were mapped to Int32 which caused integer overflows.
31+
//
32+
// - Version 5: (Current Version) MongoDB nested DateTime values decoded as UTC time.Time.
33+
// * BSON DateTime at any depth is now decoded directly to time.Time (UTC) via a custom client registry, preventing json.Marshal failures for years outside the supported range [0, 9999].
34+
// * Top-level DateTime fields that were previously formatted with the local machine timezone (e.g. "+05:30") are now always output in UTC ("Z").
3135

3236
const (
33-
LatestStateVersion = 4
37+
LatestStateVersion = 5
3438
)
3539

3640
// Used as the current version of the state when the program is running

drivers/mongodb/internal/mon.go

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"math"
77
"net"
8+
"reflect"
89
"strings"
910
"sync"
1011
"time"
@@ -16,12 +17,65 @@ import (
1617
"github.com/datazip-inc/olake/utils/logger"
1718
"github.com/datazip-inc/olake/utils/typeutils"
1819
"go.mongodb.org/mongo-driver/bson"
20+
"go.mongodb.org/mongo-driver/bson/bsoncodec"
21+
"go.mongodb.org/mongo-driver/bson/bsonrw"
1922
"go.mongodb.org/mongo-driver/bson/primitive"
2023
"go.mongodb.org/mongo-driver/mongo"
2124
"go.mongodb.org/mongo-driver/mongo/options"
2225
"golang.org/x/crypto/ssh"
2326
)
2427

28+
// safeDecodeRegistry is a BSON decode registry registered on the MongoDB client.
29+
// It intercepts two problem cases for interface{} slots (every value in bson.M):
30+
//
31+
// 1. BSON Double (float64) — NaN and ±Inf are not valid JSON; they crash
32+
// encoding/json. These are coerced to nil for all state versions.
33+
//
34+
// 2. BSON DateTime — primitive.DateTime.MarshalJSON calls time.Time.MarshalJSON
35+
// which panics for years outside [0, 9999]. For state version > 4, DateTime
36+
// is decoded as a clamped UTC time.Time so downstream json.Marshal is always
37+
// safe. For state version ≤ 4 the fallback (primitive.DateTime) is used to
38+
// preserve the pre-existing local-timezone output format.
39+
var safeDecodeRegistry = func() *bsoncodec.Registry {
40+
tEmpty := reflect.TypeOf((*interface{})(nil)).Elem()
41+
reg := bson.NewRegistry()
42+
// Capture the stock interface{} decoder before replacing it.
43+
fallback, _ := reg.LookupDecoder(tEmpty)
44+
reg.RegisterTypeDecoder(
45+
tEmpty,
46+
bsoncodec.ValueDecoderFunc(func(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error {
47+
switch vr.Type() {
48+
case bson.TypeDouble:
49+
f, err := vr.ReadDouble()
50+
if err != nil {
51+
return err
52+
}
53+
if math.IsNaN(f) || math.IsInf(f, 0) {
54+
val.Set(reflect.Zero(val.Type()))
55+
} else {
56+
val.Set(reflect.ValueOf(f))
57+
}
58+
return nil
59+
case bson.TypeDateTime:
60+
if constants.LoadedStateVersion > 4 {
61+
ms, err := vr.ReadDateTime()
62+
if err != nil {
63+
return err
64+
}
65+
t, err := typeutils.ReformatDate(primitive.DateTime(ms).Time().UTC(), true)
66+
if err != nil {
67+
return err
68+
}
69+
val.Set(reflect.ValueOf(t))
70+
return nil
71+
}
72+
}
73+
return fallback.DecodeValue(dc, vr, val)
74+
}),
75+
)
76+
return reg
77+
}()
78+
2579
const (
2680
cdcCursorField = "_data"
2781
)
@@ -84,6 +138,7 @@ func (m *Mongo) Setup(ctx context.Context) error {
84138

85139
opts.ApplyURI(m.config.URI())
86140
opts.SetCompressors([]string{"snappy"}) // using Snappy compression; read here https://en.wikipedia.org/wiki/Snappy_(compression)
141+
opts.SetRegistry(safeDecodeRegistry)
87142
if m.sshDialer != nil {
88143
opts.SetDialer(m.sshDialer)
89144
}
@@ -267,7 +322,6 @@ func filterMongoObject(doc bson.M) {
267322
var err error
268323
doc[key], err = typeutils.ReformatDate(t, true)
269324
if err != nil {
270-
logger.Warnf("failed to reformat date for key %s: %s", key, err)
271325
doc[key] = time.Unix(0, 0).UTC()
272326
}
273327
case primitive.Null:
@@ -278,12 +332,6 @@ func filterMongoObject(doc bson.M) {
278332
doc[key] = value.String()
279333
case primitive.ObjectID:
280334
doc[key] = value.Hex()
281-
case float64:
282-
if math.IsNaN(value) || math.IsInf(value, 0) {
283-
doc[key] = nil
284-
} else {
285-
doc[key] = value
286-
}
287335
default:
288336
doc[key] = value
289337
}

0 commit comments

Comments
 (0)