Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc-site/docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ title: Configuration Reference
|---|-----------|----|-------------|
|count|The maximum number of times to retry|`int`|`5`
|enabled|Enables retries|`boolean`|`false`
|errorStatusCodeRegex|The regex that the error response status code must match to trigger retry|`string`|`<nil>`
|errorStatusCodeRegex|The regex that the error response status code must match to trigger retry|`string`|`^5|^429$`
|factor|The retry backoff factor|`float32`|`2`
|initWaitTime|The initial retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|maxWaitTime|The maximum retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
Expand Down
2 changes: 2 additions & 0 deletions internal/blockchain/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (e *Ethereum) InitConfig(config config.Section) {
e.ethconnectConf = config.SubSection(EthconnectConfigKey)
wsclient.InitConfig(e.ethconnectConf)

e.ethconnectConf.SetDefault(ffresty.HTTPConfigRetryErrorStatusCodeRegex, "^5|^429$") // 5xx and 429s, but not 409s / 404s
e.ethconnectConf.AddKnownKey(EthconnectConfigTopic)
e.ethconnectConf.AddKnownKey(EthconnectBackgroundStart)
e.ethconnectConf.AddKnownKey(EthconnectBackgroundStartInitialDelay, defaultBackgroundInitialDelay)
Expand All @@ -99,6 +100,7 @@ func (e *Ethereum) InitConfig(config config.Section) {
e.ethconnectConf.AddKnownKey(EthconnectConfigInstanceDeprecated)
e.ethconnectConf.AddKnownKey(EthconnectConfigFromBlockDeprecated, defaultFromBlock)

// this is unused in reality
fftmConf := config.SubSection(FFTMConfigKey)
ffresty.InitConfig(fftmConf)

Expand Down
11 changes: 11 additions & 0 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,12 @@ func (e *Ethereum) invokeContractMethod(ctx context.Context, address, signingKey
SetError(&resErr).
Post("/")
if err != nil || !res.IsSuccess() {
// 409 is a conflict error, so that means the transaction was already submitted / exists
// so we can return a success w/o error
if res != nil && res.StatusCode() == 409 && !resErr.SubmissionRejected {
return false, nil
}

return resErr.SubmissionRejected, common.WrapRESTError(ctx, &resErr, res, err, coremsgs.MsgEthConnectorRESTErr)
}
return false, nil
Expand Down Expand Up @@ -761,6 +767,11 @@ func (e *Ethereum) DeployContract(ctx context.Context, nsOpID, signingKey string
SetError(&resErr).
Post("/")
if err != nil || !res.IsSuccess() {
// 409 is a conflict error, so that means the transaction was already submitted / exists
// so we can return a success w/o error
if res != nil && res.StatusCode() == 409 && !resErr.SubmissionRejected {
return false, nil
}
if strings.Contains(string(res.Body()), "FFEC100130") {
// This error is returned by ethconnect because it does not support deploying contracts with this syntax
// Return a more helpful and clear error message
Expand Down
46 changes: 46 additions & 0 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ func resetConf(e *Ethereum) {
e.InitConfig(utConfig)
}

func newTestRestyClientWithRetry() *resty.Client {
return resty.New().SetBaseURL("http://localhost:12345").
SetRetryCount(2).
AddRetryCondition(func(r *resty.Response, err error) bool {
return r.StatusCode() > 499 || r.StatusCode() == 429
}).
SetRetryAfter(func(c *resty.Client, r *resty.Response) (time.Duration, error) {
return 10 * time.Millisecond, nil
})
}

func newTestEthereum() (*Ethereum, func()) {
ctx, cancel := context.WithCancel(context.Background())
mm := &metricsmocks.Manager{}
Expand Down Expand Up @@ -3222,6 +3233,41 @@ func TestInvokeContractEVMConnectRejectErr(t *testing.T) {
assert.True(t, submissionRejected)
}

func TestInvokeContract409ConflictNotRejectedOnRetry(t *testing.T) {
e, cancel := newTestEthereum()
e.client = newTestRestyClientWithRetry()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()
signingKey := ethHexFormatB32(fftypes.NewRandB32())
location := &Location{
Address: "0x12345",
}
method := testFFIMethod()
testErrors := testFFIErrors()
params := map[string]interface{}{
"x": float64(1),
"y": float64(2),
}
options := map[string]interface{}{}
locationBytes, err := json.Marshal(location)
assert.NoError(t, err)
numCalls := 0
httpmock.RegisterResponder("POST", `http://localhost:12345/`,
func(req *http.Request) (*http.Response, error) {
defer func() { numCalls++ }()
if numCalls < 1 {
return httpmock.NewJsonResponderOrPanic(500, fftypes.JSONAnyPtr(`{"error":"something went wrong, but connector is successfully processing the transaction"}`))(req)
}
return httpmock.NewJsonResponderOrPanic(409, fftypes.JSONAnyPtr(`{"error":"FF21065: ID 'an-id' is not unique", "submissionRejected": false}`))(req)
})
parsedMethod, err := e.ParseInterface(context.Background(), method, testErrors)
assert.NoError(t, err)
submissionRejected, err := e.InvokeContract(context.Background(), "an-id", signingKey, fftypes.JSONAnyPtrBytes(locationBytes), parsedMethod, params, options, nil)
assert.NoError(t, err)
assert.False(t, submissionRejected)
}

func TestInvokeContractPrepareFail(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
Expand Down
10 changes: 6 additions & 4 deletions internal/shareddownload/download_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"database/sql/driver"
"math"
"sync/atomic"
"time"

"github.com/hyperledger/firefly-common/pkg/config"
Expand Down Expand Up @@ -71,7 +72,7 @@ type downloadManager struct {
type downloadWork struct {
dispatchedAt time.Time
preparedOp *core.PreparedOperation
attempts int
attempts atomic.Int32
idempotentSubmit bool
}

Expand Down Expand Up @@ -209,19 +210,20 @@ func (dm *downloadManager) recoverDownloads(startupTime *fftypes.FFTime) {
func (dm *downloadManager) dispatchWork(work *downloadWork) {
dm.work <- work
// Log after dispatching so we can see the dispatch delay if the queue got full
log.L(dm.ctx).Debugf("Dispatched download operation %s/%s (attempts=%d) to worker pool", work.preparedOp.Type, work.preparedOp.ID, work.attempts)
log.L(dm.ctx).Debugf("Dispatched download operation %s/%s (attempts=%d) to worker pool", work.preparedOp.Type, work.preparedOp.ID, work.attempts.Load())
}

// waitAndRetryDownload is a go routine to wait and re-dispatch a retrying download.
// Note this go routine is short lived and completely separate to the workers.
func (dm *downloadManager) waitAndRetryDownload(work *downloadWork) {
startedWaiting := time.Now()
delay := dm.calcDelay(work.attempts)
attempts := int(work.attempts.Load())
delay := dm.calcDelay(attempts)
<-time.After(delay)
delayTimeMS := time.Since(startedWaiting).Milliseconds()
totalTimeMS := time.Since(work.dispatchedAt).Milliseconds()
log.L(dm.ctx).Infof("Retrying download operation %s/%s after %dms (total=%dms,attempts=%d)",
work.preparedOp.Type, work.preparedOp.ID, delayTimeMS, totalTimeMS, work.attempts)
work.preparedOp.Type, work.preparedOp.ID, delayTimeMS, totalTimeMS, attempts)
dm.dispatchWork(work)
}

Expand Down
8 changes: 6 additions & 2 deletions internal/shareddownload/download_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,11 @@ func TestDownloadManagerStartupRecoveryCombinations(t *testing.T) {
assert.Equal(t, core.OpPhaseComplete, phase)
called <- true
})
mom.On("SubmitOperationUpdate", mock.Anything).Return(nil)
//mom.On("SubmitOperationUpdate", mock.Anything).Return(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line?

updateCalled := make(chan bool)
mom.On("SubmitOperationUpdate", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
updateCalled <- true
})

mci := dm.callbacks.(*shareddownloadmocks.Callbacks)
mci.On("SharedStorageBatchDownloaded", "ref2", []byte("some batch data")).Return(batchID, nil)
Expand All @@ -307,12 +311,12 @@ func TestDownloadManagerStartupRecoveryCombinations(t *testing.T) {
<-called
<-called
<-dm.recoveryComplete
<-updateCalled

mss.AssertExpectations(t)
mdi.AssertExpectations(t)
mci.AssertExpectations(t)
mom.AssertExpectations(t)

}

func TestPrepareOperationUnknown(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions internal/shareddownload/download_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ func (dw *downloadWorker) downloadWorkerLoop() {
}

func (dw *downloadWorker) attemptWork(work *downloadWork) {

work.attempts++
isLastAttempt := work.attempts >= dw.dm.retryMaxAttempts
attempts := int(work.attempts.Add(1))
isLastAttempt := attempts >= dw.dm.retryMaxAttempts
_, err := dw.dm.operations.RunOperation(dw.ctx, work.preparedOp, work.idempotentSubmit)
if err != nil {
log.L(dw.ctx).Errorf("Download operation %s/%s attempt=%d/%d failed: %s", work.preparedOp.Type, work.preparedOp.ID, work.attempts, dw.dm.retryMaxAttempts, err)
log.L(dw.ctx).Errorf("Download operation %s/%s attempt=%d/%d failed: %s", work.preparedOp.Type, work.preparedOp.ID, attempts, dw.dm.retryMaxAttempts, err)
if isLastAttempt {
dw.dm.operations.SubmitOperationUpdate(&core.OperationUpdateAsync{
OperationUpdate: core.OperationUpdate{
Expand Down
19 changes: 15 additions & 4 deletions test/e2e/client/restclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/rand"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
Expand Down Expand Up @@ -821,13 +822,13 @@ func (client *FireFlyClient) DeleteContractListener(t *testing.T, id *fftypes.UU
require.Equal(t, 204, resp.StatusCode(), "DELETE %s [%d]: %s", path, resp.StatusCode(), resp.String())
}

func (client *FireFlyClient) InvokeContractMethod(t *testing.T, req *core.ContractCallRequest, expectedStatus ...int) (interface{}, error) {
var res interface{}
func (client *FireFlyClient) InvokeContractMethod(t *testing.T, req *core.ContractCallRequest, expectedStatus ...int) (*core.Operation, error) {
res := &core.Operation{}
path := client.namespaced(urlContractInvoke)
var errResult fftypes.RESTError
resp, err := client.Client.R().
SetBody(req).
SetResult(&res).
SetResult(res).
SetError(&errResult).
Post(path)
require.NoError(t, err)
Expand All @@ -836,7 +837,7 @@ func (client *FireFlyClient) InvokeContractMethod(t *testing.T, req *core.Contra
}
require.Equal(t, expectedStatus[0], resp.StatusCode(), "POST %s [%d]: %s", path, resp.StatusCode(), resp.String())
if err == nil && errResult.Error != "" {
return res, fmt.Errorf(errResult.Error)
return nil, errors.New(errResult.Error)
}
return res, err
}
Expand Down Expand Up @@ -1055,6 +1056,16 @@ func (client *FireFlyClient) GetOperations(t *testing.T, startTime time.Time) (o
return operations
}

func (client *FireFlyClient) GetOperation(t *testing.T, operationID string) (operation *core.Operation) {
path := client.namespaced(urlOperations + "/" + operationID)
resp, err := client.Client.R().
SetResult(&operation).
Get(path)
require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode(), "GET %s [%d]: %s", path, resp.StatusCode(), resp.String())
return operation
}

func (client *FireFlyClient) NetworkAction(t *testing.T, action core.NetworkActionType) {
path := client.namespaced(urlNetworkAction)
input := &core.NetworkAction{Type: action}
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/gateway/ethereum_coupon.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,9 @@ func (suite *EthereumCouponTestSuite) TestDirectInvokeMethod() {
Location: fftypes.JSONAnyPtrBytes(locationBytes),
Method: couponFFIGetAllCouponIDs(),
}
res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
assert.NoError(suite.T(), err)
resJSON, err := json.Marshal(res)
resJSON, err := json.Marshal(queryRes)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), `{"allCreatedIds":["1"]}`, string(resJSON))
suite.testState.client1.DeleteContractListener(suite.T(), listener.ID)
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/gateway/ethereum_simplestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ func (suite *EthereumSimpleStorageTestSuite) TestDirectInvokeMethod() {
Location: fftypes.JSONAnyPtrBytes(locationBytes),
Method: simpleStorageFFIGet(),
}
res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
assert.NoError(suite.T(), err)
resJSON, err := json.Marshal(res)
resJSON, err := json.Marshal(queryRes)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), `{"output":"2"}`, string(resJSON))
suite.testState.client1.DeleteContractListener(suite.T(), listener.ID)
Expand Down Expand Up @@ -268,9 +268,9 @@ func (suite *EthereumSimpleStorageTestSuite) TestFFIInvokeMethod() {
Interface: suite.interfaceID,
MethodPath: "get",
}
res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
assert.NoError(suite.T(), err)
resJSON, err := json.Marshal(res)
resJSON, err := json.Marshal(queryRes)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), `{"output":"42"}`, string(resJSON))
suite.testState.client1.DeleteContractListener(suite.T(), listener.ID)
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/gateway/fabric_contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ func (suite *FabricContractTestSuite) TestE2EContractEvents() {
},
}

res, err = suite.testState.client1.QueryContractMethod(suite.testState.t, queryContractRequest)
queryRes, err := suite.testState.client1.QueryContractMethod(suite.testState.t, queryContractRequest)
suite.T().Log(res)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), assetName, res.(map[string]interface{})["name"])
assert.Equal(suite.T(), assetName, queryRes.(map[string]interface{})["name"])

suite.testState.client1.DeleteContractListener(suite.T(), subs[0].ID)
subs = suite.testState.client1.GetContractListeners(suite.T(), suite.testState.startTime)
Expand Down
22 changes: 18 additions & 4 deletions test/e2e/multiparty/ethereum_contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"os/exec"
"testing"
"time"

"github.com/aidarkhanov/nanoid"
"github.com/go-resty/resty/v2"
Expand Down Expand Up @@ -189,9 +190,9 @@ func (suite *EthereumContractTestSuite) TestDirectInvokeMethod() {
Location: fftypes.JSONAnyPtrBytes(locationBytes),
Method: simpleStorageFFIGet(),
}
res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
assert.NoError(suite.T(), err)
resJSON, err := json.Marshal(res)
resJSON, err := json.Marshal(queryRes)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), `{"output":"2"}`, string(resJSON))
suite.testState.client1.DeleteContractListener(suite.T(), listener.ID)
Expand Down Expand Up @@ -231,6 +232,19 @@ func (suite *EthereumContractTestSuite) TestFFIInvokeMethod() {
assert.NoError(suite.T(), err)
assert.NotNil(suite.T(), res)

// Wait for the operation to succeed before idempotency check
// If the operation never succeeds, the larger test timeout will fail
for {
suite.T().Logf("Waiting for invoke operation to succeed: %s", res.ID.String())
op := suite.testState.client1.GetOperation(suite.T(), res.ID.String())
if op.Status == core.OpStatusSucceeded {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we are stuck here if it fails, until the timeout? Should we just check for failed status and make potential move this to a common test utils, will leave it up to you on this one

suite.T().Logf("Invoke operation succeeded: %s", res.ID.String())
break
}
time.Sleep(100 * time.Millisecond)
suite.T().Logf("Retrying, invoke operation status: %s", op.Status)
}

// Idempotency check
_, err = suite.testState.client1.InvokeContractMethod(suite.T(), invokeContractRequest, 409)
assert.Regexp(suite.T(), "FF10431|FF10458" /* idempotency check could come from FF or blockchain connector, depending on the operation update that is async */, err)
Expand All @@ -253,9 +267,9 @@ func (suite *EthereumContractTestSuite) TestFFIInvokeMethod() {
Interface: suite.interfaceID,
MethodPath: "get",
}
res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest)
assert.NoError(suite.T(), err)
resJSON, err := json.Marshal(res)
resJSON, err := json.Marshal(queryRes)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), `{"output":"42"}`, string(resJSON))
suite.testState.client1.DeleteContractListener(suite.T(), listener.ID)
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/multiparty/fabric_contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ func (suite *FabricContractTestSuite) TestE2EContractEvents() {
},
}

res, err = suite.testState.client1.QueryContractMethod(suite.testState.t, queryContractRequest)
queryRes, err := suite.testState.client1.QueryContractMethod(suite.testState.t, queryContractRequest)
suite.T().Log(res)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), assetName, res.(map[string]interface{})["name"])
assert.Equal(suite.T(), assetName, queryRes.(map[string]interface{})["name"])

suite.testState.client1.DeleteContractListener(suite.T(), subs[0].ID)
subs = suite.testState.client1.GetContractListeners(suite.T(), suite.testState.startTime)
Expand Down
Loading