Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 67 additions & 48 deletions services/skus/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func (pg *Postgres) CreateKey(merchant string, name string, encryptedSecretKey s
RETURNING id, name, merchant_id, encrypted_secret_key, nonce, created_at, expiry
`,
merchant, name, encryptedSecretKey, nonce)

if err != nil {
return nil, fmt.Errorf("failed to create key for merchant: %w", err)
}
Expand Down Expand Up @@ -231,7 +230,6 @@ func (pg *Postgres) GetKeysByMerchant(merchant string, showExpired bool) (*[]Key
where
merchant_id = $1`+expiredQuery+" ORDER BY name, created_at",
merchant)

if err != nil {
return nil, fmt.Errorf("failed to get keys for merchant: %w", err)
}
Expand All @@ -255,7 +253,6 @@ func (pg *Postgres) GetKey(id uuid.UUID, showExpired bool) (*Key, error) {
where
id = $1`+expiredQuery,
id.String())

if err != nil {
return nil, fmt.Errorf("failed to get key: %w", err)
}
Expand Down Expand Up @@ -374,7 +371,8 @@ func (pg *Postgres) GetOrderItem(ctx context.Context, itemID uuid.UUID) (*OrderI

// GetPagedMerchantTransactions - get a paginated list of transactions for a merchant
func (pg *Postgres) GetPagedMerchantTransactions(
ctx context.Context, merchantID uuid.UUID, pagination *inputs.Pagination) (*[]Transaction, int, error) {
ctx context.Context, merchantID uuid.UUID, pagination *inputs.Pagination,
) (*[]Transaction, int, error) {
var (
total int
err error
Expand Down Expand Up @@ -426,7 +424,7 @@ func (pg *Postgres) GetPagedMerchantTransactions(
return nil, 0, err
}
for rows.Next() {
var transaction = new(Transaction)
transaction := new(Transaction)
err := rows.StructScan(transaction)
if err != nil {
return nil, 0, err
Expand All @@ -448,7 +446,6 @@ func (pg *Postgres) GetTransactions(orderID uuid.UUID) (*[]Transaction, error) {
FROM transactions WHERE order_id = $1`
transactions := []Transaction{}
err := pg.RawDB().Select(&transactions, statement, orderID)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -558,13 +555,11 @@ func (pg *Postgres) CreateTransaction(orderID uuid.UUID, externalTransactionID s
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, order_id, created_at, updated_at, external_transaction_id, status, currency, kind, amount
`, orderID, externalTransactionID, status, currency, kind, amount)

if err != nil {
return nil, err
}

err = tx.Commit()

if err != nil {
return nil, err
}
Expand All @@ -584,13 +579,11 @@ func (pg *Postgres) UpdateTransaction(orderID uuid.UUID, externalTransactionID s
where external_transaction_id=$5 and order_id=$6
RETURNING id, order_id, created_at, updated_at, external_transaction_id, status, currency, kind, amount
`, status, currency, kind, amount, externalTransactionID, orderID)

if err != nil {
return nil, err
}

err = tx.Commit()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -822,7 +815,7 @@ FOR UPDATE
}

