diff --git a/pkg/common/helper.go b/pkg/common/helper.go index 25ffa9b..4537f20 100644 --- a/pkg/common/helper.go +++ b/pkg/common/helper.go @@ -23,6 +23,7 @@ package common import ( "context" "errors" + "net/http" "reflect" "runtime" "strings" @@ -30,6 +31,7 @@ import ( nuclioerrors "github.com/nuclio/errors" "github.com/nuclio/logger" + v3ioerrors "github.com/v3io/v3io-go/pkg/errors" ) func getFunctionName(fn interface{}) string { @@ -184,24 +186,56 @@ func StringSlicesEqual(slice1 []string, slice2 []string) bool { return true } +// EngineErrorIsNonFatal checks whether the error should be considered non-fatal +// It unwraps the error chain and checks each error against predefined non-fatal patterns func EngineErrorIsNonFatal(err error) bool { - var nonFatalEngineErrorsPartialMatch = []string{ - "dialing to the given TCP address timed out", - "timeout", - "refused", + nonFatalErrorCheckFunctions := []func(error) bool{ + isNonFatalErrorString, + isNonFatalStatusCode, } - return errorMatches(err, nonFatalEngineErrorsPartialMatch) + return errorMatches(err, nonFatalErrorCheckFunctions) } -func errorMatches(err error, substrings []string) bool { +// errorMatches unwraps the error chain and applies each check function to every error in the chain +// Returns true if any check function returns true for any error in the chain +func errorMatches(err error, checkFunctions []func(error) bool) bool { // Unwraps the entire error chain for e := err; e != nil; e = errors.Unwrap(e) { - errMsg := e.Error() - for _, substring := range substrings { - if strings.Contains(errMsg, substring) { + // Execute each check function on the current error + for _, checkFunc := range checkFunctions { + if checkFunc(e) { return true } } } return false } + +// isNonFatalErrorString checks whether the error message contains any of the predefined non-fatal substrings +func isNonFatalErrorString(err error) bool { + var nonFatalEngineErrorsPartialMatch = []string{ + "dialing to the given TCP address timed out", + "timeout", + "refused", + } + errMsg := err.Error() + for _, substring := range nonFatalEngineErrorsPartialMatch { + if strings.Contains(errMsg, substring) { + return true + } + } + return false +} + +// isNonFatalStatusCode checks whether the error contains any of the predefined non-fatal HTTP status codes +func isNonFatalStatusCode(err error) bool { + var nonFatalStatusCodes = []int{ + http.StatusServiceUnavailable, + } + errWithStatusCode, ok := err.(v3ioerrors.ErrorWithStatusCode) + if !ok { + return false + } + statusCode := errWithStatusCode.StatusCode() + return IntSliceContainsInt(nonFatalStatusCodes, statusCode) +} diff --git a/pkg/common/helper_test.go b/pkg/common/helper_test.go new file mode 100644 index 0000000..4258b39 --- /dev/null +++ b/pkg/common/helper_test.go @@ -0,0 +1,202 @@ +/* +Copyright 2025 Iguazio Systems Ltd. + +Licensed under the Apache License, Version 2.0 (the "License") with +an addition restriction as set forth herein. You may not use this +file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. + +In addition, you may not use the software for any purposes that are +illegal under applicable law, and the grant of the foregoing license +under the Apache 2.0 license is conditioned upon your compliance with +such restriction. +*/ + +package common + +import ( + "fmt" + "testing" + + "github.com/nuclio/errors" + v3ioerrors "github.com/v3io/v3io-go/pkg/errors" + + "github.com/stretchr/testify/suite" +) + +type helperTestSuite struct { + suite.Suite +} + +func (suite *helperTestSuite) TestEngineErrorIsNonFatalNestedErrorFromLog() { + // Create the nested error structure with http 503 Service Temporarily Unavailable as the root cause + + // Create HTTP response string contains the 503 error + httpResponseStr := `HTTP/1.1 503 Service Temporarily Unavailable\r\nServer: nginx\r\nDate: Tue, 03 Jun 2025 07:37:09 GMT\r\nContent-Type: application/json\r\nContent-Length: 89\r\nConnection: keep-alive\r\n\r\n{\n\t\"ErrorCode\": -117440512,\n\t\"ErrorMessage\": \"Failed to send a control message request\"\n}` + sanitizedRequest := `PUT /projects/perform044wp2gqixmhkv1/datafetch_output_stream/20 HTTP/1.1\r\nUser-Agent: fasthttp\r\nHost: v3io-webapi:8081\r\nContent-Type: application/json\r\nContent-Length: 58\r\nX-V3io-Session-Key: SANITIZED\r\nX-V3io-Function: GetItem\r\n\r\n{\"AttributesToGet\": \"__serving_committed_sequence_number\"}` + httpBody := fmt.Errorf("Expected a 2xx response status code: %s\nRequest details:\n%s", + httpResponseStr, sanitizedRequest) + + // Wrap with ErrorWithStatusCode + statusCodeErr := v3ioerrors.NewErrorWithStatusCode(httpBody, 503) + + // Further wrap the error to simulate the nested error chain + shardItemErr := errors.Wrap(statusCodeErr, "Failed getting shard item") + sequenceNumberErr := errors.Wrap(shardItemErr, "Failed to get shard sequenceNumber from item attributes") + persistencyErr := errors.Wrap(sequenceNumberErr, "Failed to get shard location from persistency") + locationErr := errors.Wrap(persistencyErr, "Failed to get shard location") + finalErr := errors.Wrapf(locationErr, "Failed to get shard location state, attempts exhausted. shard id: %d", 20) + + // Test that EngineErrorIsNonFatal correctly unwraps the nested error chain + // Since status code 503 is now in nonFatalStatusCodes, expect true + result := EngineErrorIsNonFatal(finalErr) + suite.Require().True(result, "Expected EngineErrorIsNonFatal to return true (503 is in nonFatalStatusCodes)") +} + +func (suite *helperTestSuite) TestMatchErrorString() { + for _, testCase := range []struct { + name string + errMsg string + expected bool + }{ + { + name: "timeout error", + errMsg: "connection timeout occurred", + expected: true, + }, + { + name: "dial timeout error", + errMsg: "dialing to the given TCP address timed out", + expected: true, + }, + { + name: "connection refused", + errMsg: "connection refused", + expected: true, + }, + { + name: "generic error", + errMsg: "something went wrong", + expected: false, + }, + { + name: "empty error", + errMsg: "", + expected: false, + }, + } { + suite.Run(testCase.name, func() { + err := fmt.Errorf(testCase.errMsg) + result := isNonFatalErrorString(err) + suite.Require().Equal(testCase.expected, result) + }) + } +} + +func (suite *helperTestSuite) TestMatchErrorStatusCode() { + baseErr := fmt.Errorf("test error") + for _, testCase := range []struct { + name string + statusCode int + expected bool + }{ + { + name: "non-fatal status code 503", + statusCode: 503, + expected: true, + }, + { + name: "fatal status code 500", + statusCode: 500, + expected: false, + }, + { + name: "fatal status code 404", + statusCode: 404, + expected: false, + }, + { + name: "fatal status code 200", + statusCode: 200, + expected: false, + }, + } { + suite.Run(testCase.name, func() { + err := v3ioerrors.NewErrorWithStatusCode(baseErr, testCase.statusCode) + result := isNonFatalStatusCode(err) + suite.Require().Equal(testCase.expected, result) + }) + } +} + +func (suite *helperTestSuite) TestMatchErrorStatusCodeNonV3ioError() { + err := fmt.Errorf("regular error") + result := isNonFatalStatusCode(err) + suite.Require().False(result) +} + +func (suite *helperTestSuite) TestEngineErrorIsNonFatalStringMatch() { + for _, testCase := range []struct { + name string + errMsg string + expected bool + }{ + { + name: "timeout error", + errMsg: "operation timeout", + expected: true, + }, + { + name: "connection refused", + errMsg: "connection refused by server", + expected: true, + }, + { + name: "generic error", + errMsg: "something went wrong", + expected: false, + }, + } { + suite.Run(testCase.name, func() { + err := fmt.Errorf(testCase.errMsg) + result := EngineErrorIsNonFatal(err) + suite.Require().Equal(testCase.expected, result) + }) + } +} + +func (suite *helperTestSuite) TestEngineErrorIsNonFatalStatusCodeMatch() { + baseErr := fmt.Errorf("test error") + for _, testCase := range []struct { + name string + statusCode int + expected bool + }{ + { + name: "Service Temporarily Unavailable error", + statusCode: 503, + expected: true, + }, { + name: "no error", + statusCode: 200, + expected: false, + }, + } { + suite.Run(testCase.name, func() { + err := v3ioerrors.NewErrorWithStatusCode(baseErr, testCase.statusCode) + result := EngineErrorIsNonFatal(err) + suite.Require().Equal(testCase.expected, result) + }) + } + +} + +func TestHelperTestSuite(t *testing.T) { + suite.Run(t, new(helperTestSuite)) +} diff --git a/pkg/dataplane/http/context.go b/pkg/dataplane/http/context.go index 4a3d503..3781a4a 100755 --- a/pkg/dataplane/http/context.go +++ b/pkg/dataplane/http/context.go @@ -360,7 +360,7 @@ func (c *context) GetItemsSync(getItemsInput *v3io.GetItemsInput) (*v3io.Respons } if len(getItemsInput.DataPlaneInput.MtimeSec) > 0 { //nolint:staticcheck // QF1008 - headers["conditional-mtime-sec"] = getItemsInput.DataPlaneInput.MtimeSec //nolint:staticcheck // QF1008 + headers["conditional-mtime-sec"] = getItemsInput.DataPlaneInput.MtimeSec //nolint:staticcheck // QF1008 headers["conditional-mtime-nsec"] = getItemsInput.DataPlaneInput.MtimeNsec //nolint:staticcheck // QF1008 } diff --git a/pkg/dataplane/itemscursor.go b/pkg/dataplane/itemscursor.go index 3f21c52..0a03e01 100644 --- a/pkg/dataplane/itemscursor.go +++ b/pkg/dataplane/itemscursor.go @@ -43,7 +43,7 @@ type ItemsCursor struct { scattered bool logger logger.Logger //nolint:unused - retryAttempts int //nolint:unused + retryAttempts int //nolint:unused retryInterval time.Duration //nolint:unused } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 80b37c4..f219a0b 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -25,7 +25,7 @@ import ( ) var ErrInvalidTypeConversion = errors.New("Invalid type conversion") //nolint:staticcheck // ST1005 -var ErrNotFound = errors.New("Not found") //nolint:staticcheck // ST1005 +var ErrNotFound = errors.New("Not found") //nolint:staticcheck // ST1005 var ErrStopped = errors.New("Stopped") var ErrTimeout = errors.New("Timed out") //nolint:staticcheck // ST1005