Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (p *Process) Stop() {

func (p *Process) StopWithSignal(signal syscall.Signal) {
needStop, needTimeoutKill := false, false
needCloseLogger := false
now := time.Now()

p.lock.Lock()
Expand All @@ -156,9 +157,21 @@ func (p *Process) StopWithSignal(signal syscall.Signal) {
needTimeoutKill = true
}
}
// If the process stopped or errored on its own (no prior Stop call spawned
// a goroutine to close the logger), close it here so callers that observe a
// terminal state and then call Stop() do not leave the log file descriptor open.
if !needStop && !needTimeoutKill && p.DeletionTimestamp == nil &&
(p.State == StateStopped || p.State == StateError) {
needCloseLogger = true
}
p.lock.Unlock()

if !needStop && !needTimeoutKill {
if needCloseLogger {
if err := p.logger.Close(); err != nil {
logrus.WithError(err).Warnf("Process Manager: failed to close logger for process %v in state %v", p.Name, p.State)
}
}
return
}
p.UpdateCh <- p
Expand Down
16 changes: 13 additions & 3 deletions pkg/process/process_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ func (pm *Manager) ProcessCreate(ctx context.Context, req *rpc.ProcessCreateRequ
}

if err := pm.registerProcess(p); err != nil {
if closeErr := logger.Close(); closeErr != nil {
logrus.WithError(closeErr).Warnf("Process Manager: failed to close logger for process %v after registration failure", req.Spec.Name)
}
return nil, err
}

Expand Down Expand Up @@ -369,12 +372,12 @@ func (pm *Manager) broadcastConnector() (chan interface{}, error) {
return pm.broadcastCh, nil
}

func (pm *Manager) Subscribe() (<-chan interface{}, error) {
return pm.broadcaster.Subscribe(context.TODO(), pm.broadcastConnector)
func (pm *Manager) SubscribeWithContext(ctx context.Context) (<-chan interface{}, error) {
return pm.broadcaster.Subscribe(ctx, pm.broadcastConnector)
}

func (pm *Manager) ProcessWatch(req *emptypb.Empty, srv rpc.ProcessManagerService_ProcessWatchServer) (err error) {
responseChan, err := pm.Subscribe()
responseChan, err := pm.SubscribeWithContext(srv.Context())
if err != nil {
return err
}
Expand Down Expand Up @@ -480,11 +483,18 @@ func (pm *Manager) ProcessReplace(ctx context.Context, req *rpc.ProcessReplaceRe

processToReplace, err := pm.initProcessReplace(p)
if err != nil {
if closeErr := logger.Close(); closeErr != nil {
logrus.WithError(closeErr).Warnf("Process Manager: failed to close logger for replacement process %v after initialization failure", req.Spec.Name)
}
return nil, err
}

if processToReplace.Binary == p.Binary {
logrus.Infof("Process Manager: the existing process already has the updated engine image %v", p.Binary)
if closeErr := logger.Close(); closeErr != nil {
logrus.WithError(closeErr).Warnf("Process Manager: failed to close logger for unneeded replacement process %v", req.Spec.Name)
}
pm.releaseProcessPorts(p)
return processToReplace.RPCResponse(), nil
}

Expand Down
65 changes: 64 additions & 1 deletion pkg/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ import (
"context"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"testing"
"time"

rpc "github.com/longhorn/types/pkg/generated/imrpc"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

. "gopkg.in/check.v1"

rpc "github.com/longhorn/types/pkg/generated/imrpc"

"github.com/longhorn/longhorn-instance-manager/pkg/types"
)

Expand All @@ -41,6 +44,10 @@ type ProcessWatcher struct {
grpc.ServerStream
}

func (pw *ProcessWatcher) Context() context.Context {
return context.Background()
}

func (pw *ProcessWatcher) Send(resp *rpc.ProcessResponse) error {
//Do nothing for now, just act as the receiving end
return nil
Expand Down Expand Up @@ -240,6 +247,33 @@ func (s *TestSuite) TestProcessReplaceMissingBinary(c *C) {
wg.Wait()
}

func (s *TestSuite) TestProcessReplaceInitFailureClosesLogger(c *C) {
if _, err := os.Stat("/proc/self/fd"); err != nil {
c.Skip("/proc/self/fd is not available")
}

name := "test_process_replace_missing_process"
logPath, err := filepath.Abs(filepath.Join(s.logDir, name+".log"))
c.Assert(err, IsNil)

baseline, err := countOpenFileDescriptors(logPath)
c.Assert(err, IsNil)

_, err = s.pm.ProcessReplace(context.TODO(), &rpc.ProcessReplaceRequest{
Spec: createProcessSpec(name, TestBinaryReplace),
TerminateSignal: "SIGHUP",
})
c.Assert(err, NotNil)
c.Assert(status.Code(err), Equals, codes.NotFound)

after, err := countOpenFileDescriptors(logPath)
c.Assert(err, IsNil)
c.Assert(after, Equals, baseline)
_, err = os.Stat(logPath)
c.Assert(err, IsNil)
c.Assert(os.Remove(logPath), IsNil)
}

// there was a nil pointer case, while updating a process that is being
// deleted, since when initially checked the process was still in the map
// but by the time new process has started the old process had been removed
Expand Down Expand Up @@ -388,3 +422,32 @@ func waitForProcessListState(pm *Manager, predicate func(processes map[string]*r
}
return false, nil
}

func countOpenFileDescriptors(targetPath string) (int, error) {
targetPath, err := filepath.Abs(targetPath)
if err != nil {
return 0, err
}

entries, err := os.ReadDir("/proc/self/fd")
if err != nil {
return 0, err
}

count := 0
for _, entry := range entries {
resolvedPath, err := os.Readlink(filepath.Join("/proc/self/fd", entry.Name()))
if err != nil {
continue
}
resolvedPath, err = filepath.Abs(resolvedPath)
if err != nil {
continue
}
if resolvedPath == targetPath {
count++
}
}

return count, nil
}
51 changes: 28 additions & 23 deletions pkg/proxy/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,28 +293,34 @@ func (ops V2DataEngineProxyOps) ReplicaRebuildingStatus(ctx context.Context, req
}
// TODO: Need to unify the replica address format for v1 and v2 engine
tcpReplicaAddress := types.AddTcpPrefixForAddress(replicaAddress)
replicaCli, err := getSPDKClientFromAddress(replicaAddress)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, "failed to get SPDK client from replica address %v: %v", replicaAddress, err)
}
defer func() {
if closeErr := replicaCli.Close(); closeErr != nil {
log.WithError(closeErr).Warn("Failed to close SPDK client")

statusResp, err := func() (*enginerpc.ReplicaRebuildStatusResponse, error) {
replicaCli, err := getSPDKClientFromAddress(replicaAddress)
if err != nil {
return nil, grpcstatus.Errorf(grpccodes.Internal, "failed to get SPDK client from replica address %v: %v", replicaAddress, err)
}
defer func() {
if closeErr := replicaCli.Close(); closeErr != nil {
log.WithError(closeErr).Warn("Failed to close SPDK client")
}
}()

shallowCopyResp, err := replicaCli.ReplicaRebuildingDstShallowCopyCheck(replicaName)
if err != nil {
return nil, err
}
return &enginerpc.ReplicaRebuildStatusResponse{
Error: shallowCopyResp.Error,
IsRebuilding: shallowCopyResp.TotalState == spdktypes.ProgressStateInProgress,
Progress: int32(shallowCopyResp.TotalProgress),
State: shallowCopyResp.TotalState,
FromReplicaAddressList: []string{types.AddTcpPrefixForAddress(shallowCopyResp.SrcReplicaAddress)},
}, nil
}()

shallowCopyResp, err := replicaCli.ReplicaRebuildingDstShallowCopyCheck(replicaName)
if err != nil {
// Let the upper layer to handle this error rather than considering it as the error message of a rebuilding failure
return nil, err
}
resp.Status[tcpReplicaAddress] = &enginerpc.ReplicaRebuildStatusResponse{
Error: shallowCopyResp.Error,
IsRebuilding: shallowCopyResp.TotalState == spdktypes.ProgressStateInProgress,
Progress: int32(shallowCopyResp.TotalProgress),
State: shallowCopyResp.TotalState,
FromReplicaAddressList: []string{types.AddTcpPrefixForAddress(shallowCopyResp.SrcReplicaAddress)},
}
resp.Status[tcpReplicaAddress] = statusResp
}

return resp, nil
Expand Down Expand Up @@ -382,13 +388,12 @@ func (ops V2DataEngineProxyOps) ReplicaRebuildingQosSet(ctx context.Context, req
Warn("Failed to get SPDK client from replica address")
continue
}
defer func() {
if closeErr := replicaCli.Close(); closeErr != nil {
log.WithError(closeErr).Warn("Failed to close SPDK replica client")
}
}()

if err := replicaCli.ReplicaRebuildingDstSetQosLimit(replicaName, req.QosLimitMbps); err != nil {
err = replicaCli.ReplicaRebuildingDstSetQosLimit(replicaName, req.QosLimitMbps)
if closeErr := replicaCli.Close(); closeErr != nil {
log.WithError(closeErr).Warn("Failed to close SPDK replica client")
}
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"replicaName": replicaName,
"replicaAddr": replicaAddress,
Expand Down
Loading