diff --git a/doc-site/docs/reference/config.md b/doc-site/docs/reference/config.md index be00252d34..71839b04c0 100644 --- a/doc-site/docs/reference/config.md +++ b/doc-site/docs/reference/config.md @@ -832,7 +832,7 @@ title: Configuration Reference |---|-----------|----|-------------| |count|The maximum number of times to retry|`int`|`5` |enabled|Enables retries|`boolean`|`false` -|errorStatusCodeRegex|The regex that the error response status code must match to trigger retry|`string`|`` +|errorStatusCodeRegex|The regex that the error response status code must match to trigger retry|`string`|`^5|^429$` |factor|The retry backoff factor|`float32`|`2` |initWaitTime|The initial retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms` |maxWaitTime|The maximum retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` diff --git a/internal/blockchain/ethereum/config.go b/internal/blockchain/ethereum/config.go index 6ffed65c32..e1057736ff 100644 --- a/internal/blockchain/ethereum/config.go +++ b/internal/blockchain/ethereum/config.go @@ -87,6 +87,7 @@ func (e *Ethereum) InitConfig(config config.Section) { e.ethconnectConf = config.SubSection(EthconnectConfigKey) wsclient.InitConfig(e.ethconnectConf) + e.ethconnectConf.SetDefault(ffresty.HTTPConfigRetryErrorStatusCodeRegex, "^5|^429$") // 5xx and 429s, but not 409s / 404s e.ethconnectConf.AddKnownKey(EthconnectConfigTopic) e.ethconnectConf.AddKnownKey(EthconnectBackgroundStart) e.ethconnectConf.AddKnownKey(EthconnectBackgroundStartInitialDelay, defaultBackgroundInitialDelay) @@ -99,6 +100,7 @@ func (e *Ethereum) InitConfig(config config.Section) { e.ethconnectConf.AddKnownKey(EthconnectConfigInstanceDeprecated) e.ethconnectConf.AddKnownKey(EthconnectConfigFromBlockDeprecated, defaultFromBlock) + // this is unused in reality fftmConf := config.SubSection(FFTMConfigKey) ffresty.InitConfig(fftmConf) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index dce31df1e7..25fb0a8996 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -620,6 +620,12 @@ func (e *Ethereum) invokeContractMethod(ctx context.Context, address, signingKey SetError(&resErr). Post("/") if err != nil || !res.IsSuccess() { + // 409 is a conflict error, so that means the transaction was already submitted / exists + // so we can return a success w/o error + if res != nil && res.StatusCode() == 409 && !resErr.SubmissionRejected { + return false, nil + } + return resErr.SubmissionRejected, common.WrapRESTError(ctx, &resErr, res, err, coremsgs.MsgEthConnectorRESTErr) } return false, nil @@ -761,6 +767,11 @@ func (e *Ethereum) DeployContract(ctx context.Context, nsOpID, signingKey string SetError(&resErr). Post("/") if err != nil || !res.IsSuccess() { + // 409 is a conflict error, so that means the transaction was already submitted / exists + // so we can return a success w/o error + if res != nil && res.StatusCode() == 409 && !resErr.SubmissionRejected { + return false, nil + } if strings.Contains(string(res.Body()), "FFEC100130") { // This error is returned by ethconnect because it does not support deploying contracts with this syntax // Return a more helpful and clear error message diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index f1e39cfe81..02fe576298 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -117,6 +117,17 @@ func resetConf(e *Ethereum) { e.InitConfig(utConfig) } +func newTestRestyClientWithRetry() *resty.Client { + return resty.New().SetBaseURL("http://localhost:12345"). + SetRetryCount(2). + AddRetryCondition(func(r *resty.Response, err error) bool { + return r.StatusCode() > 499 || r.StatusCode() == 429 + }). + SetRetryAfter(func(c *resty.Client, r *resty.Response) (time.Duration, error) { + return 10 * time.Millisecond, nil + }) +} + func newTestEthereum() (*Ethereum, func()) { ctx, cancel := context.WithCancel(context.Background()) mm := &metricsmocks.Manager{} @@ -3222,6 +3233,41 @@ func TestInvokeContractEVMConnectRejectErr(t *testing.T) { assert.True(t, submissionRejected) } +func TestInvokeContract409ConflictNotRejectedOnRetry(t *testing.T) { + e, cancel := newTestEthereum() + e.client = newTestRestyClientWithRetry() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + signingKey := ethHexFormatB32(fftypes.NewRandB32()) + location := &Location{ + Address: "0x12345", + } + method := testFFIMethod() + testErrors := testFFIErrors() + params := map[string]interface{}{ + "x": float64(1), + "y": float64(2), + } + options := map[string]interface{}{} + locationBytes, err := json.Marshal(location) + assert.NoError(t, err) + numCalls := 0 + httpmock.RegisterResponder("POST", `http://localhost:12345/`, + func(req *http.Request) (*http.Response, error) { + defer func() { numCalls++ }() + if numCalls < 1 { + return httpmock.NewJsonResponderOrPanic(500, fftypes.JSONAnyPtr(`{"error":"something went wrong, but connector is successfully processing the transaction"}`))(req) + } + return httpmock.NewJsonResponderOrPanic(409, fftypes.JSONAnyPtr(`{"error":"FF21065: ID 'an-id' is not unique", "submissionRejected": false}`))(req) + }) + parsedMethod, err := e.ParseInterface(context.Background(), method, testErrors) + assert.NoError(t, err) + submissionRejected, err := e.InvokeContract(context.Background(), "an-id", signingKey, fftypes.JSONAnyPtrBytes(locationBytes), parsedMethod, params, options, nil) + assert.NoError(t, err) + assert.False(t, submissionRejected) +} + func TestInvokeContractPrepareFail(t *testing.T) { e, cancel := newTestEthereum() defer cancel() diff --git a/internal/shareddownload/download_manager.go b/internal/shareddownload/download_manager.go index 56cf25877d..48a207bf2a 100644 --- a/internal/shareddownload/download_manager.go +++ b/internal/shareddownload/download_manager.go @@ -20,6 +20,7 @@ import ( "context" "database/sql/driver" "math" + "sync/atomic" "time" "github.com/hyperledger/firefly-common/pkg/config" @@ -71,7 +72,7 @@ type downloadManager struct { type downloadWork struct { dispatchedAt time.Time preparedOp *core.PreparedOperation - attempts int + attempts atomic.Int32 idempotentSubmit bool } @@ -209,19 +210,20 @@ func (dm *downloadManager) recoverDownloads(startupTime *fftypes.FFTime) { func (dm *downloadManager) dispatchWork(work *downloadWork) { dm.work <- work // Log after dispatching so we can see the dispatch delay if the queue got full - log.L(dm.ctx).Debugf("Dispatched download operation %s/%s (attempts=%d) to worker pool", work.preparedOp.Type, work.preparedOp.ID, work.attempts) + log.L(dm.ctx).Debugf("Dispatched download operation %s/%s (attempts=%d) to worker pool", work.preparedOp.Type, work.preparedOp.ID, work.attempts.Load()) } // waitAndRetryDownload is a go routine to wait and re-dispatch a retrying download. // Note this go routine is short lived and completely separate to the workers. func (dm *downloadManager) waitAndRetryDownload(work *downloadWork) { startedWaiting := time.Now() - delay := dm.calcDelay(work.attempts) + attempts := int(work.attempts.Load()) + delay := dm.calcDelay(attempts) <-time.After(delay) delayTimeMS := time.Since(startedWaiting).Milliseconds() totalTimeMS := time.Since(work.dispatchedAt).Milliseconds() log.L(dm.ctx).Infof("Retrying download operation %s/%s after %dms (total=%dms,attempts=%d)", - work.preparedOp.Type, work.preparedOp.ID, delayTimeMS, totalTimeMS, work.attempts) + work.preparedOp.Type, work.preparedOp.ID, delayTimeMS, totalTimeMS, attempts) dm.dispatchWork(work) } diff --git a/internal/shareddownload/download_manager_test.go b/internal/shareddownload/download_manager_test.go index fd217ec9a0..e00b30bba1 100644 --- a/internal/shareddownload/download_manager_test.go +++ b/internal/shareddownload/download_manager_test.go @@ -296,7 +296,11 @@ func TestDownloadManagerStartupRecoveryCombinations(t *testing.T) { assert.Equal(t, core.OpPhaseComplete, phase) called <- true }) - mom.On("SubmitOperationUpdate", mock.Anything).Return(nil) + //mom.On("SubmitOperationUpdate", mock.Anything).Return(nil) + updateCalled := make(chan bool) + mom.On("SubmitOperationUpdate", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + updateCalled <- true + }) mci := dm.callbacks.(*shareddownloadmocks.Callbacks) mci.On("SharedStorageBatchDownloaded", "ref2", []byte("some batch data")).Return(batchID, nil) @@ -307,12 +311,12 @@ func TestDownloadManagerStartupRecoveryCombinations(t *testing.T) { <-called <-called <-dm.recoveryComplete + <-updateCalled mss.AssertExpectations(t) mdi.AssertExpectations(t) mci.AssertExpectations(t) mom.AssertExpectations(t) - } func TestPrepareOperationUnknown(t *testing.T) { diff --git a/internal/shareddownload/download_worker.go b/internal/shareddownload/download_worker.go index 68409aa9fe..37fa74bce3 100644 --- a/internal/shareddownload/download_worker.go +++ b/internal/shareddownload/download_worker.go @@ -56,12 +56,11 @@ func (dw *downloadWorker) downloadWorkerLoop() { } func (dw *downloadWorker) attemptWork(work *downloadWork) { - - work.attempts++ - isLastAttempt := work.attempts >= dw.dm.retryMaxAttempts + attempts := int(work.attempts.Add(1)) + isLastAttempt := attempts >= dw.dm.retryMaxAttempts _, err := dw.dm.operations.RunOperation(dw.ctx, work.preparedOp, work.idempotentSubmit) if err != nil { - log.L(dw.ctx).Errorf("Download operation %s/%s attempt=%d/%d failed: %s", work.preparedOp.Type, work.preparedOp.ID, work.attempts, dw.dm.retryMaxAttempts, err) + log.L(dw.ctx).Errorf("Download operation %s/%s attempt=%d/%d failed: %s", work.preparedOp.Type, work.preparedOp.ID, attempts, dw.dm.retryMaxAttempts, err) if isLastAttempt { dw.dm.operations.SubmitOperationUpdate(&core.OperationUpdateAsync{ OperationUpdate: core.OperationUpdate{ diff --git a/test/e2e/client/restclient.go b/test/e2e/client/restclient.go index cd2fad6d7d..3d77221559 100644 --- a/test/e2e/client/restclient.go +++ b/test/e2e/client/restclient.go @@ -21,6 +21,7 @@ import ( "crypto/rand" "crypto/sha256" "encoding/json" + "errors" "fmt" "io" "math/big" @@ -821,13 +822,13 @@ func (client *FireFlyClient) DeleteContractListener(t *testing.T, id *fftypes.UU require.Equal(t, 204, resp.StatusCode(), "DELETE %s [%d]: %s", path, resp.StatusCode(), resp.String()) } -func (client *FireFlyClient) InvokeContractMethod(t *testing.T, req *core.ContractCallRequest, expectedStatus ...int) (interface{}, error) { - var res interface{} +func (client *FireFlyClient) InvokeContractMethod(t *testing.T, req *core.ContractCallRequest, expectedStatus ...int) (*core.Operation, error) { + res := &core.Operation{} path := client.namespaced(urlContractInvoke) var errResult fftypes.RESTError resp, err := client.Client.R(). SetBody(req). - SetResult(&res). + SetResult(res). SetError(&errResult). Post(path) require.NoError(t, err) @@ -836,7 +837,7 @@ func (client *FireFlyClient) InvokeContractMethod(t *testing.T, req *core.Contra } require.Equal(t, expectedStatus[0], resp.StatusCode(), "POST %s [%d]: %s", path, resp.StatusCode(), resp.String()) if err == nil && errResult.Error != "" { - return res, fmt.Errorf(errResult.Error) + return nil, errors.New(errResult.Error) } return res, err } @@ -1055,6 +1056,16 @@ func (client *FireFlyClient) GetOperations(t *testing.T, startTime time.Time) (o return operations } +func (client *FireFlyClient) GetOperation(t *testing.T, operationID string) (operation *core.Operation) { + path := client.namespaced(urlOperations + "/" + operationID) + resp, err := client.Client.R(). + SetResult(&operation). + Get(path) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode(), "GET %s [%d]: %s", path, resp.StatusCode(), resp.String()) + return operation +} + func (client *FireFlyClient) NetworkAction(t *testing.T, action core.NetworkActionType) { path := client.namespaced(urlNetworkAction) input := &core.NetworkAction{Type: action} diff --git a/test/e2e/gateway/ethereum_coupon.go b/test/e2e/gateway/ethereum_coupon.go index e15fea2612..0a864236fe 100644 --- a/test/e2e/gateway/ethereum_coupon.go +++ b/test/e2e/gateway/ethereum_coupon.go @@ -297,9 +297,9 @@ func (suite *EthereumCouponTestSuite) TestDirectInvokeMethod() { Location: fftypes.JSONAnyPtrBytes(locationBytes), Method: couponFFIGetAllCouponIDs(), } - res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) + queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) assert.NoError(suite.T(), err) - resJSON, err := json.Marshal(res) + resJSON, err := json.Marshal(queryRes) assert.NoError(suite.T(), err) assert.Equal(suite.T(), `{"allCreatedIds":["1"]}`, string(resJSON)) suite.testState.client1.DeleteContractListener(suite.T(), listener.ID) diff --git a/test/e2e/gateway/ethereum_simplestorage.go b/test/e2e/gateway/ethereum_simplestorage.go index f691f4aba4..5877af8233 100644 --- a/test/e2e/gateway/ethereum_simplestorage.go +++ b/test/e2e/gateway/ethereum_simplestorage.go @@ -209,9 +209,9 @@ func (suite *EthereumSimpleStorageTestSuite) TestDirectInvokeMethod() { Location: fftypes.JSONAnyPtrBytes(locationBytes), Method: simpleStorageFFIGet(), } - res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) + queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) assert.NoError(suite.T(), err) - resJSON, err := json.Marshal(res) + resJSON, err := json.Marshal(queryRes) assert.NoError(suite.T(), err) assert.Equal(suite.T(), `{"output":"2"}`, string(resJSON)) suite.testState.client1.DeleteContractListener(suite.T(), listener.ID) @@ -268,9 +268,9 @@ func (suite *EthereumSimpleStorageTestSuite) TestFFIInvokeMethod() { Interface: suite.interfaceID, MethodPath: "get", } - res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) + queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) assert.NoError(suite.T(), err) - resJSON, err := json.Marshal(res) + resJSON, err := json.Marshal(queryRes) assert.NoError(suite.T(), err) assert.Equal(suite.T(), `{"output":"42"}`, string(resJSON)) suite.testState.client1.DeleteContractListener(suite.T(), listener.ID) diff --git a/test/e2e/gateway/fabric_contracts.go b/test/e2e/gateway/fabric_contracts.go index 7a216e2b16..28b3da0c9a 100644 --- a/test/e2e/gateway/fabric_contracts.go +++ b/test/e2e/gateway/fabric_contracts.go @@ -159,10 +159,10 @@ func (suite *FabricContractTestSuite) TestE2EContractEvents() { }, } - res, err = suite.testState.client1.QueryContractMethod(suite.testState.t, queryContractRequest) + queryRes, err := suite.testState.client1.QueryContractMethod(suite.testState.t, queryContractRequest) suite.T().Log(res) assert.NoError(suite.T(), err) - assert.Equal(suite.T(), assetName, res.(map[string]interface{})["name"]) + assert.Equal(suite.T(), assetName, queryRes.(map[string]interface{})["name"]) suite.testState.client1.DeleteContractListener(suite.T(), subs[0].ID) subs = suite.testState.client1.GetContractListeners(suite.T(), suite.testState.startTime) diff --git a/test/e2e/multiparty/ethereum_contracts.go b/test/e2e/multiparty/ethereum_contracts.go index 06a605395d..1fa80fbbc8 100644 --- a/test/e2e/multiparty/ethereum_contracts.go +++ b/test/e2e/multiparty/ethereum_contracts.go @@ -23,6 +23,7 @@ import ( "os" "os/exec" "testing" + "time" "github.com/aidarkhanov/nanoid" "github.com/go-resty/resty/v2" @@ -189,9 +190,9 @@ func (suite *EthereumContractTestSuite) TestDirectInvokeMethod() { Location: fftypes.JSONAnyPtrBytes(locationBytes), Method: simpleStorageFFIGet(), } - res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) + queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) assert.NoError(suite.T(), err) - resJSON, err := json.Marshal(res) + resJSON, err := json.Marshal(queryRes) assert.NoError(suite.T(), err) assert.Equal(suite.T(), `{"output":"2"}`, string(resJSON)) suite.testState.client1.DeleteContractListener(suite.T(), listener.ID) @@ -231,6 +232,19 @@ func (suite *EthereumContractTestSuite) TestFFIInvokeMethod() { assert.NoError(suite.T(), err) assert.NotNil(suite.T(), res) + // Wait for the operation to succeed before idempotency check + // If the operation never succeeds, the larger test timeout will fail + for { + suite.T().Logf("Waiting for invoke operation to succeed: %s", res.ID.String()) + op := suite.testState.client1.GetOperation(suite.T(), res.ID.String()) + if op.Status == core.OpStatusSucceeded { + suite.T().Logf("Invoke operation succeeded: %s", res.ID.String()) + break + } + time.Sleep(100 * time.Millisecond) + suite.T().Logf("Retrying, invoke operation status: %s", op.Status) + } + // Idempotency check _, err = suite.testState.client1.InvokeContractMethod(suite.T(), invokeContractRequest, 409) assert.Regexp(suite.T(), "FF10431|FF10458" /* idempotency check could come from FF or blockchain connector, depending on the operation update that is async */, err) @@ -253,9 +267,9 @@ func (suite *EthereumContractTestSuite) TestFFIInvokeMethod() { Interface: suite.interfaceID, MethodPath: "get", } - res, err = suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) + queryRes, err := suite.testState.client1.QueryContractMethod(suite.T(), queryContractRequest) assert.NoError(suite.T(), err) - resJSON, err := json.Marshal(res) + resJSON, err := json.Marshal(queryRes) assert.NoError(suite.T(), err) assert.Equal(suite.T(), `{"output":"42"}`, string(resJSON)) suite.testState.client1.DeleteContractListener(suite.T(), listener.ID) diff --git a/test/e2e/multiparty/fabric_contracts.go b/test/e2e/multiparty/fabric_contracts.go index 8be15a754e..4b80c0a5de 100644 --- a/test/e2e/multiparty/fabric_contracts.go +++ b/test/e2e/multiparty/fabric_contracts.go @@ -158,10 +158,10 @@ func (suite *FabricContractTestSuite) TestE2EContractEvents() { }, } - res, err = suite.testState.client1.QueryContractMethod(suite.testState.t, queryContractRequest) + queryRes, err := suite.testState.client1.QueryContractMethod(suite.testState.t, queryContractRequest) suite.T().Log(res) assert.NoError(suite.T(), err) - assert.Equal(suite.T(), assetName, res.(map[string]interface{})["name"]) + assert.Equal(suite.T(), assetName, queryRes.(map[string]interface{})["name"]) suite.testState.client1.DeleteContractListener(suite.T(), subs[0].ID) subs = suite.testState.client1.GetContractListeners(suite.T(), suite.testState.startTime)