diff --git a/CHANGELOG.md b/CHANGELOG.md index cbaa4e1df8..044a16088b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1081,6 +1081,13 @@ New configuration tunables under `edge.oidc`: * github.com/openziti/go-term-markdown: v1.0.1 (new) * github.com/openziti/ziti/v2: [v1.6.8 -> v2.0.0](https://github.com/openziti/ziti/compare/v1.6.8...v2.0.0) + * [Issue #3717](https://github.com/openziti/ziti/issues/3717) - Generic error message for specific error + * [Issue #3543](https://github.com/openziti/ziti/issues/3543) - New Circuit Failure code for sockets not available + * [Issue #3364](https://github.com/openziti/ziti/issues/3364) - Make no such host error specific + * [Issue #2888](https://github.com/openziti/ziti/issues/2888) - New specific Error code for port not allowed + * [Issue #2859](https://github.com/openziti/ziti/issues/2859) - Create specific error code for DNS failed resolution + * [Issue #1580](https://github.com/openziti/ziti/issues/1580) - Invalid link destination should have a specific error code + * [Issue #3706](https://github.com/openziti/ziti/issues/3706) - Increase link payload/ack queue sizes and make them configurable * [Issue #3778](https://github.com/openziti/ziti/issues/3778) - SetRouterDataModel can deadlock in the router * [Issue #3777](https://github.com/openziti/ziti/issues/3777) - With the new circuit reserve, we can have circuits with no path in the controller circuit set, which can cause panics * [Issue #3770](https://github.com/openziti/ziti/issues/3770) - Update Token Requests Should Close Channel Connections If Invalid diff --git a/common/ctrl_msg/messages.go b/common/ctrl_msg/messages.go index fb4051d019..7718c9ef2a 100644 --- a/common/ctrl_msg/messages.go +++ b/common/ctrl_msg/messages.go @@ -51,6 +51,11 @@ const ( ErrorTypeMisconfiguredTerminator = 2 ErrorTypeDialTimedOut = 3 ErrorTypeConnectionRefused = 4 + ErrorTypeRejectedByApplication = 5 + ErrorTypeDnsResolutionFailed = 6 + ErrorTypePortNotAllowed = 7 + ErrorTypeInvalidLinkDestination = 8 + ErrorTypeResourcesNotAvailable = 9 CreateCircuitPeerDataHeader = 10 diff --git a/controller/network/circuit_lifecycle.go b/controller/network/circuit_lifecycle.go index cc17573266..072cc7fe66 100644 --- a/controller/network/circuit_lifecycle.go +++ b/controller/network/circuit_lifecycle.go @@ -93,6 +93,11 @@ const ( CircuitFailureRouterErrMisconfiguredTerminator CircuitFailureCause = "ROUTER_ERR_MISCONFIGURED_TERMINATOR" CircuitFailureRouterErrDialTimedOut CircuitFailureCause = "ROUTER_ERR_DIAL_TIMED_OUT" CircuitFailureRouterErrDialConnRefused CircuitFailureCause = "ROUTER_ERR_CONN_REFUSED" + CircuitFailureRouterErrRejectedByApp CircuitFailureCause = "ROUTER_ERR_REJECTED_BY_APPLICATION" + CircuitFailureRouterErrDnsResolutionFailed CircuitFailureCause = "ROUTER_ERR_DNS_RESOLUTION_FAILED" + CircuitFailureRouterErrPortNotAllowed CircuitFailureCause = "ROUTER_ERR_PORT_NOT_ALLOWED" + CircuitFailureRouterErrInvalidLinkDest CircuitFailureCause = "ROUTER_ERR_INVALID_LINK_DESTINATION" + CircuitFailureRouterErrResourcesNotAvailable CircuitFailureCause = "ROUTER_ERR_RESOURCES_NOT_AVAILABLE" ) type CircuitError interface { diff --git a/controller/network/routesender.go b/controller/network/routesender.go index 099ab3a514..5dd5b9e795 100644 --- a/controller/network/routesender.go +++ b/controller/network/routesender.go @@ -179,6 +179,21 @@ func (self *routeSender) handleRouteSend(attempt uint32, path *model.Path, strat case ctrl_msg.ErrorTypeConnectionRefused: self.serviceCounters.ServiceTerminatorConnectionRefused(terminator.GetServiceId(), terminator.GetId()) failureCause = CircuitFailureRouterErrDialConnRefused + case ctrl_msg.ErrorTypeRejectedByApplication: + self.serviceCounters.ServiceDialOtherError(terminator.GetServiceId()) + failureCause = CircuitFailureRouterErrRejectedByApp + case ctrl_msg.ErrorTypeDnsResolutionFailed: + self.serviceCounters.ServiceDialOtherError(terminator.GetServiceId()) + failureCause = CircuitFailureRouterErrDnsResolutionFailed + case ctrl_msg.ErrorTypePortNotAllowed: + self.serviceCounters.ServiceDialOtherError(terminator.GetServiceId()) + failureCause = CircuitFailureRouterErrPortNotAllowed + case ctrl_msg.ErrorTypeInvalidLinkDestination: + self.serviceCounters.ServiceDialOtherError(terminator.GetServiceId()) + failureCause = CircuitFailureRouterErrInvalidLinkDest + case ctrl_msg.ErrorTypeResourcesNotAvailable: + self.serviceCounters.ServiceDialOtherError(terminator.GetServiceId()) + failureCause = CircuitFailureRouterErrResourcesNotAvailable default: logger.WithField("errorCode", status.ErrorCode).Error("unhandled error code") } diff --git a/quickstart/docker/docker-compose.yml b/quickstart/docker/docker-compose.yml index 82ba6e6293..fa9920fe67 100644 --- a/quickstart/docker/docker-compose.yml +++ b/quickstart/docker/docker-compose.yml @@ -3,6 +3,7 @@ services: image: "${ZITI_IMAGE}:${ZITI_VERSION}" healthcheck: test: curl -m 1 -s -k https://${ZITI_CTRL_EDGE_ADVERTISED_ADDRESS:-ziti-edge-controller}:${ZITI_CTRL_EDGE_ADVERTISED_PORT:-1280}/edge/client/v1/version + start_period: 30s interval: 1s timeout: 3s retries: 30 diff --git a/quickstart/docker/image/access-control.sh b/quickstart/docker/image/access-control.sh index 20f320ff5f..8820c6e425 100755 --- a/quickstart/docker/image/access-control.sh +++ b/quickstart/docker/image/access-control.sh @@ -3,8 +3,17 @@ echo "*****************************************************" #### Add service policies -# Allow all identities to use any edge router with the "public" attribute -ziti edge create edge-router-policy all-endpoints-public-routers --edge-router-roles "#public" --identity-roles "#all" - -# Allow all edge-routers to access all services -ziti edge create service-edge-router-policy all-routers-all-services --edge-router-roles "#all" --service-roles "#all" +# Retry in case the controller's Raft cluster hasn't elected a leader yet +_retries=20 +while true; do + if ziti edge create edge-router-policy all-endpoints-public-routers --edge-router-roles "#public" --identity-roles "#all" 2>&1 \ + && ziti edge create service-edge-router-policy all-routers-all-services --edge-router-roles "#all" --service-roles "#all" 2>&1; then + break + fi + if (( --_retries == 0 )); then + echo "ERROR: failed to create access control policies after retries" >&2 + exit 1 + fi + echo "INFO: waiting for controller to be ready for policy creation (${_retries} retries left)..." + sleep 3 +done diff --git a/quickstart/docker/image/run-router.sh b/quickstart/docker/image/run-router.sh index 60d2186eb7..9b4f0dc952 100755 --- a/quickstart/docker/image/run-router.sh +++ b/quickstart/docker/image/run-router.sh @@ -63,14 +63,26 @@ if [ ! -f "${_CONFIG_PATH}" ]; then fi - if "${ZITI_BIN_DIR-}/ziti" edge list edge-routers "name = \"${ZITI_ROUTER_NAME}\"" --csv | grep -q "${ZITI_ROUTER_NAME}"; then - echo "---------- Found existing edge-router ${ZITI_ROUTER_NAME}...." - else - "${ZITI_BIN_DIR}/ziti" edge create edge-router "${ZITI_ROUTER_NAME}" -o "${ZITI_HOME}/${ZITI_ROUTER_NAME}.jwt" -t -a "${ZITI_ROUTER_ROLES}" - sleep 1 + # Retry the edge-router creation in case the controller's Raft cluster hasn't elected a leader yet + _retries=20 + while ! "${ZITI_BIN_DIR-}/ziti" edge list edge-routers "name = \"${ZITI_ROUTER_NAME}\"" --csv 2>/dev/null | grep -q "${ZITI_ROUTER_NAME}"; do + if "${ZITI_BIN_DIR}/ziti" edge create edge-router "${ZITI_ROUTER_NAME}" -o "${ZITI_HOME}/${ZITI_ROUTER_NAME}.jwt" -t -a "${ZITI_ROUTER_ROLES}" 2>&1; then + break + fi + if (( --_retries == 0 )); then + echo "ERROR: failed to create edge-router ${ZITI_ROUTER_NAME} after retries" >&2 + exit 1 + fi + echo "INFO: waiting for controller to be ready to create edge-router (${_retries} retries left)..." + sleep 3 + done + + if [ -f "${ZITI_HOME}/${ZITI_ROUTER_NAME}.jwt" ]; then echo "---------- Enrolling edge-router ${ZITI_ROUTER_NAME}...." "${ZITI_BIN_DIR}/ziti" router enroll "${ZITI_HOME}/${ZITI_ROUTER_NAME}.yaml" --jwt "${ZITI_HOME}/${ZITI_ROUTER_NAME}.jwt" echo "" + else + echo "---------- Found existing edge-router ${ZITI_ROUTER_NAME}...." fi else echo " Found existing config file ${_CONFIG_PATH}, not creating a new config." diff --git a/quickstart/docker/simplified-docker-compose.yml b/quickstart/docker/simplified-docker-compose.yml index 842d227ce7..d76f1a473b 100644 --- a/quickstart/docker/simplified-docker-compose.yml +++ b/quickstart/docker/simplified-docker-compose.yml @@ -3,6 +3,7 @@ services: image: "${ZITI_IMAGE}:${ZITI_VERSION}" healthcheck: test: curl -m 1 -s -k -f https://${ZITI_CTRL_EDGE_ADVERTISED_ADDRESS:-ziti-edge-controller}:${ZITI_CTRL_EDGE_ADVERTISED_PORT:-1280}/edge/client/v1/version + start_period: 30s interval: 3s timeout: 3s retries: 60 diff --git a/quickstart/test/compose-test.zsh b/quickstart/test/compose-test.zsh index 59c4aeeea2..bcdfd69289 100755 --- a/quickstart/test/compose-test.zsh +++ b/quickstart/test/compose-test.zsh @@ -149,7 +149,7 @@ fi # wait for the controller and router to be ready and run the certificate check script; NOUNSET option is enabled after # sourcing quickstart functions and env because there are some unset variables in those -docker compose exec ziti-controller \ +if ! docker compose exec ziti-controller \ bash -eo pipefail -c ' source "${ZITI_SCRIPTS}/ziti-cli-functions.sh" >/dev/null; echo "INFO: waiting for controller"; @@ -157,8 +157,13 @@ docker compose exec ziti-controller \ _wait_for_controller >/dev/null; echo "INFO: waiting for public router"; source /persistent/ziti.env >/dev/null; - _wait_for_public_router >/dev/null; + _wait_for_public_router; ' +then + echo "ERROR: router failed to come up, dumping compose logs" + docker compose logs + exit 1 +fi # TODO: re-add cert checks to above test suite after https://github.com/openziti/ziti/pull/1278 # zsh /persistent/check-cert-chains.zsh; docker compose --profile test run --rm quickstart-test diff --git a/router/forwarder/forwarder.go b/router/forwarder/forwarder.go index e91c44ac44..b92c11687c 100644 --- a/router/forwarder/forwarder.go +++ b/router/forwarder/forwarder.go @@ -34,6 +34,26 @@ import ( "github.com/sirupsen/logrus" ) +// InvalidLinkDestinationError indicates a route referenced a link destination that does not exist. +type InvalidLinkDestinationError struct { + LinkId string +} + +func (e *InvalidLinkDestinationError) Error() string { + return fmt.Sprintf("invalid link destination %v", e.LinkId) +} + +// NewInvalidLinkDestinationError creates an error indicating that the specified link destination is not valid. +func NewInvalidLinkDestinationError(linkId string) error { + return &InvalidLinkDestinationError{LinkId: linkId} +} + +// IsInvalidLinkDestinationError returns true if the error is or wraps an InvalidLinkDestinationError. +func IsInvalidLinkDestinationError(err error) bool { + var target *InvalidLinkDestinationError + return errors.As(err, &target) +} + type Forwarder struct { circuits *circuitTable destinations *destinationTable @@ -131,7 +151,7 @@ func (forwarder *Forwarder) Route(ctrlId string, route *ctrl_pb.Route) error { if !forwarder.HasDestination(xgress.Address(forward.DstAddress)) { if forward.DstType == ctrl_pb.DestType_Link { forwarder.faulter.NotifyInvalidLink(forward.DstAddress) - return fmt.Errorf("invalid link destination %v", forward.DstAddress) + return NewInvalidLinkDestinationError(forward.DstAddress) } if forward.DstType == ctrl_pb.DestType_End { return fmt.Errorf("invalid egress destination %v", forward.DstAddress) diff --git a/router/handler_ctrl/classify_dial_error_test.go b/router/handler_ctrl/classify_dial_error_test.go new file mode 100644 index 0000000000..fad99d2b4c --- /dev/null +++ b/router/handler_ctrl/classify_dial_error_test.go @@ -0,0 +1,120 @@ +/* + Copyright NetFoundry Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package handler_ctrl + +import ( + "fmt" + "net" + "os" + "syscall" + "testing" + + "github.com/openziti/sdk-golang/xgress" + "github.com/openziti/ziti/v2/common/ctrl_msg" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func Test_classifyDialError(t *testing.T) { + t.Run("connection refused", func(t *testing.T) { + err := fmt.Errorf("dial failed: %w", syscall.ECONNREFUSED) + require.Equal(t, byte(ctrl_msg.ErrorTypeConnectionRefused), classifyDialError(err)) + }) + + t.Run("DNS resolution failure", func(t *testing.T) { + err := &net.DNSError{Err: "no such host", Name: "nonexistent.example.com", IsNotFound: true} + require.Equal(t, byte(ctrl_msg.ErrorTypeDnsResolutionFailed), classifyDialError(err)) + }) + + t.Run("wrapped DNS resolution failure", func(t *testing.T) { + dnsErr := &net.DNSError{Err: "no such host", Name: "nonexistent.example.com", IsNotFound: true} + err := errors.Wrap(dnsErr, "failed to dial") + require.Equal(t, byte(ctrl_msg.ErrorTypeDnsResolutionFailed), classifyDialError(err)) + }) + + t.Run("resource exhaustion EMFILE", func(t *testing.T) { + err := fmt.Errorf("dial failed: %w", syscall.EMFILE) + require.Equal(t, byte(ctrl_msg.ErrorTypeResourcesNotAvailable), classifyDialError(err)) + }) + + t.Run("resource exhaustion ENFILE", func(t *testing.T) { + err := fmt.Errorf("dial failed: %w", syscall.ENFILE) + require.Equal(t, byte(ctrl_msg.ErrorTypeResourcesNotAvailable), classifyDialError(err)) + }) + + t.Run("resource exhaustion ENOBUFS", func(t *testing.T) { + err := fmt.Errorf("dial failed: %w", syscall.ENOBUFS) + require.Equal(t, byte(ctrl_msg.ErrorTypeResourcesNotAvailable), classifyDialError(err)) + }) + + t.Run("timeout", func(t *testing.T) { + err := fmt.Errorf("dial failed: %w", syscall.ETIMEDOUT) + require.Equal(t, byte(ctrl_msg.ErrorTypeDialTimedOut), classifyDialError(err)) + }) + + t.Run("timeout via net.OpError", func(t *testing.T) { + err := &net.OpError{ + Op: "dial", + Net: "tcp", + Err: &os.SyscallError{Syscall: "connect", Err: syscall.ETIMEDOUT}, + } + require.Equal(t, byte(ctrl_msg.ErrorTypeDialTimedOut), classifyDialError(err)) + }) + + t.Run("misconfigured terminator", func(t *testing.T) { + err := xgress.MisconfiguredTerminatorError{InnerError: fmt.Errorf("bad address")} + require.Equal(t, byte(ctrl_msg.ErrorTypeMisconfiguredTerminator), classifyDialError(err)) + }) + + t.Run("invalid terminator", func(t *testing.T) { + err := xgress.InvalidTerminatorError{InnerError: fmt.Errorf("not found")} + require.Equal(t, byte(ctrl_msg.ErrorTypeInvalidTerminator), classifyDialError(err)) + }) + + t.Run("port not allowed", func(t *testing.T) { + err := fmt.Errorf("failed to establish connection: port 7070 is not in allowed port ranges") + require.Equal(t, byte(ctrl_msg.ErrorTypePortNotAllowed), classifyDialError(err)) + }) + + t.Run("rejected by application", func(t *testing.T) { + err := fmt.Errorf("failed to establish connection with terminator address abc. error: (rejected by application)") + require.Equal(t, byte(ctrl_msg.ErrorTypeRejectedByApplication), classifyDialError(err)) + }) + + t.Run("generic error", func(t *testing.T) { + err := fmt.Errorf("something unexpected happened") + require.Equal(t, byte(ctrl_msg.ErrorTypeGeneric), classifyDialError(err)) + }) + + // DNS errors should be classified before timeout errors, since *net.DNSError implements net.Error + t.Run("DNS error not classified as timeout", func(t *testing.T) { + err := &net.DNSError{Err: "server misbehaving", Name: "example.com", IsTimeout: true} + require.Equal(t, byte(ctrl_msg.ErrorTypeDnsResolutionFailed), classifyDialError(err), + "DNS errors should be classified as DNS failures even when IsTimeout is true") + }) + + // ER/T hosted services send errors as strings, so DNS errors lose their type + t.Run("DNS error from string (no such host)", func(t *testing.T) { + err := fmt.Errorf("failed to establish connection: dial tcp: lookup bad.host on 127.0.0.53:53: no such host") + require.Equal(t, byte(ctrl_msg.ErrorTypeDnsResolutionFailed), classifyDialError(err)) + }) + + t.Run("DNS error from string (server misbehaving)", func(t *testing.T) { + err := fmt.Errorf("failed to establish connection: dial tcp: lookup bad.host on 127.0.0.53:53: server misbehaving") + require.Equal(t, byte(ctrl_msg.ErrorTypeDnsResolutionFailed), classifyDialError(err)) + }) +} diff --git a/router/handler_ctrl/route.go b/router/handler_ctrl/route.go index 0ec8ff7086..ddf12cc6b9 100644 --- a/router/handler_ctrl/route.go +++ b/router/handler_ctrl/route.go @@ -18,6 +18,7 @@ package handler_ctrl import ( "net" + "strings" "syscall" "time" @@ -115,7 +116,11 @@ func (rh *routeHandler) HandleReceive(msg *channel.Message, ch channel.Channel) func (rh *routeHandler) completeRoute(msg *channel.Message, attempt int, route *ctrl_pb.Route, peerData xt.PeerData, log *logrus.Entry) { if err := rh.forwarder.Route(rh.ch.PeerId(), route); err != nil { - rh.fail(msg, attempt, route, err, ctrl_msg.ErrorTypeGeneric, log) + var errCode byte = ctrl_msg.ErrorTypeGeneric + if forwarder.IsInvalidLinkDestinationError(err) { + errCode = ctrl_msg.ErrorTypeInvalidLinkDestination + } + rh.fail(msg, attempt, route, err, errCode, log) return } @@ -169,20 +174,7 @@ func (rh *routeHandler) connectEgress(msg *channel.Message, attempt int, ch chan if peerData, err := dialer.Dial(params); err == nil { rh.completeRoute(msg, attempt, route, peerData, log) } else { - var errCode byte - - switch { - case errors.Is(err, syscall.ECONNREFUSED): - errCode = ctrl_msg.ErrorTypeConnectionRefused - case isNetworkTimeout(err) || errors.Is(err, syscall.ETIMEDOUT): - errCode = ctrl_msg.ErrorTypeDialTimedOut - case errors.As(err, &xgress.MisconfiguredTerminatorError{}): - errCode = ctrl_msg.ErrorTypeMisconfiguredTerminator - case errors.As(err, &xgress.InvalidTerminatorError{}): - errCode = ctrl_msg.ErrorTypeInvalidTerminator - default: - errCode = ctrl_msg.ErrorTypeGeneric - } + errCode := classifyDialError(err) rh.fail(msg, attempt, route, errors.Wrapf(err, "error creating route for [c/%s]", route.CircuitId), errCode, log) } @@ -196,9 +188,56 @@ func (rh *routeHandler) connectEgress(msg *channel.Message, attempt int, ch chan } } +// classifyDialError maps a dial error to a specific error type code for circuit failure reporting. +func classifyDialError(err error) byte { + switch { + case errors.Is(err, syscall.ECONNREFUSED): + return ctrl_msg.ErrorTypeConnectionRefused + case isResourcesNotAvailable(err): + return ctrl_msg.ErrorTypeResourcesNotAvailable + case isDnsError(err): + return ctrl_msg.ErrorTypeDnsResolutionFailed + case isNetworkTimeout(err): + return ctrl_msg.ErrorTypeDialTimedOut + case errors.As(err, &xgress.MisconfiguredTerminatorError{}): + return ctrl_msg.ErrorTypeMisconfiguredTerminator + case errors.As(err, &xgress.InvalidTerminatorError{}): + return ctrl_msg.ErrorTypeInvalidTerminator + case isPortNotAllowedError(err): + return ctrl_msg.ErrorTypePortNotAllowed + case isRejectedByApplicationError(err): + return ctrl_msg.ErrorTypeRejectedByApplication + default: + return ctrl_msg.ErrorTypeGeneric + } +} + +func isResourcesNotAvailable(err error) bool { + return errors.Is(err, syscall.EMFILE) || errors.Is(err, syscall.ENFILE) || errors.Is(err, syscall.ENOBUFS) +} + +func isDnsError(err error) bool { + var dnsErr *net.DNSError + if errors.As(err, &dnsErr) { + return true + } + // ER/T hosted services send dial errors as strings via the SDK protocol, + // so the *net.DNSError type is lost. Fall back to string matching. + errMsg := err.Error() + return strings.Contains(errMsg, "no such host") || strings.Contains(errMsg, "server misbehaving") +} + +func isPortNotAllowedError(err error) bool { + return strings.Contains(err.Error(), "not in allowed port ranges") +} + +func isRejectedByApplicationError(err error) bool { + return strings.Contains(err.Error(), "rejected by application") +} + func isNetworkTimeout(err error) bool { var netErr net.Error - return errors.As(err, &netErr) + return (errors.As(err, &netErr) && netErr.Timeout()) || errors.Is(err, syscall.ETIMEDOUT) } func newDialParams(ctrlId string, route *ctrl_pb.Route, bindHandler xgress.BindHandler, logContext logcontext.Context, deadline time.Time) *dialParams { diff --git a/tests/authenticate.go b/tests/authenticate.go index 068d3b1420..203f4e9dc3 100644 --- a/tests/authenticate.go +++ b/tests/authenticate.go @@ -1160,6 +1160,18 @@ func (self *terminatorWatcher) waitForTerminators(timeout time.Duration) { } } +func (request *authenticatedRequests) waitForTerminatorState(serviceId string, check func([]*terminator) bool, timeout time.Duration) { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + terminators := request.listTerminators(fmt.Sprintf(`service="%s"`, serviceId)) + if check(terminators) { + return + } + time.Sleep(100 * time.Millisecond) + } + request.testContext.Fail("timed out waiting for terminator state") +} + func newSelfSignedCert(commonName string) (*x509.Certificate, crypto.PrivateKey) { priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) if err != nil { diff --git a/tests/circuit_failure_cause_test.go b/tests/circuit_failure_cause_test.go new file mode 100644 index 0000000000..ebe448fd8c --- /dev/null +++ b/tests/circuit_failure_cause_test.go @@ -0,0 +1,241 @@ +//go:build dataflow + +/* + Copyright NetFoundry Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package tests + +import ( + "encoding/json" + "testing" + "time" + + "github.com/michaelquigley/pfxlog" + "github.com/openziti/sdk-golang/ziti" + "github.com/openziti/ziti/v2/controller/event" + "github.com/openziti/ziti/v2/controller/network" + "github.com/openziti/ziti/v2/controller/xt_smartrouting" + routerEnv "github.com/openziti/ziti/v2/router/env" + "github.com/pkg/errors" +) + +// circuitFailureCollector captures circuit failed events from the event dispatcher. +type circuitFailureCollector struct { + events chan *event.CircuitEvent +} + +func newCircuitFailureCollector() *circuitFailureCollector { + return &circuitFailureCollector{ + events: make(chan *event.CircuitEvent, 50), + } +} + +func (self *circuitFailureCollector) AcceptCircuitEvent(evt *event.CircuitEvent) { + if evt.EventType == event.CircuitFailed { + self.events <- evt + } +} + +// waitForFailedCircuitForService waits for a circuit failed event matching the given service ID. +// Events for other services are discarded. +func (self *circuitFailureCollector) waitForFailedCircuitForService(ctx *TestContext, serviceId string, timeout time.Duration) *event.CircuitEvent { + deadline := time.After(timeout) + for { + select { + case evt := <-self.events: + if evt.ServiceId == serviceId { + return evt + } + case <-deadline: + ctx.Fail("timed out waiting for circuit failed event for service " + serviceId) + return nil + } + } +} + +func Test_CircuitFailureCauses(t *testing.T) { + ctx := NewTestContext(t) + defer ctx.Teardown() + ctx.StartServer() + ctx.RequireAdminManagementApiLogin() + + // blanket policies + ctx.AdminManagementSession.requireNewServicePolicy("Dial", s("#all"), s("#all"), nil) + ctx.AdminManagementSession.requireNewServicePolicy("Bind", s("#all"), s("#all"), nil) + ctx.AdminManagementSession.requireNewEdgeRouterPolicy(s("#all"), s("#all")) + ctx.AdminManagementSession.requireNewServiceEdgeRouterPolicy(s("#all"), s("#all")) + + // register circuit failure event collector + fc := newCircuitFailureCollector() + dispatcher := ctx.fabricController.GetEventDispatcher() + dispatcher.AddCircuitEventHandler(fc) + defer dispatcher.RemoveCircuitEventHandler(fc) + + // start ER/T in host mode + ctx.CreateEnrollAndStartTunnelerEdgeRouterWithCfgTweaks(func(cfg *routerEnv.Config) { + for _, l := range cfg.Listeners { + if l.Name == "tunnel" { + if opts, ok := l.Options["options"].(map[interface{}]interface{}); ok { + opts["mode"] = "host" + delete(opts, "services") + } + } + } + }) + + t.Run("rejected_by_application", func(t *testing.T) { + ctx.testContextChanged(t) + + service := ctx.AdminManagementSession.testContext.newService(nil, nil) + service.terminatorStrategy = xt_smartrouting.Name + service.Id = ctx.AdminManagementSession.requireCreateEntity(service) + + _, hostContext := ctx.AdminManagementSession.RequireCreateSdkContext() + defer hostContext.Close() + + terminatorWatcher := ctx.AdminManagementSession.newTerminatorWatcher(service.Id, 1) + defer terminatorWatcher.Close() + + listener, err := hostContext.ListenWithOptions(service.Name, &ziti.ListenOptions{ + ManualStart: true, + }) + ctx.Req.NoError(err) + defer listener.Close() + + terminatorWatcher.waitForTerminators(15 * time.Second) + + // host goroutine: accept and reject + go func() { + conn, err := listener.AcceptEdge() + if err != nil { + pfxlog.Logger().WithError(err).Error("accept failed") + return + } + conn.CompleteAcceptFailed(errors.New("rejected by application")) + _ = conn.Close() + }() + + _, clientContext := ctx.AdminManagementSession.RequireCreateSdkContext() + defer clientContext.Close() + + _, err = clientContext.Dial(service.Name) + ctx.Req.Error(err) + + evt := fc.waitForFailedCircuitForService(ctx, service.Id, 10*time.Second) + ctx.Req.NotNil(evt.FailureCause, "expected FailureCause to be set") + ctx.Req.Equal(string(network.CircuitFailureRouterErrRejectedByApp), *evt.FailureCause) + }) + + t.Run("dns_resolution_failed", func(t *testing.T) { + ctx.testContextChanged(t) + + hostConfig := ctx.newConfig("NH5p4FpGR", map[string]interface{}{ + "protocol": "tcp", + "address": "nonexistent.invalid.host.test", + "port": float64(8080), + }) + ctx.AdminManagementSession.requireCreateEntity(hostConfig) + + service := ctx.AdminManagementSession.testContext.newService(nil, s(hostConfig.Id)) + service.terminatorStrategy = xt_smartrouting.Name + ctx.AdminManagementSession.requireCreateEntity(service) + + terminatorWatcher := ctx.AdminManagementSession.newTerminatorWatcher(service.Id, 1) + defer terminatorWatcher.Close() + terminatorWatcher.waitForTerminators(30 * time.Second) + + _, clientContext := ctx.AdminManagementSession.RequireCreateSdkContext() + defer clientContext.Close() + + _, err := clientContext.Dial(service.Name) + ctx.Req.Error(err) + + evt := fc.waitForFailedCircuitForService(ctx, service.Id, 10*time.Second) + ctx.Req.NotNil(evt.FailureCause, "expected FailureCause to be set") + ctx.Req.Equal(string(network.CircuitFailureRouterErrDnsResolutionFailed), *evt.FailureCause) + }) + + t.Run("connection_refused", func(t *testing.T) { + ctx.testContextChanged(t) + + hostConfig := ctx.newConfig("NH5p4FpGR", map[string]interface{}{ + "protocol": "tcp", + "address": "127.0.0.1", + "port": float64(54321), + }) + ctx.AdminManagementSession.requireCreateEntity(hostConfig) + + service := ctx.AdminManagementSession.testContext.newService(nil, s(hostConfig.Id)) + service.terminatorStrategy = xt_smartrouting.Name + ctx.AdminManagementSession.requireCreateEntity(service) + + terminatorWatcher := ctx.AdminManagementSession.newTerminatorWatcher(service.Id, 1) + defer terminatorWatcher.Close() + terminatorWatcher.waitForTerminators(30 * time.Second) + + _, clientContext := ctx.AdminManagementSession.RequireCreateSdkContext() + defer clientContext.Close() + + _, err := clientContext.Dial(service.Name) + ctx.Req.Error(err) + + evt := fc.waitForFailedCircuitForService(ctx, service.Id, 10*time.Second) + ctx.Req.NotNil(evt.FailureCause, "expected FailureCause to be set") + ctx.Req.Equal(string(network.CircuitFailureRouterErrDialConnRefused), *evt.FailureCause) + }) + + t.Run("port_not_allowed", func(t *testing.T) { + ctx.testContextChanged(t) + + hostConfig := ctx.newConfig("NH5p4FpGR", map[string]interface{}{ + "forwardProtocol": true, + "allowedProtocols": []interface{}{"tcp"}, + "address": "127.0.0.1", + "forwardPort": true, + "allowedPortRanges": []interface{}{ + map[string]interface{}{"low": float64(8000), "high": float64(8100)}, + }, + }) + ctx.AdminManagementSession.requireCreateEntity(hostConfig) + + service := ctx.AdminManagementSession.testContext.newService(nil, s(hostConfig.Id)) + service.terminatorStrategy = xt_smartrouting.Name + ctx.AdminManagementSession.requireCreateEntity(service) + + terminatorWatcher := ctx.AdminManagementSession.newTerminatorWatcher(service.Id, 1) + defer terminatorWatcher.Close() + terminatorWatcher.waitForTerminators(30 * time.Second) + + _, clientContext := ctx.AdminManagementSession.RequireCreateSdkContext() + defer clientContext.Close() + + appData, err := json.Marshal(map[string]interface{}{ + "dst_protocol": "tcp", + "dst_ip": "127.0.0.1", + "dst_port": "9999", + }) + ctx.Req.NoError(err) + + _, err = clientContext.DialWithOptions(service.Name, &ziti.DialOptions{ + AppData: appData, + }) + ctx.Req.Error(err) + + evt := fc.waitForFailedCircuitForService(ctx, service.Id, 10*time.Second) + ctx.Req.NotNil(evt.FailureCause, "expected FailureCause to be set") + ctx.Req.Equal(string(network.CircuitFailureRouterErrPortNotAllowed), *evt.FailureCause) + }) +} diff --git a/tests/context.go b/tests/context.go index 8bdd84c5ae..b7f2a6d649 100644 --- a/tests/context.go +++ b/tests/context.go @@ -593,6 +593,14 @@ func (ctx *TestContext) CreateEnrollAndStartTunnelerEdgeRouter(roleAttributes .. ctx.startEdgeRouter(nil) } +// CreateEnrollAndStartTunnelerEdgeRouterWithCfgTweaks creates a tunneler-enabled edge router +// and allows the caller to modify the router config before startup. +func (ctx *TestContext) CreateEnrollAndStartTunnelerEdgeRouterWithCfgTweaks(cfgTweaks func(*routerEnv.Config), roleAttributes ...string) { + ctx.shutdownRouters() + ctx.createAndEnrollEdgeRouter(true, roleAttributes...) + ctx.startEdgeRouter(cfgTweaks) +} + func (ctx *TestContext) CreateEnrollAndStartEdgeRouter(roleAttributes ...string) *EdgeRouterHelper { ctx.shutdownRouters() ctx.createAndEnrollEdgeRouter(false, roleAttributes...) diff --git a/tests/sticky_terminator_test.go b/tests/sticky_terminator_test.go index 98f9372144..1bb5e18b7e 100644 --- a/tests/sticky_terminator_test.go +++ b/tests/sticky_terminator_test.go @@ -101,7 +101,14 @@ func Test_StickyTerminators(t *testing.T) { // bump the cost and make sure we stick with the same terminator even with the higher cost ctx.Req.NoError(listener1.UpdateCost(5000)) - time.Sleep(100 * time.Millisecond) + ctx.AdminManagementSession.waitForTerminatorState(service.Id, func(terminators []*terminator) bool { + for _, t := range terminators { + if t.cost == 5000 { + return true + } + } + return false + }, 5*time.Second) for range 10 { dialOptions := &ziti.DialOptions{ @@ -118,7 +125,14 @@ func Test_StickyTerminators(t *testing.T) { // Fail the terminator and make sure we fail over ctx.Req.NoError(listener1.UpdatePrecedence(edge.PrecedenceFailed)) - time.Sleep(100 * time.Millisecond) + ctx.AdminManagementSession.waitForTerminatorState(service.Id, func(terminators []*terminator) bool { + for _, t := range terminators { + if t.precedence == "failed" { + return true + } + } + return false + }, 5*time.Second) dialOptions := &ziti.DialOptions{ ConnectTimeout: time.Second, @@ -134,7 +148,14 @@ func Test_StickyTerminators(t *testing.T) { // Reset the initial terminator, bump the second terminator cost and make sure we stick with it ctx.Req.NoError(listener1.UpdateCostAndPrecedence(0, edge.PrecedenceDefault)) ctx.Req.NoError(listener2.UpdateCost(5000)) - time.Sleep(100 * time.Millisecond) + ctx.AdminManagementSession.waitForTerminatorState(service.Id, func(terminators []*terminator) bool { + for _, t := range terminators { + if t.precedence == "failed" { + return false + } + } + return len(terminators) == 2 + }, 5*time.Second) for range 10 { dialOptions = &ziti.DialOptions{