Skip to content

Commit 4bd470a

Browse files
authored
workflow: add transfer validation capability
1 parent 7dd3174 commit 4bd470a

File tree

10 files changed

+200
-15
lines changed

10 files changed

+200
-15
lines changed

enduro.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ processingConfig = "automated"
4949
storageServiceURL = "http://test:test@127.0.0.1:62081"
5050
capacity = 3
5151

52+
[validation]
53+
checksumsCheckEnabled = false
54+
5255
[[hooks."hari"]]
5356
baseURL = "" # E.g.: "https://192.168.1.50:8080/api"
5457
mock = true

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/google/uuid v1.1.1
2525
github.com/google/wire v0.4.0 // indirect
2626
github.com/gorilla/schema v1.1.0
27+
github.com/hashicorp/go-multierror v1.1.0
2728
github.com/jmoiron/sqlx v1.2.0
2829
github.com/mholt/archiver v3.1.1+incompatible
2930
github.com/mitchellh/mapstructure v1.3.2 // indirect

internal/collection/workflow.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
cce "github.com/artefactual-labs/enduro/internal/cadence"
9+
"github.com/artefactual-labs/enduro/internal/validation"
910
"github.com/artefactual-labs/enduro/internal/watcher"
1011

1112
"github.com/google/uuid"
@@ -32,12 +33,15 @@ type ProcessingWorkflowRequest struct {
3233
// Captured by the watcher, the event contains information about the
3334
// incoming dataset.
3435
Event *watcher.BlobEvent
36+
37+
ValidationConfig validation.Config
3538
}
3639

