Skip to content

Issue 7917: Fix/mfs/error pin fail #8525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
39 changes: 31 additions & 8 deletions cmd/ipfs/pinmfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"

Expand Down Expand Up @@ -150,7 +149,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid
repinInterval = defaultRepinInterval
} else {
var err error
repinInterval, err = time.ParseDuration(svcConfig.Policies.MFS.RepinInterval)
repinInterval, err = time.ParseDuration(svcConfig.Policies.MFS.RepinInterval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Run go fmt to remove trailing whitespace, etc.

if err != nil {
select {
case errCh <- fmt.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err):
Expand All @@ -159,8 +158,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid
ch <- lastPin{}
continue
}
}

}
// do nothing, if MFS has not changed since last pin on the exact same service or waiting for MFS.RepinInterval
if last, ok := lastPins[svcName]; ok {
if last.ServiceConfig == svcConfig && (last.CID == rootCid || time.Since(last.Time) < repinInterval) {
Expand All @@ -176,7 +174,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid

mfslog.Debugf("pinning MFS root %q to %q", rootCid, svcName)
go func() {
if r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig); err != nil {
if r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig, repinInterval); err != nil {
select {
case errCh <- fmt.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err):
case <-ctx.Done():
Expand All @@ -200,9 +198,8 @@ func pinMFS(
cid cid.Cid,
svcName string,
svcConfig config.RemotePinningService,
) (lastPin, error) {
repinInterval time.Duration) (lastPin, error) {
c := pinclient.NewClient(svcConfig.API.Endpoint, svcConfig.API.Key)

pinName := svcConfig.Policies.MFS.PinName
if pinName == "" {
pinName = fmt.Sprintf("policy/%s/mfs", node.Identity().String())
Expand All @@ -216,6 +213,7 @@ func pinMFS(
pinTime := time.Now().UTC()
pinStatusMsg := "pinning to %q: received pre-existing %q status for %q (requestid=%q)"
for ps := range lsPinCh {
log.Errorf("PS Typ eis is : %T", ps)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not log error here.

existingRequestID = ps.GetRequestId()
if ps.GetPin().GetCid() == cid && ps.GetStatus() == pinclient.StatusFailed {
mfslog.Errorf(pinStatusMsg, svcName, pinclient.StatusFailed, cid, existingRequestID)
Expand All @@ -227,6 +225,10 @@ func pinMFS(
pinTime = ps.GetCreated().UTC()
break
}
//Run a goroutine to monitor status of queued pins
if ps.GetPin().GetCid() == cid && ps.GetStatus() == pinclient.StatusQueued {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code above already does if ps.GetPin().GetCid() == cid so this and the above should be under a that single if condition.

checkPinStatus(ctx, node, svcName, svcConfig, ps, repinInterval)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we want to start a new goroutine for each pin. I think it would be better to communicate pins to watch to a single goroutine.

}
for range lsPinCh { // in case the prior loop exits early
}
Expand Down Expand Up @@ -267,6 +269,27 @@ func pinMFS(
if err != nil {
return lastPin{}, err
}
}
}
return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil
}

func checkPinStatus(ctx context.Context, node pinMFSNode, svcName string, svcConfig config.RemotePinningService, ps pinclient.PinStatusGetter,
repinInterval time.Duration) {
result := make(chan int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no reader for this channel, so the goroutine will block forever when trying to write to it.

go func() {
for {
if ps.GetStatus() != pinclient.StatusFailed && ps.GetStatus() != pinclient.StatusPinned {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the condition of the for loop, not a separate if. Then the else becomes unnecessary.

time.Sleep(repinInterval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to use a time.Timer here, so that select can also wait for context cancelation as well as timeout.

} else {
break
}
}
if(ps.GetStatus() == pinclient.StatusFailed) {
mfslog.Errorf("pinning to %q failed", svcName)
result <- 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are these results used? If they are, a bool type is more appropriate and an int with a 0 or 1 value.

} else {
mfslog.Debugf("pinning to %q succeeded", svcName)
result <- 1
}
}()
}