diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index a6e515d4f..7d40dc7b3 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -60,7 +60,9 @@ type ProxyOps interface { } type V1DataEngineProxyOps struct{} -type V2DataEngineProxyOps struct{} +type V2DataEngineProxyOps struct { + spdkServiceAddress string +} type Proxy struct { rpc.UnimplementedProxyEngineServiceServer @@ -77,7 +79,7 @@ func NewProxy(ctx context.Context, logsDir, diskServiceAddress, spdkServiceAddre ops := map[rpc.DataEngine]ProxyOps{ rpc.DataEngine_DATA_ENGINE_V1: V1DataEngineProxyOps{}, - rpc.DataEngine_DATA_ENGINE_V2: V2DataEngineProxyOps{}, + rpc.DataEngine_DATA_ENGINE_V2: V2DataEngineProxyOps{spdkServiceAddress: spdkServiceAddress}, } spdkLocalClient, err := spdkclient.NewSPDKClient(spdkServiceAddress) @@ -146,9 +148,5 @@ func getSPDKClientFromAddress(address string) (*spdkclient.SPDKClient, error) { } spdkServiceAddress := net.JoinHostPort(host, strconv.Itoa(types.InstanceManagerSpdkServiceDefaultPort)) - if err != nil { - return nil, err - } - return spdkclient.NewSPDKClient(spdkServiceAddress) } diff --git a/pkg/proxy/volume.go b/pkg/proxy/volume.go index 1735c03d4..9e6d63b10 100644 --- a/pkg/proxy/volume.go +++ b/pkg/proxy/volume.go @@ -16,6 +16,7 @@ import ( lhns "github.com/longhorn/go-common-libs/ns" lhtypes "github.com/longhorn/go-common-libs/types" eclient "github.com/longhorn/longhorn-engine/pkg/controller/client" + spdkclient "github.com/longhorn/longhorn-spdk-engine/pkg/client" rpc "github.com/longhorn/types/pkg/generated/imrpc" "github.com/longhorn/longhorn-instance-manager/pkg/util" @@ -77,22 +78,26 @@ func (ops V1DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEng } func (ops V2DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineVolumeGetProxyResponse, err error) { - c, err := getSPDKClientFromAddress(req.Address) + // The engine is always local to this IM (proxy runs on engine IM). + // Use the local SPDK service for EngineGet so it works even when the + // EngineFrontend is on a remote node. + localClient, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) if err != nil { - return nil, grpcstatus.Errorf(grpccodes.Internal, "failed to get SPDK client from engine address %v: %v", req.Address, err) + return nil, grpcstatus.Errorf(grpccodes.Internal, "failed to create local SPDK client for address %v: %v", ops.spdkServiceAddress, err) } defer func() { - if closeErr := c.Close(); closeErr != nil { + if closeErr := localClient.Close(); closeErr != nil { logrus.WithFields(logrus.Fields{ "serviceURL": req.Address, - "engineFrontendName": req.EngineFrontendName, + "spdkServiceAddress": ops.spdkServiceAddress, + "engineName": req.EngineName, "volumeName": req.VolumeName, "dataEngine": req.DataEngine, - }).WithError(closeErr).Warn("Failed to close SPDK client") + }).WithError(closeErr).Warn("Failed to close local SPDK client") } }() - engine, err := c.EngineGet(req.EngineName) + engine, err := localClient.EngineGet(req.EngineName) if err != nil { return nil, grpcstatus.Errorf(grpccodes.Internal, "failed to get engine %v: %v", req.EngineName, err) } @@ -104,15 +109,49 @@ func (ops V2DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEng lastExpansionError := engine.LastExpansionError lastExpansionFailedAt := engine.LastExpansionFailedAt if req.EngineFrontendName != "" { - engineFrontend, err := c.EngineFrontendGet(req.EngineFrontendName) - if err == nil && engineFrontend != nil { - size = int64(engineFrontend.SpecSize) - endpoint = engineFrontend.Endpoint - frontend = engineFrontend.Frontend - isExpanding = engine.IsExpanding || engineFrontend.IsExpanding - if engineFrontend.LastExpansionError != "" { - lastExpansionError = engineFrontend.LastExpansionError - lastExpansionFailedAt = engineFrontend.LastExpansionFailedAt + // The EngineFrontend may be on a remote IM. Use req.Address + // (the EF IM address) and derive its SPDK service endpoint. + efClient, err := getSPDKClientFromAddress(req.Address) + if err != nil { + logrus.WithFields(logrus.Fields{ + "serviceURL": req.Address, + "engineFrontendName": req.EngineFrontendName, + }).WithError(err).Warn("Failed to get SPDK client for engine frontend, returning engine-only info") + } else { + defer func() { + if closeErr := efClient.Close(); closeErr != nil { + logrus.WithFields(logrus.Fields{ + "serviceURL": req.Address, + "engineFrontendName": req.EngineFrontendName, + }).WithError(closeErr).Warn("Failed to close EF SPDK client") + } + }() + engineFrontend, err := efClient.EngineFrontendGet(req.EngineFrontendName) + if err != nil { + logrus.WithFields(logrus.Fields{ + "engineName": req.EngineName, + "volumeName": req.VolumeName, + "dataEngine": req.DataEngine, + "serviceURL": req.Address, + "engineFrontendName": req.EngineFrontendName, + }).WithError(err).Warn("Failed to get engine frontend, returning engine-only info") + } else if engineFrontend != nil { + size = int64(engineFrontend.SpecSize) + endpoint = engineFrontend.Endpoint + frontend = engineFrontend.Frontend + isExpanding = engine.IsExpanding || engineFrontend.IsExpanding + if engineFrontend.LastExpansionError != "" { + lastExpansionError = engineFrontend.LastExpansionError + lastExpansionFailedAt = engineFrontend.LastExpansionFailedAt + } + } else { + logrus.WithFields(logrus.Fields{ + "engineName": req.EngineName, + "volumeName": req.VolumeName, + "dataEngine": req.DataEngine, + "serviceURL": req.Address, + "engineFrontendName": req.EngineFrontendName, + }).Warn("Engine frontend not found, returning engine-only info") } } }