37-
func InitProcessingWorkflow(ctx context.Context, c client.Client, event *watcher.BlobEvent) error {
40+
func InitProcessingWorkflow(ctx context.Context, c client.Client, event *watcher.BlobEvent, validationConfig validation.Config) error {
3841
req := &ProcessingWorkflowRequest{
39-
WorkflowID: fmt.Sprintf("processing-workflow-%s", uuid.New().String()),
40-
Event: event,
42+
WorkflowID: fmt.Sprintf("processing-workflow-%s", uuid.New().String()),
43+
Event: event,
44+
ValidationConfig: validationConfig,
4145
}
4246

4347
return TriggerProcessingWorkflow(ctx, c, req)

internal/validation/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package validation
2+
3+
// Config holds the settings that decide which transfer validation checks run.
type Config struct {
	// ChecksumsCheckEnabled switches on the check that a transfer ships a
	// checksum file.
	ChecksumsCheckEnabled bool
}

// IsEnabled reports whether at least one validation check is switched on.
// The checksums check is the only one defined so far; further checks would
// become additional cases below.
func (c Config) IsEnabled() bool {
	switch {
	case c.ChecksumsCheckEnabled:
		return true
	default:
		return false
	}
}

internal/validation/validate.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package validation
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path"
7+
8+
"github.com/hashicorp/go-multierror"
9+
)
10+
11+
// checksumFiles lists the file names that are accepted as a checksum manifest
// inside a transfer's metadata directory.
var checksumFiles = [...]string{
	"checksum.md5",
	"checksum.sha1",
	"checksum.sha256",
	"checksum.sha512",
}
17+
18+
func ValidateTransfer(c Config, path string) error {
19+
var result error
20+
21+
if c.ChecksumsCheckEnabled {
22+
v := ChecksumExistsValidator{path: path}
23+
if err := v.Valid(); err != nil {
24+
result = multierror.Append(result, err)
25+
}
26+
}
27+
28+
return result
29+
}
30+
31+
// Validator is the contract shared by all transfer validators.
type Validator interface {
	// Valid returns nil when the check passes and a descriptive error
	// otherwise.
	Valid() error
}
35+
36+
// ChecksumExistsValidator is a Validator that checks...
37+
type ChecksumExistsValidator struct {
38+
path string
39+
}
40+
41+
func (v ChecksumExistsValidator) Valid() error {
42+
for _, checksum := range checksumFiles {
43+
if fileExists(path.Join(v.path, "metadata", checksum)) {
44+
return nil
45+
}
46+
}
47+
return fmt.Errorf("Transfer does not contain checksums (path=%s)", v.path)
48+
}
49+
50+
// fileExists reports whether name can be stat'ed and is not a directory.
// Any error from os.Stat (missing entry, permission problem, ...) is treated
// as "does not exist".
func fileExists(name string) bool {
	info, err := os.Stat(name)
	// The short-circuit keeps info from being dereferenced when Stat failed.
	return err == nil && !info.IsDir()
}
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package validation
2+
3+
import (
4+
"testing"
5+
6+
"gotest.tools/v3/assert"
7+
"gotest.tools/v3/fs"
8+
)
9+
10+
func TestChecksumExistsValidator(t *testing.T) {
11+
tests := map[string]struct {
12+
dirOpts []fs.PathOp
13+
errorMessage string
14+
}{
15+
"Validates when checksum.md5 file exists": {
16+
dirOpts: []fs.PathOp{
17+
fs.WithDir("metadata",
18+
fs.WithFile("checksum.md5", ""),
19+
),
20+
},
21+
},
22+
"Validates when checksum.sha1 file exists": {
23+
dirOpts: []fs.PathOp{
24+
fs.WithDir("metadata",
25+
fs.WithFile("checksum.sha1", ""),
26+
),
27+
},
28+
},
29+
"Validates when checksum.sha256 file exists": {
30+
dirOpts: []fs.PathOp{
31+
fs.WithDir("metadata",
32+
fs.WithFile("checksum.sha256", ""),
33+
),
34+
},
35+
},
36+
"Validates when checksum.sha512 file exists": {
37+
dirOpts: []fs.PathOp{
38+
fs.WithDir("metadata",
39+
fs.WithFile("checksum.sha512", ""),
40+
),
41+
},
42+
},
43+
"Fails when expected files do not exist": {
44+
dirOpts: []fs.PathOp{
45+
fs.WithDir("metadata",
46+
fs.WithFile("notchecksum.txt", ""),
47+
),
48+
},
49+
errorMessage: "Transfer does not contain checksums",
50+
},
51+
}
52+
for name, tc := range tests {
53+
name, tc := name, tc
54+
t.Run(name, func(t *testing.T) {
55+
tmpDir := fs.NewDir(t, "transfer", tc.dirOpts...)
56+
defer tmpDir.Remove()
57+
58+
validator := ChecksumExistsValidator{path: tmpDir.Path()}
59+
err := validator.Valid()
60+
61+
if tc.errorMessage == "" {
62+
assert.NilError(t, err)
63+
} else {
64+
assert.ErrorContains(t, err, tc.errorMessage)
65+
}
66+
})
67+
}
68+
}

internal/workflow/activities/activities.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
package activities
33

44
const (
5-
AcquirePipelineActivityName = "acquire-pipeline-activity"
6-
DownloadActivityName = "download-activity"
7-
BundleActivityName = "bundle-activity"
8-
TransferActivityName = "transfer-activity"
9-
PollTransferActivityName = "poll-transfer-activity"
10-
PollIngestActivityName = "poll-ingest-activity"
11-
CleanUpActivityName = "clean-up-activity"
12-
HidePackageActivityName = "hide-package-activity"
13-
DeleteOriginalActivityName = "delete-original-activity"
5+
AcquirePipelineActivityName = "acquire-pipeline-activity"
6+
DownloadActivityName = "download-activity"
7+
BundleActivityName = "bundle-activity"
8+
TransferActivityName = "transfer-activity"
9+
PollTransferActivityName = "poll-transfer-activity"
10+
PollIngestActivityName = "poll-ingest-activity"
11+
CleanUpActivityName = "clean-up-activity"
12+
HidePackageActivityName = "hide-package-activity"
13+
DeleteOriginalActivityName = "delete-original-activity"
14+
ValidateTransferActivityName = "validate-transfer-activity"
1415
)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package activities
2+
3+
import (
4+
"context"
5+
6+
"github.com/artefactual-labs/enduro/internal/validation"
7+
)
8+
9+
type ValidateTransferActivity struct{}
10+
11+
func NewValidateTransferActivity() *ValidateTransferActivity {
12+
return &ValidateTransferActivity{}
13+
}
14+
15+
type ValidateTransferActivityParams struct {
16+
Config validation.Config
17+
Path string
18+
}
19+
20+
func (a *ValidateTransferActivity) Execute(ctx context.Context, params *ValidateTransferActivityParams) error {
21+
return validation.ValidateTransfer(params.Config, params.Path)
22+
}

internal/workflow/processing.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/artefactual-labs/enduro/internal/nha"
1616
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
1717
"github.com/artefactual-labs/enduro/internal/pipeline"
18+
"github.com/artefactual-labs/enduro/internal/validation"
1819
"github.com/artefactual-labs/enduro/internal/watcher"
1920
"github.com/artefactual-labs/enduro/internal/workflow/activities"
2021
"github.com/artefactual-labs/enduro/internal/workflow/manager"
@@ -208,7 +209,7 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
208209
return fmt.Errorf("error creating session: %v", err)
209210
}
210211

