diff --git a/services/skus/datastore.go b/services/skus/datastore.go index aa1e0be2b..a1c3a2334 100644 --- a/services/skus/datastore.go +++ b/services/skus/datastore.go @@ -189,7 +189,6 @@ func (pg *Postgres) CreateKey(merchant string, name string, encryptedSecretKey s RETURNING id, name, merchant_id, encrypted_secret_key, nonce, created_at, expiry `, merchant, name, encryptedSecretKey, nonce) - if err != nil { return nil, fmt.Errorf("failed to create key for merchant: %w", err) } @@ -231,7 +230,6 @@ func (pg *Postgres) GetKeysByMerchant(merchant string, showExpired bool) (*[]Key where merchant_id = $1`+expiredQuery+" ORDER BY name, created_at", merchant) - if err != nil { return nil, fmt.Errorf("failed to get keys for merchant: %w", err) } @@ -255,7 +253,6 @@ func (pg *Postgres) GetKey(id uuid.UUID, showExpired bool) (*Key, error) { where id = $1`+expiredQuery, id.String()) - if err != nil { return nil, fmt.Errorf("failed to get key: %w", err) } @@ -374,7 +371,8 @@ func (pg *Postgres) GetOrderItem(ctx context.Context, itemID uuid.UUID) (*OrderI // GetPagedMerchantTransactions - get a paginated list of transactions for a merchant func (pg *Postgres) GetPagedMerchantTransactions( - ctx context.Context, merchantID uuid.UUID, pagination *inputs.Pagination) (*[]Transaction, int, error) { + ctx context.Context, merchantID uuid.UUID, pagination *inputs.Pagination, +) (*[]Transaction, int, error) { var ( total int err error @@ -426,7 +424,7 @@ func (pg *Postgres) GetPagedMerchantTransactions( return nil, 0, err } for rows.Next() { - var transaction = new(Transaction) + transaction := new(Transaction) err := rows.StructScan(transaction) if err != nil { return nil, 0, err @@ -448,7 +446,6 @@ func (pg *Postgres) GetTransactions(orderID uuid.UUID) (*[]Transaction, error) { FROM transactions WHERE order_id = $1` transactions := []Transaction{} err := pg.RawDB().Select(&transactions, statement, orderID) - if err != nil { return nil, err } @@ -558,13 +555,11 @@ func (pg *Postgres) CreateTransaction(orderID uuid.UUID, externalTransactionID s VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, order_id, created_at, updated_at, external_transaction_id, status, currency, kind, amount `, orderID, externalTransactionID, status, currency, kind, amount) - if err != nil { return nil, err } err = tx.Commit() - if err != nil { return nil, err } @@ -584,13 +579,11 @@ func (pg *Postgres) UpdateTransaction(orderID uuid.UUID, externalTransactionID s where external_transaction_id=$5 and order_id=$6 RETURNING id, order_id, created_at, updated_at, external_transaction_id, status, currency, kind, amount `, status, currency, kind, amount, externalTransactionID, orderID) - if err != nil { return nil, err } err = tx.Commit() - if err != nil { return nil, err } @@ -822,7 +815,7 @@ FOR UPDATE } for rows.Next() { - var vr = new(VoteRecord) + vr := new(VoteRecord) if err := rows.Scan(&vr.ID, &vr.RequestCredentials, &vr.VoteText, &vr.VoteEventBinary, &vr.Erred, &vr.Processed); err != nil { return tx, nil, fmt.Errorf("failed to scan vote drain record: %w", err) @@ -847,9 +840,8 @@ func (pg *Postgres) MarkVoteErrored(ctx context.Context, vr VoteRecord, tx *sqlx logger := logging.Logger(ctx, "skus.MarkVoteErrored") logger.Debug().Msg("about to set errored to true for this vote") - var statement = `update vote_drain set erred=true where id=$1` + statement := `update vote_drain set erred=true where id=$1` _, err := tx.ExecContext(ctx, statement, vr.ID) - if err != nil { logger.Error().Err(err).Msg("failed to update vote_drain") return fmt.Errorf("failed to commit vote from drain: %w", err) @@ -863,9 +855,8 @@ func (pg *Postgres) CommitVote(ctx context.Context, vr VoteRecord, tx *sqlx.Tx) logger := logging.Logger(ctx, "skus.CommitVote") logger.Debug().Msg("about to set processed to true for this vote") - var statement = `update vote_drain set processed=true where id=$1` + statement := `update vote_drain set processed=true where id=$1` _, err := tx.ExecContext(ctx, statement, vr.ID) - if err != nil { logger.Error().Err(err).Msg("unable to update processed=true for vote drain job") return fmt.Errorf("failed to commit vote from drain: %w", err) @@ -1118,12 +1109,28 @@ func (pg *Postgres) SendSigningRequest(ctx context.Context, signingRequestWriter if err != nil { return fmt.Errorf("error send signing request could not begin tx: %w", err) } - defer rollback() + + // txTransferred prevents the deferred rollback in the main function from firing + // when SendSigningRequest returns after launching the goroutine. Without the guard, + // the defer would roll back the transaction while the goroutine is still using it. + // Once txTransferred is true, the goroutine owns the transaction and its own + // defer rollback() is the only one that will fire. + txTransferred := false + defer func() { + if !txTransferred { + rollback() + } + }() var soro []SigningOrderRequestOutbox - err = tx.SelectContext(ctx, &soro, `select request_id, order_id, item_id, message_data from signing_order_request_outbox - where submitted_at is null order by created_at asc - for update skip locked limit $1`, signingRequestBatchSize) + err = tx.SelectContext( + ctx, + &soro, + `select request_id, order_id, item_id, message_data from signing_order_request_outbox + where submitted_at is null order by created_at asc + for update skip locked limit $1`, + signingRequestBatchSize, + ) if err != nil { return fmt.Errorf("error could not get signing order request outbox: %w", err) } @@ -1132,10 +1139,19 @@ func (pg *Postgres) SendSigningRequest(ctx context.Context, signingRequestWriter return nil } - // If there is an error writing messages to kafka we need to log the failed messages here instead of returning - // to the job runner, we can then update the messages as processed and continue to the next batch rather than - // retrying, these errors are likely not transient and need checked before retry. + soroIDs := make([]uuid.UUID, len(soro)) + for i := 0; i < len(soroIDs); i++ { + soroIDs[i] = soro[i].RequestID + } + + txTransferred = true go func() { + defer rollback() + + // If there is a total failure writing to kafka, return without committing so the rows + // remain unsubmitted and are retried on the next cycle. Per-message WriteErrors are + // logged and the rows are still marked submitted to avoid duplicate sends for those + // that succeeded. switch err := signingRequestWriter.WriteMessages(ctx, soro).(type) { case nil: case kafka.WriteErrors: @@ -1152,38 +1168,41 @@ func (pg *Postgres) SendSigningRequest(ctx context.Context, signingRequestWriter Interface("messages", soro). Msg("error writing outbox messages") sentry.CaptureException(err) + return } - }() - soroIDs := make([]uuid.UUID, len(soro)) - for i := 0; i < len(soroIDs); i++ { - soroIDs[i] = soro[i].RequestID - } - - qry, args, err := sqlx.In(`update signing_order_request_outbox - set submitted_at = now() where request_id IN (?)`, soroIDs) - if err != nil { - return fmt.Errorf("error creating sql update statement: %w", err) - } + qry, args, err := sqlx.In(`update signing_order_request_outbox + set submitted_at = now() where request_id IN (?)`, soroIDs) + if err != nil { + logging.FromContext(ctx).Err(err).Msg("error creating sql update statement") + sentry.CaptureException(err) + return + } - result, err := tx.ExecContext(ctx, pg.Rebind(qry), args...) - if err != nil { - return fmt.Errorf("error updating outbox message: %w", err) - } + result, err := tx.ExecContext(ctx, pg.Rebind(qry), args...) + if err != nil { + logging.FromContext(ctx).Err(err).Msg("error updating outbox message") + sentry.CaptureException(err) + return + } - rows, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("error getting updated outbox message rows: %w", err) - } + rows, err := result.RowsAffected() + if err != nil { + logging.FromContext(ctx).Err(err).Msg("error getting updated outbox message rows") + sentry.CaptureException(err) + return + } - if rows != int64(len(soroIDs)) { - return fmt.Errorf("error updating rows expected %d got %d", len(soroIDs), rows) - } + if rows != int64(len(soroIDs)) { + logging.FromContext(ctx).Error().Msgf("error updating rows expected %d got %d", len(soroIDs), rows) + return + } - err = commit() - if err != nil { - return fmt.Errorf("error committing signing order request outbox: %w", err) - } + if err := commit(); err != nil { + logging.FromContext(ctx).Err(err).Msg("error committing signing order request outbox") + sentry.CaptureException(err) + } + }() return nil } diff --git a/services/skus/datastore_test.go b/services/skus/datastore_test.go index 4793648de..a18e4b7a1 100644 --- a/services/skus/datastore_test.go +++ b/services/skus/datastore_test.go @@ -250,6 +250,8 @@ func (suite *PostgresTestSuite) TestSendSigningRequest_MultipleRow_Success() { // We should only process the max batch size. var messagesItemID []uuid.UUID + // WriteMessages is called in a goroutine; use a channel to wait for it. + writeMessagesDone := make(chan struct{}) signingRequestWriter := NewMockSigningRequestWriter(ctrl) signingRequestWriter.EXPECT(). @@ -258,6 +260,7 @@ func (suite *PostgresTestSuite) TestSendSigningRequest_MultipleRow_Success() { for _, message := range messages { messagesItemID = append(messagesItemID, message.ItemID) } + close(writeMessagesDone) }). Times(1). Return(nil) @@ -265,14 +268,20 @@ func (suite *PostgresTestSuite) TestSendSigningRequest_MultipleRow_Success() { err := suite.storage.SendSigningRequest(ctx, signingRequestWriter) suite.Require().NoError(err) + // Wait for goroutine to call WriteMessages, then for it to commit submitted_at. + select { + case <-writeMessagesDone: + case <-time.After(5 * time.Second): + suite.Fail("timeout waiting for WriteMessages to be called") + return + } + // Assert kafka mock was called with signing requests - suite.Require().NotNil(messagesItemID) + suite.Require().NotEmpty(messagesItemID) - // Assert that all the messages picked up have been marked as processed - - qry, args, err := sqlx.In(`select order_id, submitted_at from signing_order_request_outbox where item_id IN (?)`, - messagesItemID) - suite.Require().NoError(err) + // Assert that all the messages picked up have been marked as processed. + // The goroutine updates submitted_at and commits after WriteMessages returns, + // so poll until the commit is visible. type outboxMessage struct { OrderID uuid.UUID `db:"order_id"` @@ -281,8 +290,24 @@ func (suite *PostgresTestSuite) TestSendSigningRequest_MultipleRow_Success() { var actual []outboxMessage - err = suite.storage.RawDB().SelectContext(ctx, &actual, suite.storage.RawDB().Rebind(qry), args...) - suite.Require().NoError(err) + suite.Require().Eventually(func() bool { + qry, args, qErr := sqlx.In(`select order_id, submitted_at from signing_order_request_outbox where item_id IN (?)`, + messagesItemID) + if qErr != nil { + return false + } + actual = nil + qErr = suite.storage.RawDB().SelectContext(ctx, &actual, suite.storage.RawDB().Rebind(qry), args...) + if qErr != nil || len(actual) == 0 { + return false + } + for _, s := range actual { + if s.SubmittedAt == nil { + return false + } + } + return true + }, 5*time.Second, 10*time.Millisecond, "submitted_at should be set after goroutine commits") for _, s := range actual { suite.Assert().NotNil(s.SubmittedAt)