Skip to content

Commit c08adc0

Browse files
committed
pipeline: create pipeline type
1 parent 1d5a785 commit c08adc0

File tree

9 files changed

+136
-83
lines changed

9 files changed

+136
-83
lines changed

internal/pipeline/pipeline.go

Lines changed: 32 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package pipeline
22

33
import (
4-
"errors"
5-
"fmt"
64
"io/ioutil"
75
"net"
86
"net/http"
@@ -15,8 +13,6 @@ import (
1513
"github.com/artefactual-labs/enduro/internal/amclient"
1614
)
1715

18-
// TODO: PipelineRegistry should return Pipeline objects!
19-
2016
type Config struct {
2117
Name string
2218
BaseURL string
@@ -28,68 +24,36 @@ type Config struct {
2824
ProcessingConfig string
2925
}
3026

31-
// PipelineRegistry is a collection of known pipelines.
32-
type PipelineRegistry struct {
33-
pipelines map[string]Config
27+
type Pipeline struct {
28+
config *Config
29+
client *http.Client
3430
}
3531

36-
func NewPipelineRegistry(configs []Config) *PipelineRegistry {
37-
pipelines := map[string]Config{}
38-
for _, cfg := range configs {
39-
cfg.TransferDir = expandPath(cfg.TransferDir)
40-
pipelines[cfg.Name] = cfg
32+
func NewPipeline(config *Config) *Pipeline {
33+
config.TransferDir = expandPath(config.TransferDir)
34+
config.ProcessingDir = expandPath(config.ProcessingDir)
4135

42-
}
43-
return &PipelineRegistry{
44-
pipelines: pipelines,
36+
return &Pipeline{
37+
config: config,
38+
client: httpClient(),
4539
}
4640
}
4741

48-
func (p PipelineRegistry) Config(name string) (*Config, error) {
49-
cfg, ok := p.pipelines[name]
50-
if !ok {
51-
return nil, errors.New("client not found")
52-
}
53-
54-
return &cfg, nil
42+
// Client returns the Archivematica API client ready for use.
43+
func (p Pipeline) Client() *amclient.Client {
44+
return amclient.NewClient(p.client, p.config.BaseURL, p.config.User, p.config.Key)
5545
}
5646

57-
func (p PipelineRegistry) Client(name string) (*amclient.Client, error) {
58-
cfg, err := p.Config(name)
59-
if err != nil {
60-
return nil, fmt.Errorf("error fetching pipeline configuration: %w", err)
47+
// TempFile creates a temporary file in the processing directory.
48+
func (p Pipeline) TempFile(pattern string) (*os.File, error) {
49+
if pattern == "" {
50+
pattern = "blob-*"
6151
}
62-
63-
client, err := amclient.New(httpClient(), cfg.BaseURL, cfg.User, cfg.Key)
64-
if err != nil {
65-
return nil, fmt.Errorf("error creating Archivematica API client: %w", err)
66-
}
67-
68-
return client, nil
52+
return ioutil.TempFile(p.config.ProcessingDir, pattern)
6953
}
7054

71-
// TempFile returns a new temporary file inside the processing directory of the
72-
// given pipeline.
73-
func (p PipelineRegistry) TempFile(name string) (*os.File, error) {
74-
cfg, err := p.Config(name)
75-
if err != nil {
76-
return nil, fmt.Errorf("error fetching pipeline configuration: %w", err)
77-
}
78-
79-
return ioutil.TempFile(cfg.ProcessingDir, "blob-*")
80-
}
81-
82-
func expandPath(path string) string {
83-
usr, _ := user.Current()
84-
dir := usr.HomeDir
85-
86-
if path == "~" {
87-
path = dir
88-
} else if strings.HasPrefix(path, "~/") {
89-
path = filepath.Join(dir, path[2:])
90-
}
91-
92-
return path
55+
func (p Pipeline) Config() *Config {
56+
return p.config
9357
}
9458

9559
func httpClient() *http.Client {
@@ -110,3 +74,16 @@ func httpClient() *http.Client {
11074
Transport: transport,
11175
}
11276
}
77+
78+
func expandPath(path string) string {
79+
usr, _ := user.Current()
80+
dir := usr.HomeDir
81+
82+
if path == "~" {
83+
path = dir
84+
} else if strings.HasPrefix(path, "~/") {
85+
path = filepath.Join(dir, path[2:])
86+
}
87+
88+
return path
89+
}

internal/pipeline/registry.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package pipeline
2+
3+
import (
4+
"errors"
5+
"sync"
6+
)
7+
8+
var ErrUnknownPipeline = errors.New("unknown pipeline")
9+
10+
// Registry is a collection of known pipelines.
11+
type Registry struct {
12+
pipelines map[string]*Pipeline
13+
mu sync.Mutex
14+
}
15+
16+
func NewPipelineRegistry(configs []Config) *Registry {
17+
pipelines := map[string]*Pipeline{}
18+
for _, config := range configs {
19+
pipelines[config.Name] = NewPipeline(&config)
20+
}
21+
return &Registry{
22+
pipelines: pipelines,
23+
}
24+
}
25+
26+
func (r *Registry) Pipeline(name string) (*Pipeline, error) {
27+
r.mu.Lock()
28+
defer r.mu.Unlock()
29+
30+
pipeline, ok := r.pipelines[name]
31+
if !ok {
32+
return nil, ErrUnknownPipeline
33+
}
34+
35+
return pipeline, nil
36+
}

internal/workflow/download_activity.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ func (a *DownloadActivity) Execute(ctx context.Context, event *watcher.BlobEvent
2121
if event == nil {
2222
return "", nonRetryableError(errors.New("error reading parameters"))
2323
}
24+
p, err := a.manager.Pipelines.Pipeline(event.PipelineName)
25+
if err != nil {
26+
return "", nonRetryableError(err)
27+
}
2428

25-
file, err := a.manager.Pipelines.TempFile(event.PipelineName)
29+
file, err := p.TempFile("blob-*")
2630
if err != nil {
2731
return "", nonRetryableError(fmt.Errorf("error creating temporary file in processing directory: %v", err))
2832
}

internal/workflow/hide_package_activity.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ func NewHidePackageActivity(m *Manager) *HidePackageActivity {
1414
}
1515

1616
func (a *HidePackageActivity) Execute(ctx context.Context, unitID, unitType, pipelineName string) error {
17-
amc, err := a.manager.Pipelines.Client(pipelineName)
17+
p, err := a.manager.Pipelines.Pipeline(pipelineName)
1818
if err != nil {
19-
return nonRetryableError(fmt.Errorf("error looking up pipeline config: %v", err))
19+
return nonRetryableError(err)
2020
}
21+
amc := p.Client()
2122

2223
if unitType != "transfer" && unitType != "ingest" {
2324
return nonRetryableError(fmt.Errorf("unexpected unit type: %s", unitType))

internal/workflow/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ type Manager struct {
1515
Logger logr.Logger
1616
Collection collection.Service
1717
Watcher watcher.Service
18-
Pipelines *pipeline.PipelineRegistry
18+
Pipelines *pipeline.Registry
1919
Hooks map[string]map[string]interface{}
2020
}
2121

2222
// NewManager returns a pointer to a new Manager.
23-
func NewManager(logger logr.Logger, colsvc collection.Service, wsvc watcher.Service, pipelines *pipeline.PipelineRegistry, hooks map[string]map[string]interface{}) *Manager {
23+
func NewManager(logger logr.Logger, colsvc collection.Service, wsvc watcher.Service, pipelines *pipeline.Registry, hooks map[string]map[string]interface{}) *Manager {
2424
return &Manager{
2525
Logger: logger,
2626
Collection: colsvc,

internal/workflow/poll_ingest_activity.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@ func NewPollIngestActivity(m *Manager) *PollIngestActivity {
1818
return &PollIngestActivity{manager: m}
1919
}
2020

21-
func (a *PollIngestActivity) Execute(ctx context.Context, tinfo *TransferInfo) (time.Time, error) {
22-
amc, err := a.manager.Pipelines.Client(tinfo.Event.PipelineName)
21+
type PollIngestActivityParams struct {
22+
PipelineName string
23+
SIPID string
24+
}
25+
26+
func (a *PollIngestActivity) Execute(ctx context.Context, params *PollIngestActivityParams) (time.Time, error) {
27+
p, err := a.manager.Pipelines.Pipeline(params.PipelineName)
2328
if err != nil {
24-
return time.Time{}, err
29+
return time.Time{}, nonRetryableError(err)
2530
}
31+
amc := p.Client()
2632

2733
var backoffStrategy = backoff.WithContext(backoff.NewConstantBackOff(time.Second*5), ctx)
2834

@@ -31,7 +37,7 @@ func (a *PollIngestActivity) Execute(ctx context.Context, tinfo *TransferInfo) (
3137
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
3238
defer cancel()
3339

34-
err = pipeline.IngestStatus(ctx, amc, tinfo.SIPID)
40+
err = pipeline.IngestStatus(ctx, amc, params.SIPID)
3541
if errors.Is(err, pipeline.ErrStatusNonRetryable) {
3642
return backoff.Permanent(nonRetryableError(err))
3743
}

internal/workflow/poll_transfer_activity.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@ func NewPollTransferActivity(m *Manager) *PollTransferActivity {
1818
return &PollTransferActivity{manager: m}
1919
}
2020

21-
func (a *PollTransferActivity) Execute(ctx context.Context, tinfo *TransferInfo) (string, error) {
22-
amc, err := a.manager.Pipelines.Client(tinfo.Event.PipelineName)
21+
type PollTransferActivityParams struct {
22+
PipelineName string
23+
TransferID string
24+
}
25+
26+
func (a *PollTransferActivity) Execute(ctx context.Context, params *PollTransferActivityParams) (string, error) {
27+
p, err := a.manager.Pipelines.Pipeline(params.PipelineName)
2328
if err != nil {
2429
return "", err
2530
}
31+
amc := p.Client()
2632

2733
var sipID string
2834
var backoffStrategy = backoff.WithContext(backoff.NewConstantBackOff(time.Second*5), ctx)
@@ -32,7 +38,7 @@ func (a *PollTransferActivity) Execute(ctx context.Context, tinfo *TransferInfo)
3238
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
3339
defer cancel()
3440

35-
sipID, err = pipeline.TransferStatus(ctx, amc, tinfo.TransferID)
41+
sipID, err = pipeline.TransferStatus(ctx, amc, params.TransferID)
3642
if errors.Is(err, pipeline.ErrStatusNonRetryable) {
3743
return backoff.Permanent(nonRetryableError(err))
3844
}

internal/workflow/processing.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,14 @@ func (w *ProcessingWorkflow) SessionHandler(ctx workflow.Context, sessCtx workfl
264264

265265
// Transfer.
266266
activityOpts = withActivityOptsForRequest(sessCtx)
267-
err = workflow.ExecuteActivity(activityOpts, TransferActivityName, tinfo).Get(activityOpts, &tinfo.TransferID)
267+
err = workflow.ExecuteActivity(activityOpts, TransferActivityName, &TransferActivityParams{
268+
PipelineName: tinfo.Event.PipelineName,
269+
TransferLocationID: tinfo.PipelineConfig.TransferLocationID,
270+
RelPath: tinfo.Bundle.RelPath,
271+
Name: tinfo.Bundle.Name,
272+
ProcessingConfig: tinfo.PipelineConfig.ProcessingConfig,
273+
AutoApprove: true,
274+
}).Get(activityOpts, &tinfo.TransferID)
268275
if err != nil {
269276
return err
270277
}
@@ -275,14 +282,20 @@ func (w *ProcessingWorkflow) SessionHandler(ctx workflow.Context, sessCtx workfl
275282

276283
// Poll transfer.
277284
activityOpts = withActivityOptsForHeartbeatedRequest(sessCtx, time.Minute)
278-
err = workflow.ExecuteActivity(activityOpts, PollTransferActivityName, tinfo).Get(activityOpts, &tinfo.SIPID)
285+
err = workflow.ExecuteActivity(activityOpts, PollTransferActivityName, &PollTransferActivityParams{
286+
PipelineName: tinfo.Event.PipelineName,
287+
TransferID: tinfo.TransferID,
288+
}).Get(activityOpts, &tinfo.SIPID)
279289
if err != nil {
280290
return err
281291
}
282292

283293
// Poll ingest.
284294
activityOpts = withActivityOptsForHeartbeatedRequest(sessCtx, time.Minute)
285-
err = workflow.ExecuteActivity(activityOpts, PollIngestActivityName, tinfo).Get(activityOpts, &tinfo.StoredAt)
295+
err = workflow.ExecuteActivity(activityOpts, PollIngestActivityName, &PollIngestActivityParams{
296+
PipelineName: tinfo.Event.PipelineName,
297+
SIPID: tinfo.SIPID,
298+
}).Get(activityOpts, &tinfo.StoredAt)
286299
if err != nil {
287300
return err
288301
}
@@ -327,11 +340,12 @@ func updatePackageStatusLocalActivity(ctx context.Context, colsvc collection.Ser
327340
}
328341

329342
func loadConfigLocalActivity(ctx context.Context, m *Manager, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) {
330-
pipelineConfig, err := m.Pipelines.Config(tinfo.Event.PipelineName)
343+
p, err := m.Pipelines.Pipeline(pipeline)
331344
if err != nil {
332-
return nil, fmt.Errorf("error loading configuration of pipeline %s: %v", pipeline, err)
345+
return nil, err
333346
}
334-
tinfo.PipelineConfig = pipelineConfig
347+
348+
tinfo.PipelineConfig = p.Config()
335349
tinfo.Hooks = m.Hooks
336350

337351
return tinfo, nil

internal/workflow/transfer_activity.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,33 @@ func NewTransferActivity(m *Manager) *TransferActivity {
2020
return &TransferActivity{manager: m}
2121
}
2222

23-
func (a *TransferActivity) Execute(ctx context.Context, tinfo *TransferInfo) (string, error) {
24-
amc, err := a.manager.Pipelines.Client(tinfo.PipelineConfig.Name)
23+
type TransferActivityParams struct {
24+
PipelineName string
25+
TransferLocationID string
26+
RelPath string
27+
Name string
28+
ProcessingConfig string
29+
AutoApprove bool
30+
}
31+
32+
func (a *TransferActivity) Execute(ctx context.Context, params *TransferActivityParams) (string, error) {
33+
p, err := a.manager.Pipelines.Pipeline(params.PipelineName)
2534
if err != nil {
26-
return "", err
35+
return "", nonRetryableError(err)
2736
}
37+
amc := p.Client()
2838

2939
// Transfer path should include the location UUID if defined.
30-
var path = tinfo.Bundle.RelPath
31-
if tinfo.PipelineConfig.TransferLocationID != "" {
32-
path = fmt.Sprintf("%s:%s", tinfo.PipelineConfig.TransferLocationID, path)
40+
var path = params.RelPath
41+
if params.TransferLocationID != "" {
42+
path = fmt.Sprintf("%s:%s", params.TransferLocationID, path)
3343
}
3444

35-
var autoApprove = true
3645
resp, httpResp, err := amc.Package.Create(ctx, &amclient.PackageCreateRequest{
37-
Name: tinfo.Bundle.Name,
46+
Name: params.Name,
3847
Path: path,
39-
ProcessingConfig: tinfo.PipelineConfig.ProcessingConfig,
40-
AutoApprove: &autoApprove,
48+
ProcessingConfig: params.ProcessingConfig,
49+
AutoApprove: &params.AutoApprove,
4150
})
4251
if err != nil {
4352
if httpResp != nil {

0 commit comments

Comments
 (0)