211-
sessErr = w.SessionHandler(sessCtx, attempt, tinfo, nameInfo)
212+
sessErr = w.SessionHandler(sessCtx, attempt, tinfo, nameInfo, req.ValidationConfig)
212213

213214
// We want to retry the session if it has been canceled as a result
214215
// of losing the worker but not otherwise. This scenario seems to be
@@ -283,7 +284,7 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
283284
}
284285

285286
// SessionHandler runs activities that belong to the same session.
286-
func (w *ProcessingWorkflow) SessionHandler(sessCtx workflow.Context, attempt int, tinfo *TransferInfo, nameInfo nha.NameInfo) error {
287+
func (w *ProcessingWorkflow) SessionHandler(sessCtx workflow.Context, attempt int, tinfo *TransferInfo, nameInfo nha.NameInfo, validationConfig validation.Config) error {
287288
defer func() {
288289
_ = releasePipeline(sessCtx, w.manager, tinfo.Event.PipelineName)
289290
workflow.CompleteSession(sessCtx)
@@ -328,6 +329,23 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx workflow.Context, attempt in
328329
}
329330
}
330331

332+
// Validate transfer.
333+
{
334+
if validationConfig.IsEnabled() && tinfo.Bundle != (activities.BundleActivityResult{}) {
335+
activityOpts := workflow.WithActivityOptions(sessCtx, workflow.ActivityOptions{
336+
ScheduleToStartTimeout: forever,
337+
StartToCloseTimeout: time.Minute * 5,
338+
})
339+
err := workflow.ExecuteActivity(activityOpts, activities.ValidateTransferActivityName, &activities.ValidateTransferActivityParams{
340+
Config: validationConfig,
341+
Path: tinfo.Bundle.FullPath,
342+
}).Get(activityOpts, nil)
343+
if err != nil {
344+
return err
345+
}
346+
}
347+
}
348+
331349
// Transfer.
332350
{
333351
if tinfo.TransferID == "" {

main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/artefactual-labs/enduro/internal/db"
2020
nha_activities "github.com/artefactual-labs/enduro/internal/nha/activities"
2121
"github.com/artefactual-labs/enduro/internal/pipeline"
22+
"github.com/artefactual-labs/enduro/internal/validation"
2223
"github.com/artefactual-labs/enduro/internal/watcher"
2324
"github.com/artefactual-labs/enduro/internal/workflow"
2425
"github.com/artefactual-labs/enduro/internal/workflow/activities"
@@ -191,7 +192,7 @@ func main() {
191192
}
192193
logger.V(1).Info("Starting new workflow", "watcher", event.WatcherName, "bucket", event.Bucket, "key", event.Key)
193194
go func() {
194-
if err := collection.InitProcessingWorkflow(ctx, workflowClient, event); err != nil {
195+
if err := collection.InitProcessingWorkflow(ctx, workflowClient, event, config.Validation); err != nil {
195196
logger.Error(err, "Error initializing processing workflow.")
196197
}
197198
}()
@@ -217,6 +218,7 @@ func main() {
217218
cadence.RegisterActivity(activities.NewAcquirePipelineActivity(m).Execute, activities.AcquirePipelineActivityName)
218219
cadence.RegisterActivity(activities.NewDownloadActivity(m).Execute, activities.DownloadActivityName)
219220
cadence.RegisterActivity(activities.NewBundleActivity().Execute, activities.BundleActivityName)
221+
cadence.RegisterActivity(activities.NewValidateTransferActivity().Execute, activities.ValidateTransferActivityName)
220222
cadence.RegisterActivity(activities.NewTransferActivity(m).Execute, activities.TransferActivityName)
221223
cadence.RegisterActivity(activities.NewPollTransferActivity(m).Execute, activities.PollTransferActivityName)
222224
cadence.RegisterActivity(activities.NewPollIngestActivity(m).Execute, activities.PollIngestActivityName)
@@ -326,6 +328,7 @@ type configuration struct {
326328
Cadence cadence.Config
327329
Watcher watcher.Config
328330
Pipeline []pipeline.Config
331+
Validation validation.Config
329332

330333
// This is a workaround for client-specific functionality.
331334
// Simple mechanism to support an arbitrary number of hooks and parameters.

0 commit comments

Comments
 (0)