Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ type ProxyOps interface {
}

type V1DataEngineProxyOps struct{}
type V2DataEngineProxyOps struct{}
type V2DataEngineProxyOps struct {
spdkServiceAddress string
}
Comment thread
derekbit marked this conversation as resolved.

type Proxy struct {
rpc.UnimplementedProxyEngineServiceServer
Expand All @@ -77,7 +79,7 @@ func NewProxy(ctx context.Context, logsDir, diskServiceAddress, spdkServiceAddre

ops := map[rpc.DataEngine]ProxyOps{
rpc.DataEngine_DATA_ENGINE_V1: V1DataEngineProxyOps{},
rpc.DataEngine_DATA_ENGINE_V2: V2DataEngineProxyOps{},
rpc.DataEngine_DATA_ENGINE_V2: V2DataEngineProxyOps{spdkServiceAddress: spdkServiceAddress},
}

spdkLocalClient, err := spdkclient.NewSPDKClient(spdkServiceAddress)
Expand Down Expand Up @@ -146,9 +148,5 @@ func getSPDKClientFromAddress(address string) (*spdkclient.SPDKClient, error) {
}

spdkServiceAddress := net.JoinHostPort(host, strconv.Itoa(types.InstanceManagerSpdkServiceDefaultPort))
if err != nil {
return nil, err
}

return spdkclient.NewSPDKClient(spdkServiceAddress)
}
69 changes: 54 additions & 15 deletions pkg/proxy/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
lhns "github.com/longhorn/go-common-libs/ns"
lhtypes "github.com/longhorn/go-common-libs/types"
eclient "github.com/longhorn/longhorn-engine/pkg/controller/client"
spdkclient "github.com/longhorn/longhorn-spdk-engine/pkg/client"
rpc "github.com/longhorn/types/pkg/generated/imrpc"

"github.com/longhorn/longhorn-instance-manager/pkg/util"
Expand Down Expand Up @@ -77,22 +78,26 @@ func (ops V1DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEng
}

func (ops V2DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEngineRequest) (resp *rpc.EngineVolumeGetProxyResponse, err error) {
c, err := getSPDKClientFromAddress(req.Address)
// The engine is always local to this IM (proxy runs on engine IM).
// Use the local SPDK service for EngineGet so it works even when the
// EngineFrontend is on a remote node.
localClient, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, "failed to get SPDK client from engine address %v: %v", req.Address, err)
return nil, grpcstatus.Errorf(grpccodes.Internal, "failed to create local SPDK client for address %v: %v", ops.spdkServiceAddress, err)
}
defer func() {
Comment thread
derekbit marked this conversation as resolved.
if closeErr := c.Close(); closeErr != nil {
if closeErr := localClient.Close(); closeErr != nil {
logrus.WithFields(logrus.Fields{
"serviceURL": req.Address,
Comment thread
derekbit marked this conversation as resolved.
"engineFrontendName": req.EngineFrontendName,
"spdkServiceAddress": ops.spdkServiceAddress,
"engineName": req.EngineName,
"volumeName": req.VolumeName,
"dataEngine": req.DataEngine,
}).WithError(closeErr).Warn("Failed to close SPDK client")
}).WithError(closeErr).Warn("Failed to close local SPDK client")
Comment thread
derekbit marked this conversation as resolved.
Comment thread
derekbit marked this conversation as resolved.
}
}()

engine, err := c.EngineGet(req.EngineName)
engine, err := localClient.EngineGet(req.EngineName)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, "failed to get engine %v: %v", req.EngineName, err)
}
Expand All @@ -104,15 +109,49 @@ func (ops V2DataEngineProxyOps) VolumeGet(ctx context.Context, req *rpc.ProxyEng
lastExpansionError := engine.LastExpansionError
lastExpansionFailedAt := engine.LastExpansionFailedAt
if req.EngineFrontendName != "" {
engineFrontend, err := c.EngineFrontendGet(req.EngineFrontendName)
if err == nil && engineFrontend != nil {
size = int64(engineFrontend.SpecSize)
endpoint = engineFrontend.Endpoint
frontend = engineFrontend.Frontend
isExpanding = engine.IsExpanding || engineFrontend.IsExpanding
if engineFrontend.LastExpansionError != "" {
lastExpansionError = engineFrontend.LastExpansionError
lastExpansionFailedAt = engineFrontend.LastExpansionFailedAt
// The EngineFrontend may be on a remote IM. Use req.Address
// (the EF IM address) and derive its SPDK service endpoint.
efClient, err := getSPDKClientFromAddress(req.Address)
if err != nil {
logrus.WithFields(logrus.Fields{
"serviceURL": req.Address,
"engineFrontendName": req.EngineFrontendName,
}).WithError(err).Warn("Failed to get SPDK client for engine frontend, returning engine-only info")
Comment thread
derekbit marked this conversation as resolved.
} else {
defer func() {
if closeErr := efClient.Close(); closeErr != nil {
logrus.WithFields(logrus.Fields{
"serviceURL": req.Address,
Comment thread
derekbit marked this conversation as resolved.
"engineFrontendName": req.EngineFrontendName,
}).WithError(closeErr).Warn("Failed to close EF SPDK client")
}
}()
engineFrontend, err := efClient.EngineFrontendGet(req.EngineFrontendName)
if err != nil {
logrus.WithFields(logrus.Fields{
"engineName": req.EngineName,
"volumeName": req.VolumeName,
"dataEngine": req.DataEngine,
"serviceURL": req.Address,
"engineFrontendName": req.EngineFrontendName,
}).WithError(err).Warn("Failed to get engine frontend, returning engine-only info")
} else if engineFrontend != nil {
Comment thread
derekbit marked this conversation as resolved.
size = int64(engineFrontend.SpecSize)
endpoint = engineFrontend.Endpoint
frontend = engineFrontend.Frontend
isExpanding = engine.IsExpanding || engineFrontend.IsExpanding
if engineFrontend.LastExpansionError != "" {
lastExpansionError = engineFrontend.LastExpansionError
lastExpansionFailedAt = engineFrontend.LastExpansionFailedAt
}
} else {
logrus.WithFields(logrus.Fields{
"engineName": req.EngineName,
"volumeName": req.VolumeName,
"dataEngine": req.DataEngine,
"serviceURL": req.Address,
"engineFrontendName": req.EngineFrontendName,
Comment thread
derekbit marked this conversation as resolved.
}).Warn("Engine frontend not found, returning engine-only info")
}
}
Comment thread
derekbit marked this conversation as resolved.
}
Expand Down
Loading