Skip to content

Commit c70e3fb

Browse files
committed
processing: hide units after completion
1 parent ea4b7be commit c70e3fb

File tree

2 files changed

+58
-6
lines changed

2 files changed

+58
-6
lines changed

internal/workflow/processing.go

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const (
3838
UpdateHARIActivityName = "update-hari-activity"
3939
UpdateProductionSystemActivityName = "update-production-system-activity"
4040
CleanUpActivityName = "clean-up-activity"
41+
HidePackageActivityName = "hide-package-activity"
4142

4243
processingConfig = "automated"
4344
)
@@ -145,6 +146,17 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, event *watcher.BlobEv
145146

146147
defer workflow.CompleteSession(sessCtx)
147148

149+
// Hide packages from Archivematica Dashboard.
150+
if tinfo.Status == collection.StatusDone {
151+
futures = []workflow.Future{}
152+
activityOpts = withActivityOptsForRequest(ctx)
153+
futures = append(futures, workflow.ExecuteActivity(activityOpts, HidePackageActivityName, tinfo.TransferID, "transfer", tinfo.Event.PipelineName))
154+
futures = append(futures, workflow.ExecuteActivity(activityOpts, HidePackageActivityName, tinfo.SIPID, "ingest", tinfo.Event.PipelineName))
155+
for _, f := range futures {
156+
_ = f.Get(activityOpts, nil)
157+
}
158+
}
159+
148160
activityOpts = withActivityOptsForRequest(sessCtx)
149161
err = workflow.ExecuteActivity(activityOpts, CleanUpActivityName, tinfo).Get(activityOpts, nil)
150162
if err != nil {
@@ -455,17 +467,15 @@ func NewCleanUpActivity(m *Manager) *CleanUpActivity {
455467
}
456468

457469
func (a *CleanUpActivity) Execute(ctx context.Context, tinfo *TransferInfo) error {
458-
if tinfo.RelPath == "" {
459-
return nil
460-
}
461-
462470
cfg, err := a.manager.Pipelines.Config(tinfo.Event.PipelineName)
463471
if err != nil {
464472
return err
465473
}
466474

467-
if err := os.RemoveAll(filepath.Join(cfg.TransferDir, tinfo.RelPath)); err != nil {
468-
return err
475+
if tinfo.RelPath != "" {
476+
if err := os.RemoveAll(filepath.Join(cfg.TransferDir, tinfo.RelPath)); err != nil {
477+
return err
478+
}
469479
}
470480

471481
return nil
@@ -501,3 +511,44 @@ func updatePackageStatusLocalActivity(ctx context.Context, colsvc collection.Ser
501511

502512
return colsvc.UpdateWorkflowStatus(ctx, tinfo.CollectionID, tinfo.Name, info.WorkflowExecution.ID, info.WorkflowExecution.RunID, tinfo.TransferID, tinfo.SIPID, tinfo.Status, tinfo.StoredAt)
503513
}
514+
515+
type HidePackageActivity struct {
516+
manager *Manager
517+
}
518+
519+
func NewHidePackageActivity(m *Manager) *HidePackageActivity {
520+
return &HidePackageActivity{manager: m}
521+
}
522+
523+
func (a *HidePackageActivity) Execute(ctx context.Context, unitID, unitType, pipelineName string) error {
524+
amc, err := a.manager.Pipelines.Client(pipelineName)
525+
if err != nil {
526+
return nonRetryableError(fmt.Errorf("error looking up pipeline config: %v", err))
527+
}
528+
529+
if unitType != "transfer" && unitType != "ingest" {
530+
return nonRetryableError(fmt.Errorf("unexpected unit type: %s", unitType))
531+
}
532+
533+
if unitType == "transfer" {
534+
resp, _, err := amc.Transfer.Hide(ctx, unitID)
535+
if err != nil {
536+
return fmt.Errorf("error hiding transfer: %v", err)
537+
}
538+
if resp.Removed != true {
539+
return fmt.Errorf("error hiding transfer: not removed")
540+
}
541+
}
542+
543+
if unitType == "ingest" {
544+
resp, _, err := amc.Ingest.Hide(ctx, unitID)
545+
if err != nil {
546+
return fmt.Errorf("error hiding sip: %v", err)
547+
}
548+
if resp.Removed != true {
549+
return fmt.Errorf("error hiding sip: not removed")
550+
}
551+
}
552+
553+
return nil
554+
}

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ func main() {
204204
cadence.RegisterActivity(workflow.NewUpdateHARIActivity(m).Execute, workflow.UpdateHARIActivityName)
205205
cadence.RegisterActivity(workflow.NewUpdateProductionSystemActivity(m).Execute, workflow.UpdateProductionSystemActivityName)
206206
cadence.RegisterActivity(workflow.NewCleanUpActivity(m).Execute, workflow.CleanUpActivityName)
207+
cadence.RegisterActivity(workflow.NewHidePackageActivity(m).Execute, workflow.HidePackageActivityName)
207208

208209
done := make(chan struct{})
209210
w, err := cadence.NewWorker(zlogger.Named("cadence-worker"), appName, config.Cadence)

0 commit comments

Comments
 (0)