for rows.Next() {
var vr = new(VoteRecord)
vr := new(VoteRecord)
if err := rows.Scan(&vr.ID, &vr.RequestCredentials, &vr.VoteText,
&vr.VoteEventBinary, &vr.Erred, &vr.Processed); err != nil {
return tx, nil, fmt.Errorf("failed to scan vote drain record: %w", err)
Expand All @@ -847,9 +840,8 @@ func (pg *Postgres) MarkVoteErrored(ctx context.Context, vr VoteRecord, tx *sqlx
logger := logging.Logger(ctx, "skus.MarkVoteErrored")
logger.Debug().Msg("about to set errored to true for this vote")

var statement = `update vote_drain set erred=true where id=$1`
statement := `update vote_drain set erred=true where id=$1`
_, err := tx.ExecContext(ctx, statement, vr.ID)

if err != nil {
logger.Error().Err(err).Msg("failed to update vote_drain")
return fmt.Errorf("failed to commit vote from drain: %w", err)
Expand All @@ -863,9 +855,8 @@ func (pg *Postgres) CommitVote(ctx context.Context, vr VoteRecord, tx *sqlx.Tx)
logger := logging.Logger(ctx, "skus.CommitVote")
logger.Debug().Msg("about to set processed to true for this vote")

var statement = `update vote_drain set processed=true where id=$1`
statement := `update vote_drain set processed=true where id=$1`
_, err := tx.ExecContext(ctx, statement, vr.ID)

if err != nil {
logger.Error().Err(err).Msg("unable to update processed=true for vote drain job")
return fmt.Errorf("failed to commit vote from drain: %w", err)
Expand Down Expand Up @@ -1118,12 +1109,28 @@ func (pg *Postgres) SendSigningRequest(ctx context.Context, signingRequestWriter
if err != nil {
return fmt.Errorf("error send signing request could not begin tx: %w", err)
}
defer rollback()

// txTransferred prevents the deferred rollback in the main function from firing
// when SendSigningRequest returns after launching the goroutine. Without the guard,
// the defer would roll back the transaction while the goroutine is still using it.
// Once txTransferred is true, the goroutine owns the transaction and its own
// defer rollback() is the only one that will fire.
txTransferred := false
defer func() {
if !txTransferred {
rollback()
}
}()

var soro []SigningOrderRequestOutbox
err = tx.SelectContext(ctx, &soro, `select request_id, order_id, item_id, message_data from signing_order_request_outbox
where submitted_at is null order by created_at asc
for update skip locked limit $1`, signingRequestBatchSize)
err = tx.SelectContext(
ctx,
&soro,
`select request_id, order_id, item_id, message_data from signing_order_request_outbox
where submitted_at is null order by created_at asc
for update skip locked limit $1`,
signingRequestBatchSize,
)
if err != nil {
return fmt.Errorf("error could not get signing order request outbox: %w", err)
}
Expand All @@ -1132,10 +1139,19 @@ func (pg *Postgres) SendSigningRequest(ctx context.Context, signingRequestWriter
return nil
}

// If there is an error writing messages to kafka we need to log the failed messages here instead of returning
// to the job runner, we can then update the messages as processed and continue to the next batch rather than
// retrying, these errors are likely not transient and need checked before retry.
soroIDs := make([]uuid.UUID, len(soro))
for i := 0; i < len(soroIDs); i++ {
soroIDs[i] = soro[i].RequestID
}

txTransferred = true
go func() {
defer rollback()

// If there is a total failure writing to kafka, return without committing so the rows
// remain unsubmitted and are retried on the next cycle. Per-message WriteErrors are
// logged and the rows are still marked submitted to avoid duplicate sends for those
// that succeeded.
switch err := signingRequestWriter.WriteMessages(ctx, soro).(type) {
case nil:
case kafka.WriteErrors:
Expand All @@ -1152,38 +1168,41 @@ func (pg *Postgres) SendSigningRequest(ctx context.Context, signingRequestWriter
Interface("messages", soro).
Msg("error writing outbox messages")
sentry.CaptureException(err)
return
}
}()

soroIDs := make([]uuid.UUID, len(soro))
for i := 0; i < len(soroIDs); i++ {
soroIDs[i] = soro[i].RequestID
}

qry, args, err := sqlx.In(`update signing_order_request_outbox
set submitted_at = now() where request_id IN (?)`, soroIDs)
if err != nil {
return fmt.Errorf("error creating sql update statement: %w", err)
}
qry, args, err := sqlx.In(`update signing_order_request_outbox
set submitted_at = now() where request_id IN (?)`, soroIDs)
if err != nil {
logging.FromContext(ctx).Err(err).Msg("error creating sql update statement")
sentry.CaptureException(err)
return
}

result, err := tx.ExecContext(ctx, pg.Rebind(qry), args...)
if err != nil {
return fmt.Errorf("error updating outbox message: %w", err)
}
result, err := tx.ExecContext(ctx, pg.Rebind(qry), args...)
if err != nil {
logging.FromContext(ctx).Err(err).Msg("error updating outbox message")
sentry.CaptureException(err)
return
}

rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("error getting updated outbox message rows: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
logging.FromContext(ctx).Err(err).Msg("error getting updated outbox message rows")
sentry.CaptureException(err)
return
}

if rows != int64(len(soroIDs)) {
return fmt.Errorf("error updating rows expected %d got %d", len(soroIDs), rows)
}
if rows != int64(len(soroIDs)) {
logging.FromContext(ctx).Error().Msgf("error updating rows expected %d got %d", len(soroIDs), rows)
return
}

err = commit()
if err != nil {
return fmt.Errorf("error committing signing order request outbox: %w", err)
}
if err := commit(); err != nil {
logging.FromContext(ctx).Err(err).Msg("error committing signing order request outbox")
sentry.CaptureException(err)
}
}()

return nil
}
Expand Down
41 changes: 33 additions & 8 deletions services/skus/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ func (suite *PostgresTestSuite) TestSendSigningRequest_MultipleRow_Success() {
// We should only process the max batch size.

var messagesItemID []uuid.UUID
// WriteMessages is called in a goroutine; use a channel to wait for it.
writeMessagesDone := make(chan struct{})

signingRequestWriter := NewMockSigningRequestWriter(ctrl)
signingRequestWriter.EXPECT().
Expand All @@ -258,21 +260,28 @@ func (suite *PostgresTestSuite) TestSendSigningRequest_MultipleRow_Success() {
for _, message := range messages {
messagesItemID = append(messagesItemID, message.ItemID)
}
close(writeMessagesDone)
}).
Times(1).
Return(nil)

err := suite.storage.SendSigningRequest(ctx, signingRequestWriter)
suite.Require().NoError(err)

// Wait for goroutine to call WriteMessages, then for it to commit submitted_at.
select {
case <-writeMessagesDone:
case <-time.After(5 * time.Second):
suite.Fail("timeout waiting for WriteMessages to be called")
return
}

// Assert kafka mock was called with signing requests
suite.Require().NotNil(messagesItemID)
suite.Require().NotEmpty(messagesItemID)

// Assert that all the messages picked up have been marked as processed

qry, args, err := sqlx.In(`select order_id, submitted_at from signing_order_request_outbox where item_id IN (?)`,
messagesItemID)
suite.Require().NoError(err)
// Assert that all the messages picked up have been marked as processed.
// The goroutine updates submitted_at and commits after WriteMessages returns,
// so poll until the commit is visible.

type outboxMessage struct {
OrderID uuid.UUID `db:"order_id"`
Expand All @@ -281,8 +290,24 @@ func (suite *PostgresTestSuite) TestSendSigningRequest_MultipleRow_Success() {

var actual []outboxMessage

err = suite.storage.RawDB().SelectContext(ctx, &actual, suite.storage.RawDB().Rebind(qry), args...)
suite.Require().NoError(err)
suite.Require().Eventually(func() bool {
qry, args, qErr := sqlx.In(`select order_id, submitted_at from signing_order_request_outbox where item_id IN (?)`,
messagesItemID)
if qErr != nil {
return false
}
actual = nil
qErr = suite.storage.RawDB().SelectContext(ctx, &actual, suite.storage.RawDB().Rebind(qry), args...)
if qErr != nil || len(actual) == 0 {
return false
}
for _, s := range actual {
if s.SubmittedAt == nil {
return false
}
}
return true
}, 5*time.Second, 10*time.Millisecond, "submitted_at should be set after goroutine commits")

for _, s := range actual {
suite.Assert().NotNil(s.SubmittedAt)
Expand Down
Loading