Skip to content

Commit 131f70d

Browse files
adlerhurstlivio-a
andauthored
fix(eventstore): use decimal, correct mirror (zitadel#9914)
# Eventstore fixes - `event.Position` used float64 before which can lead to [precision loss](golang/go#47300). The type got replaced by [a type without precision loss](https://github.com/jackc/pgx-shopspring-decimal) - the handler reported the wrong error if the current state was updated and therefore took longer to retry failed events. # Mirror fixes - max age of auth requests can be configured to speed up copying data from `auth.auth_requests` table. Auth requests last updated before the set age will be ignored. Default is 1 month - notification projections are skipped because notifications should be sent by the source system. The projections are set to the latest position - ensure that mirror can be executed multiple times --------- Co-authored-by: Livio Spring <[email protected]>
1 parent 046b165 commit 131f70d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+436
-236
lines changed

cmd/mirror/event.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package mirror
33
import (
44
"context"
55

6+
"github.com/shopspring/decimal"
7+
68
"github.com/zitadel/zitadel/internal/v2/eventstore"
79
"github.com/zitadel/zitadel/internal/v2/readmodel"
810
"github.com/zitadel/zitadel/internal/v2/system"
@@ -29,7 +31,7 @@ func queryLastSuccessfulMigration(ctx context.Context, destinationES *eventstore
2931
return lastSuccess, nil
3032
}
3133

32-
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position float64) error {
34+
func writeMigrationSucceeded(ctx context.Context, destinationES *eventstore.EventStore, id, source string, position decimal.Decimal) error {
3335
return destinationES.Push(
3436
ctx,
3537
eventstore.NewPushIntent(

cmd/mirror/event_store.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"io"
99
"time"
1010

11+
"github.com/jackc/pgx/v5/pgconn"
1112
"github.com/jackc/pgx/v5/stdlib"
13+
"github.com/shopspring/decimal"
1214
"github.com/spf13/cobra"
1315
"github.com/spf13/viper"
1416
"github.com/zitadel/logging"
@@ -89,7 +91,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
8991
previousMigration, err := queryLastSuccessfulMigration(ctx, destinationES, source.DatabaseName())
9092
logging.OnError(err).Fatal("unable to query latest successful migration")
9193

92-
var maxPosition float64
94+
var maxPosition decimal.Decimal
9395
err = source.QueryRowContext(ctx,
9496
func(row *sql.Row) error {
9597
return row.Scan(&maxPosition)
@@ -101,7 +103,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
101103
logging.WithFields("from", previousMigration.Position, "to", maxPosition).Info("start event migration")
102104

103105
nextPos := make(chan bool, 1)
104-
pos := make(chan float64, 1)
106+
pos := make(chan decimal.Decimal, 1)
105107
errs := make(chan error, 3)
106108

107109
go func() {
@@ -152,7 +154,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
152154
go func() {
153155
defer close(pos)
154156
for range nextPos {
155-
var position float64
157+
var position decimal.Decimal
156158
err := dest.QueryRowContext(
157159
ctx,
158160
func(row *sql.Row) error {
@@ -175,6 +177,10 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
175177
tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY eventstore.events2 FROM STDIN")
176178
eventCount = tag.RowsAffected()
177179
if err != nil {
180+
pgErr := new(pgconn.PgError)
181+
errors.As(err, &pgErr)
182+
183+
logging.WithError(err).WithField("pg_err_details", pgErr.Detail).Error("unable to copy events into destination")
178184
return zerrors.ThrowUnknown(err, "MIGRA-DTHi7", "unable to copy events into destination")
179185
}
180186

@@ -187,7 +193,7 @@ func copyEvents(ctx context.Context, source, dest *db.DB, bulkSize uint32) {
187193
logging.WithFields("took", time.Since(start), "count", eventCount).Info("events migrated")
188194
}
189195

190-
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position float64, errs <-chan error) {
196+
func writeCopyEventsDone(ctx context.Context, es *eventstore.EventStore, id, source string, position decimal.Decimal, errs <-chan error) {
191197
joinedErrs := make([]error, 0, len(errs))
192198
for err := range errs {
193199
joinedErrs = append(joinedErrs, err)

cmd/mirror/projections.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,13 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc
296296
continue
297297
}
298298

299+
err = projection.ProjectInstanceFields(ctx)
300+
if err != nil {
301+
logging.WithFields("instance", instance).WithError(err).Info("trigger fields failed")
302+
failedInstances <- instance
303+
continue
304+
}
305+
299306
err = auth_handler.ProjectInstance(ctx)
300307
if err != nil {
301308
logging.WithFields("instance", instance).WithError(err).Info("trigger auth handler failed")

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ require (
4545
github.com/h2non/gock v1.2.0
4646
github.com/hashicorp/golang-lru/v2 v2.0.7
4747
github.com/improbable-eng/grpc-web v0.15.0
48+
github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e
4849
github.com/jackc/pgx/v5 v5.7.5
4950
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52
5051
github.com/jinzhu/gorm v1.9.16
@@ -65,6 +66,7 @@ require (
6566
github.com/riverqueue/river/rivertype v0.22.0
6667
github.com/rs/cors v1.11.1
6768
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
69+
github.com/shopspring/decimal v1.3.1
6870
github.com/sony/gobreaker/v2 v2.1.0
6971
github.com/sony/sonyflake v1.2.1
7072
github.com/spf13/cobra v1.9.1

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
442442
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
443443
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
444444
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
445+
github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e h1:i3gQ/Zo7sk4LUVbsAjTNeC4gIjoPNIZVzs4EXstssV4=
446+
github.com/jackc/pgx-shopspring-decimal v0.0.0-20220624020537-1d36b5a1853e/go.mod h1:zUHglCZ4mpDUPgIwqEKoba6+tcUQzRdb1+DPTuYe9pI=
445447
github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs=
446448
github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
447449
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
@@ -705,6 +707,8 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0
705707
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4=
706708
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1/go.mod h1:uToXkOrWAZ6/Oc07xWQrPOhJotwFIyu2bBVN41fcDUY=
707709
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
710+
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
711+
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
708712
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
709713
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
710714
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=

internal/admin/repository/eventsourcing/handler/handler.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package handler
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

9+
"github.com/jackc/pgx/v5/pgconn"
810
"github.com/zitadel/logging"
911

1012
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
@@ -63,9 +65,17 @@ func Start(ctx context.Context) {
6365
func ProjectInstance(ctx context.Context) error {
6466
for i, projection := range projections {
6567
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting admin projection")
66-
_, err := projection.Trigger(ctx)
67-
if err != nil {
68-
return err
68+
for {
69+
_, err := projection.Trigger(ctx)
70+
if err == nil {
71+
break
72+
}
73+
var pgErr *pgconn.PgError
74+
errors.As(err, &pgErr)
75+
if pgErr.Code != database.PgUniqueConstraintErrorCode {
76+
return err
77+
}
78+
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("admin projection failed because of unique constraint, retrying")
6979
}
7080
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("admin projection done")
7181
}

internal/api/oidc/key.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/go-jose/go-jose/v4"
1212
"github.com/jonboulle/clockwork"
1313
"github.com/muhlemmer/gu"
14+
"github.com/shopspring/decimal"
1415
"github.com/zitadel/logging"
1516
"github.com/zitadel/oidc/v3/pkg/op"
1617

@@ -350,14 +351,14 @@ func (o *OPStorage) getSigningKey(ctx context.Context) (op.SigningKey, error) {
350351
if len(keys.Keys) > 0 {
351352
return PrivateKeyToSigningKey(SelectSigningKey(keys.Keys), o.encAlg)
352353
}
353-
var position float64
354+
var position decimal.Decimal
354355
if keys.State != nil {
355356
position = keys.State.Position
356357
}
357358
return nil, o.refreshSigningKey(ctx, position)
358359
}
359360

360-
func (o *OPStorage) refreshSigningKey(ctx context.Context, position float64) error {
361+
func (o *OPStorage) refreshSigningKey(ctx context.Context, position decimal.Decimal) error {
361362
ok, err := o.ensureIsLatestKey(ctx, position)
362363
if err != nil || !ok {
363364
return zerrors.ThrowInternal(err, "OIDC-ASfh3", "cannot ensure that projection is up to date")
@@ -369,12 +370,12 @@ func (o *OPStorage) refreshSigningKey(ctx context.Context, position float64) err
369370
return zerrors.ThrowInternal(nil, "OIDC-Df1bh", "")
370371
}
371372

372-
func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position float64) (bool, error) {
373-
maxSequence, err := o.getMaxKeySequence(ctx)
373+
func (o *OPStorage) ensureIsLatestKey(ctx context.Context, position decimal.Decimal) (bool, error) {
374+
maxSequence, err := o.getMaxKeyPosition(ctx)
374375
if err != nil {
375376
return false, fmt.Errorf("error retrieving new events: %w", err)
376377
}
377-
return position >= maxSequence, nil
378+
return position.GreaterThanOrEqual(maxSequence), nil
378379
}
379380

380381
func PrivateKeyToSigningKey(key query.PrivateKey, algorithm crypto.EncryptionAlgorithm) (_ op.SigningKey, err error) {
@@ -412,9 +413,9 @@ func (o *OPStorage) lockAndGenerateSigningKeyPair(ctx context.Context) error {
412413
return o.command.GenerateSigningKeyPair(setOIDCCtx(ctx), "RS256")
413414
}
414415

415-
func (o *OPStorage) getMaxKeySequence(ctx context.Context) (float64, error) {
416-
return o.eventstore.LatestSequence(ctx,
417-
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).
416+
func (o *OPStorage) getMaxKeyPosition(ctx context.Context) (decimal.Decimal, error) {
417+
return o.eventstore.LatestPosition(ctx,
418+
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
418419
ResourceOwner(authz.GetInstance(ctx).InstanceID()).
419420
AwaitOpenTransactions().
420421
AddQuery().

internal/api/saml/certificate.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/go-jose/go-jose/v4"
9+
"github.com/shopspring/decimal"
910
"github.com/zitadel/logging"
1011
"github.com/zitadel/saml/pkg/provider/key"
1112

@@ -76,7 +77,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag
7677
return p.certificateToCertificateAndKey(selectCertificate(certs.Certificates))
7778
}
7879

79-
var position float64
80+
var position decimal.Decimal
8081
if certs.State != nil {
8182
position = certs.State.Position
8283
}
@@ -87,7 +88,7 @@ func (p *Storage) getCertificateAndKey(ctx context.Context, usage crypto.KeyUsag
8788
func (p *Storage) refreshCertificate(
8889
ctx context.Context,
8990
usage crypto.KeyUsage,
90-
position float64,
91+
position decimal.Decimal,
9192
) error {
9293
ok, err := p.ensureIsLatestCertificate(ctx, position)
9394
if err != nil {
@@ -103,12 +104,12 @@ func (p *Storage) refreshCertificate(
103104
return nil
104105
}
105106

106-
func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position float64) (bool, error) {
107-
maxSequence, err := p.getMaxKeySequence(ctx)
107+
func (p *Storage) ensureIsLatestCertificate(ctx context.Context, position decimal.Decimal) (bool, error) {
108+
maxSequence, err := p.getMaxKeyPosition(ctx)
108109
if err != nil {
109110
return false, fmt.Errorf("error retrieving new events: %w", err)
110111
}
111-
return position >= maxSequence, nil
112+
return position.GreaterThanOrEqual(maxSequence), nil
112113
}
113114

114115
func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage crypto.KeyUsage) error {
@@ -151,9 +152,9 @@ func (p *Storage) lockAndGenerateCertificateAndKey(ctx context.Context, usage cr
151152
}
152153
}
153154

154-
func (p *Storage) getMaxKeySequence(ctx context.Context) (float64, error) {
155-
return p.eventstore.LatestSequence(ctx,
156-
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxSequence).
155+
func (p *Storage) getMaxKeyPosition(ctx context.Context) (decimal.Decimal, error) {
156+
return p.eventstore.LatestPosition(ctx,
157+
eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
157158
ResourceOwner(authz.GetInstance(ctx).InstanceID()).
158159
AwaitOpenTransactions().
159160
AddQuery().

internal/auth/repository/eventsourcing/handler/handler.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package handler
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

9+
"github.com/jackc/pgx/v5/pgconn"
810
"github.com/zitadel/logging"
911

1012
"github.com/zitadel/zitadel/internal/api/authz"
@@ -78,9 +80,17 @@ func Projections() []*handler2.Handler {
7880
func ProjectInstance(ctx context.Context) error {
7981
for i, projection := range projections {
8082
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection")
81-
_, err := projection.Trigger(ctx)
82-
if err != nil {
83-
return err
83+
for {
84+
_, err := projection.Trigger(ctx)
85+
if err == nil {
86+
break
87+
}
88+
var pgErr *pgconn.PgError
89+
errors.As(err, &pgErr)
90+
if pgErr.Code != database.PgUniqueConstraintErrorCode {
91+
return err
92+
}
93+
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID()).WithError(err).Debug("auth projection failed because of unique constraint, retrying")
8494
}
8595
logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done")
8696
}

internal/database/database.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ func CloseTransaction(tx Tx, err error) error {
6464
return commitErr
6565
}
6666

67+
const (
68+
PgUniqueConstraintErrorCode = "23505"
69+
)
70+
6771
type Config struct {
6872
Dialects map[string]interface{} `mapstructure:",remain"`
6973
connector dialect.Connector

0 commit comments

Comments
 (0)