diff --git a/cmd/ipfs/pinmfs.go b/cmd/ipfs/pinmfs.go index 8ea9d2dcc8e..817b2af120e 100644 --- a/cmd/ipfs/pinmfs.go +++ b/cmd/ipfs/pinmfs.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "time" - "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" @@ -150,7 +149,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid repinInterval = defaultRepinInterval } else { var err error - repinInterval, err = time.ParseDuration(svcConfig.Policies.MFS.RepinInterval) + repinInterval, err = time.ParseDuration(svcConfig.Policies.MFS.RepinInterval) if err != nil { select { case errCh <- fmt.Errorf("remote pinning service %q has invalid MFS.RepinInterval (%v)", svcName, err): @@ -159,8 +158,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid ch <- lastPin{} continue } - } - + } // do nothing, if MFS has not changed since last pin on the exact same service or waiting for MFS.RepinInterval if last, ok := lastPins[svcName]; ok { if last.ServiceConfig == svcConfig && (last.CID == rootCid || time.Since(last.Time) < repinInterval) { @@ -176,7 +174,7 @@ func pinAllMFS(ctx context.Context, node pinMFSNode, cfg *config.Config, rootCid mfslog.Debugf("pinning MFS root %q to %q", rootCid, svcName) go func() { - if r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig); err != nil { + if r, err := pinMFS(ctx, node, rootCid, svcName, svcConfig, repinInterval); err != nil { select { case errCh <- fmt.Errorf("pinning MFS root %q to %q (%v)", rootCid, svcName, err): case <-ctx.Done(): @@ -200,9 +198,8 @@ func pinMFS( cid cid.Cid, svcName string, svcConfig config.RemotePinningService, -) (lastPin, error) { + repinInterval time.Duration) (lastPin, error) { c := pinclient.NewClient(svcConfig.API.Endpoint, svcConfig.API.Key) - pinName := svcConfig.Policies.MFS.PinName if pinName == "" { pinName = fmt.Sprintf("policy/%s/mfs", node.Identity().String()) @@ -216,6 +213,7 @@ func pinMFS( pinTime := time.Now().UTC() pinStatusMsg := "pinning to %q: received pre-existing %q status for %q (requestid=%q)" for ps := range lsPinCh { + log.Errorf("PS Typ eis is : %T", ps) existingRequestID = ps.GetRequestId() if ps.GetPin().GetCid() == cid && ps.GetStatus() == pinclient.StatusFailed { mfslog.Errorf(pinStatusMsg, svcName, pinclient.StatusFailed, cid, existingRequestID) @@ -227,6 +225,10 @@ func pinMFS( pinTime = ps.GetCreated().UTC() break } + //Run a goroutine to monitor status of queued pins + if ps.GetPin().GetCid() == cid && ps.GetStatus() == pinclient.StatusQueued { + checkPinStatus(ctx, node, svcName, svcConfig, ps, repinInterval) + } } for range lsPinCh { // in case the prior loop exits early } @@ -267,6 +269,27 @@ func pinMFS( if err != nil { return lastPin{}, err } - } + } return lastPin{Time: pinTime, ServiceName: svcName, ServiceConfig: svcConfig, CID: cid}, nil } + +func checkPinStatus(ctx context.Context, node pinMFSNode, svcName string, svcConfig config.RemotePinningService, ps pinclient.PinStatusGetter, + repinInterval time.Duration) { + result := make(chan int) + go func() { + for { + if ps.GetStatus() != pinclient.StatusFailed && ps.GetStatus() != pinclient.StatusPinned { + time.Sleep(repinInterval) + } else { + break + } + } + if(ps.GetStatus() == pinclient.StatusFailed) { + mfslog.Errorf("pinning to %q failed", svcName) + result <- 0 + } else { + mfslog.Debugf("pinning to %q succeeded", svcName) + result <- 1 + } + }() +}