Skip to content

Commit b18c703

Browse files
authored
Add one Task queue and worker per AM Pipeline (#633)
1 parent b8814f0 commit b18c703

File tree

9 files changed

+142
-101
lines changed

9 files changed

+142
-101
lines changed

internal/batch/service.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ func (s *batchImpl) InitProcessingWorkflow(ctx context.Context, req *collection.
143143
req.ValidationConfig = validation.Config{}
144144
// req.MetadataConfig = metadata.Config{}
145145
tr := noop.NewTracerProvider().Tracer("")
146-
err := collection.InitProcessingWorkflow(ctx, tr, s.cc, s.taskQueue, req)
146+
147+
err := collection.InitProcessingWorkflow(ctx, tr, s.cc, req)
147148
if err != nil {
148149
s.logger.Error(err, "Error initializing processing workflow.")
149150
}

internal/batch/workflow.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,6 @@ func NewBatchActivity(batchsvc Service) *BatchActivity {
6060
}
6161

6262
func (a *BatchActivity) Execute(ctx context.Context, params BatchWorkflowInput) error {
63-
pipelines := []string{}
64-
if params.PipelineName != "" {
65-
pipelines = append(pipelines, params.PipelineName)
66-
}
67-
6863
if params.Depth < 0 {
6964
params.Depth = 0
7065
}
@@ -97,7 +92,7 @@ func (a *BatchActivity) Execute(ctx context.Context, params BatchWorkflowInput)
9792
BatchDir: filepath.Dir(path),
9893
Key: entry.Name(),
9994
IsDir: entry.IsDir(),
100-
PipelineNames: pipelines,
95+
PipelineName: params.PipelineName,
10196
ProcessingConfig: params.ProcessingConfig,
10297
CompletedDir: params.CompletedDir,
10398
RetentionPeriod: params.RetentionPeriod,

internal/batch/workflow_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ func TestBatchActivityStartsProcessingWorkflows(t *testing.T) {
3737
BatchDir: batchPath,
3838
Key: "transfer1",
3939
IsDir: true,
40-
PipelineNames: []string{"am"},
40+
PipelineName: "am",
4141
ProcessingConfig: "automated",
4242
})
4343
serviceMock.EXPECT().InitProcessingWorkflow(ctx, &collection.ProcessingWorkflowRequest{
4444
BatchDir: batchPath,
4545
Key: "transfer2",
4646
IsDir: true,
47-
PipelineNames: []string{"am"},
47+
PipelineName: "am",
4848
ProcessingConfig: "automated",
4949
})
5050

internal/collection/goa.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func (w *goaWrapper) Retry(ctx context.Context, payload *goacollection.RetryPayl
267267
req.WorkflowID = *goacol.WorkflowID
268268
req.CollectionID = goacol.ID
269269
tr := noop.NewTracerProvider().Tracer("")
270-
if err := InitProcessingWorkflow(ctx, tr, w.cc, w.taskQueue, req); err != nil {
270+
if err := InitProcessingWorkflow(ctx, tr, w.cc, req); err != nil {
271271
return fmt.Errorf("error starting the new workflow instance: %w", err)
272272
}
273273

internal/collection/workflow.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
temporalsdk_client "go.temporal.io/sdk/client"
1313

1414
"github.com/artefactual-labs/enduro/internal/metadata"
15+
"github.com/artefactual-labs/enduro/internal/pipeline"
1516
"github.com/artefactual-labs/enduro/internal/validation"
1617
)
1718

@@ -28,10 +29,7 @@ type ProcessingWorkflowRequest struct {
2829
// Name of the watcher that received this blob.
2930
WatcherName string
3031

31-
// Pipelines that are available for processing. The workflow will choose one
32-
// (randomly picked for now). If the slice is empty, it will be
33-
// automatically populated from the list of all configured pipelines.
34-
PipelineNames []string
32+
PipelineName string
3533

3634
// Period of time to schedule the deletion of the original blob from the
3735
// watched data source. nil means no deletion.
@@ -72,7 +70,7 @@ type ProcessingWorkflowRequest struct {
7270
MetadataConfig metadata.Config
7371
}
7472

75-
func InitProcessingWorkflow(ctx context.Context, tr trace.Tracer, c temporalsdk_client.Client, taskQueue string, req *ProcessingWorkflowRequest) error {
73+
func InitProcessingWorkflow(ctx context.Context, tr trace.Tracer, c temporalsdk_client.Client, req *ProcessingWorkflowRequest) error {
7674
_, span := tr.Start(ctx, "InitProcessingWorkflow")
7775
defer span.End()
7876

@@ -89,7 +87,7 @@ func InitProcessingWorkflow(ctx context.Context, tr trace.Tracer, c temporalsdk_
8987

9088
opts := temporalsdk_client.StartWorkflowOptions{
9189
ID: req.WorkflowID,
92-
TaskQueue: taskQueue,
90+
TaskQueue: pipeline.TaskQueueName(req.PipelineName),
9391
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
9492
}
9593
_, err := c.ExecuteWorkflow(ctx, opts, ProcessingWorkflowName, req)

internal/pipeline/pipeline.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,32 +57,38 @@ type Pipeline struct {
5757
status string
5858
statusUpdatedAt time.Time
5959
statusLock sync.RWMutex
60+
TaskQueue string
6061
}
6162

6263
func NewPipeline(logger logr.Logger, config Config) (*Pipeline, error) {
6364
config.TransferDir = expandPath(config.TransferDir)
6465
config.ProcessingDir = expandPath(config.ProcessingDir)
6566

6667
p := &Pipeline{
67-
logger: logger,
68-
sem: semaphore.NewWeighted(int64(config.Capacity)),
69-
config: &config,
70-
client: httpClient(),
68+
logger: logger,
69+
sem: semaphore.NewWeighted(int64(config.Capacity)),
70+
config: &config,
71+
client: httpClient(),
72+
TaskQueue: TaskQueueName(config.Name),
7173
}
7274

7375
if config.ID != "" {
7476
p.ID = config.ID
7577
}
7678

77-
// init() enriches our record by retrieving the UUID but we still return
78-
// the the object in case of errors.
79+
// init() enriches our record by retrieving the UUID, but we still return
80+
// the object in case of errors.
7981
if err := p.init(); err != nil {
8082
return p, err
8183
}
8284

8385
return p, nil
8486
}
8587

88+
func TaskQueueName(name string) string {
89+
return fmt.Sprintf("pipeline-%s", name)
90+
}
91+
8692
// init connects with the pipeline to retrieve its identifier, unless one has
8793
// already been defined via configuration.
8894
func (p *Pipeline) init() error {

internal/workflow/processing.go

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -286,27 +286,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll
286286
}
287287
}
288288

289-
// Randomly choose the pipeline from the list of names provided. If the
290-
// list is empty then choose one from the list of all configured pipelines.
291-
{
292-
var pick string
293-
if err := temporalsdk_workflow.SideEffect(ctx, func(ctx temporalsdk_workflow.Context) interface{} {
294-
names := req.PipelineNames
295-
if len(names) < 1 {
296-
names = w.pipelineRegistry.Names()
297-
if len(names) < 1 {
298-
return ""
299-
}
300-
}
301-
src := rand.NewSource(time.Now().UnixNano())
302-
rnd := rand.New(src) // #nosec G404 -- not security sensitive.
303-
return names[rnd.Intn(len(names))]
304-
}).Get(&pick); err != nil {
305-
return err
306-
}
307-
308-
tinfo.PipelineName = pick
309-
}
289+
tinfo.PipelineName = req.PipelineName
310290

311291
// Load pipeline configuration and hooks.
312292
{
@@ -706,3 +686,18 @@ func (w *ProcessingWorkflow) transfer(sessCtx temporalsdk_workflow.Context, tinf
706686

707687
return nil
708688
}
689+
690+
// RandomPipeline will randomly choose the pipeline from the list of names provided. If the
691+
// list is empty then choose one from the list of all configured pipelines.
692+
func RandomPipeline(pipelineNames []string, registry *pipeline.Registry) string {
693+
names := pipelineNames
694+
if len(names) < 1 {
695+
names = registry.Names()
696+
if len(names) < 1 {
697+
return ""
698+
}
699+
}
700+
src := rand.NewSource(time.Now().UnixNano())
701+
rnd := rand.New(src) // #nosec G404 -- not security sensitive.
702+
return names[rnd.Intn(len(names))]
703+
}

internal/workflow/processing_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (s *ProcessingWorkflowTestSuite) TestParseErrorIsIgnored() {
7171
s.env.ExecuteWorkflow(s.workflow.Execute, &collection.ProcessingWorkflowRequest{
7272
CollectionID: 0,
7373
WatcherName: "watcher",
74-
PipelineNames: []string{"pipeline"},
74+
PipelineName: "pipeline",
7575
RetentionPeriod: &retentionPeriod,
7676
StripTopLevelDir: true,
7777
Key: "key",
@@ -107,7 +107,7 @@ func (s *ProcessingWorkflowTestSuite) TestParseError() {
107107
s.env.ExecuteWorkflow(s.workflow.Execute, &collection.ProcessingWorkflowRequest{
108108
CollectionID: 0,
109109
WatcherName: "watcher",
110-
PipelineNames: []string{"pipeline"},
110+
PipelineName: "pipeline",
111111
RetentionPeriod: &retentionPeriod,
112112
StripTopLevelDir: true,
113113
Key: "key",

main.go

Lines changed: 102 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,18 @@ func main() {
218218
attribute.String("key", event.Key),
219219
attribute.Bool("dir", event.IsDir),
220220
)
221-
logger.V(1).Info("Starting new workflow", "watcher", event.WatcherName, "bucket", event.Bucket, "key", event.Key, "dir", event.IsDir)
221+
pipelineName := workflow.RandomPipeline(event.PipelineName, pipelineRegistry)
222+
logger.V(1).Info(
223+
"Starting new workflow",
224+
"watcher", event.WatcherName,
225+
"bucket", event.Bucket,
226+
"key", event.Key,
227+
"dir", event.IsDir,
228+
"pipeline", pipelineName,
229+
)
222230
req := collection.ProcessingWorkflowRequest{
223231
WatcherName: event.WatcherName,
224-
PipelineNames: event.PipelineName,
232+
PipelineName: pipelineName,
225233
RetentionPeriod: event.RetentionPeriod,
226234
CompletedDir: event.CompletedDir,
227235
StripTopLevelDir: event.StripTopLevelDir,
@@ -233,7 +241,8 @@ func main() {
233241
ValidationConfig: config.Validation,
234242
MetadataConfig: config.Metadata,
235243
}
236-
if err := collection.InitProcessingWorkflow(ctx, tracer, temporalClient, config.Temporal.TaskQueue, &req); err != nil {
244+
245+
if err := collection.InitProcessingWorkflow(ctx, tracer, temporalClient, &req); err != nil {
237246
logger.Error(err, "Error initializing processing workflow.")
238247
}
239248
span.End()
@@ -247,62 +256,36 @@ func main() {
247256
}
248257
}
249258

250-
// Workflow and activity worker.
251-
{
252-
h := hooks.NewHooks(config.Hooks)
253-
254-
done := make(chan struct{})
255-
w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{
256-
EnableSessionWorker: true,
257-
MaxConcurrentSessionExecutionSize: config.Worker.MaxConcurrentSessionExecutionSize,
258-
MaxConcurrentWorkflowTaskExecutionSize: config.Worker.MaxConcurrentWorkflowsExecutionsSize,
259-
MaxHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval,
260-
DefaultHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval,
261-
})
262-
if err != nil {
263-
logger.Error(err, "Error creating Temporal worker.")
264-
os.Exit(1)
259+
for _, p := range pipelineRegistry.List() {
260+
if p.Status(ctx) == "active" {
261+
logger.Info("Creating worker and task queue", "Pipeline", p.TaskQueue)
262+
} else {
263+
logger.Error(errors.New("Pipeline not active"), "Cannot create worker and task queue for inactive pipeline")
264+
continue
265265
}
266-
267-
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(h, colsvc, pipelineRegistry, logger, config.Workflow).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
268-
w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName})
269-
w.RegisterActivityWithOptions(activities.NewDownloadActivity(h, pipelineRegistry, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
270-
w.RegisterActivityWithOptions(archive.NewExtractActivity(config.ExtractActivity).Execute, temporalsdk_activity.RegisterOptions{Name: archive.ExtractActivityName})
271-
w.RegisterActivityWithOptions(activities.NewBundleActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
272-
w.RegisterActivityWithOptions(activities.NewValidateTransferActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ValidateTransferActivityName})
273-
w.RegisterActivityWithOptions(activities.NewTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName})
274-
w.RegisterActivityWithOptions(activities.NewPollTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollTransferActivityName})
275-
w.RegisterActivityWithOptions(activities.NewPollIngestActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollIngestActivityName})
276-
w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})
277-
w.RegisterActivityWithOptions(activities.NewHidePackageActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName})
278-
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
279-
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})
280-
w.RegisterActivityWithOptions(activities.NewPopulateMetadataActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PopulateMetadataActivityName})
281-
282-
w.RegisterActivityWithOptions(workflow.NewAsyncCompletionActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: workflow.AsyncCompletionActivityName})
283-
w.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(h).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})
284-
w.RegisterActivityWithOptions(nha_activities.NewUpdateProductionSystemActivity(h).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateProductionSystemActivityName})
285-
286-
w.RegisterWorkflowWithOptions(collection.BulkWorkflow, temporalsdk_workflow.RegisterOptions{Name: collection.BulkWorkflowName})
287-
w.RegisterActivityWithOptions(collection.NewBulkActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: collection.BulkActivityName})
288-
289-
w.RegisterWorkflowWithOptions(batch.BatchWorkflow, temporalsdk_workflow.RegisterOptions{Name: batch.BatchWorkflowName})
290-
w.RegisterActivityWithOptions(batch.NewBatchActivity(batchsvc).Execute, temporalsdk_activity.RegisterOptions{Name: batch.BatchActivityName})
291-
292-
g.Add(
293-
func() error {
294-
if err := w.Start(); err != nil {
295-
return err
296-
}
297-
<-done
298-
return nil
299-
},
300-
func(err error) {
301-
w.Stop()
302-
close(done)
303-
},
266+
registerWorker(
267+
p.TaskQueue,
268+
temporalClient,
269+
pipelineRegistry,
270+
config,
271+
colsvc,
272+
wsvc,
273+
batchsvc,
274+
logger,
275+
&g,
304276
)
305277
}
278+
registerWorker(
279+
config.Temporal.TaskQueue,
280+
temporalClient,
281+
pipelineRegistry,
282+
config,
283+
colsvc,
284+
wsvc,
285+
batchsvc,
286+
logger,
287+
&g,
288+
)
306289

307290
// Observability server.
308291
{
@@ -506,3 +489,66 @@ func initTracerProvider(ctx context.Context, logger logr.Logger, cfg TelemetryCo
506489

507490
return tp, shutdown, nil
508491
}
492+
493+
func registerWorker(
494+
taskQueue string,
495+
temporalClient temporalsdk_client.Client,
496+
pipelineRegistry *pipeline.Registry,
497+
config configuration,
498+
colsvc collection.Service,
499+
wsvc watcher.Service,
500+
batchsvc batch.Service,
501+
logger logr.Logger,
502+
g *run.Group,
503+
) {
504+
// Workflow and activity worker.
505+
h := hooks.NewHooks(config.Hooks)
506+
507+
done := make(chan struct{})
508+
w := temporalsdk_worker.New(temporalClient, taskQueue, temporalsdk_worker.Options{
509+
EnableSessionWorker: true,
510+
MaxConcurrentSessionExecutionSize: config.Worker.MaxConcurrentSessionExecutionSize,
511+
MaxConcurrentWorkflowTaskExecutionSize: config.Worker.MaxConcurrentWorkflowsExecutionsSize,
512+
MaxHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval,
513+
DefaultHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval,
514+
})
515+
516+
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(h, colsvc, pipelineRegistry, logger, config.Workflow).Execute, temporalsdk_workflow.RegisterOptions{Name: collection.ProcessingWorkflowName})
517+
w.RegisterActivityWithOptions(activities.NewAcquirePipelineActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.AcquirePipelineActivityName})
518+
w.RegisterActivityWithOptions(activities.NewDownloadActivity(h, pipelineRegistry, wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName})
519+
w.RegisterActivityWithOptions(archive.NewExtractActivity(config.ExtractActivity).Execute, temporalsdk_activity.RegisterOptions{Name: archive.ExtractActivityName})
520+
w.RegisterActivityWithOptions(activities.NewBundleActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName})
521+
w.RegisterActivityWithOptions(activities.NewValidateTransferActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ValidateTransferActivityName})
522+
w.RegisterActivityWithOptions(activities.NewTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.TransferActivityName})
523+
w.RegisterActivityWithOptions(activities.NewPollTransferActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollTransferActivityName})
524+
w.RegisterActivityWithOptions(activities.NewPollIngestActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollIngestActivityName})
525+
w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})
526+
w.RegisterActivityWithOptions(activities.NewHidePackageActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.HidePackageActivityName})
527+
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
528+
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})
529+
w.RegisterActivityWithOptions(activities.NewPopulateMetadataActivity(pipelineRegistry).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PopulateMetadataActivityName})
530+
531+
w.RegisterActivityWithOptions(workflow.NewAsyncCompletionActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: workflow.AsyncCompletionActivityName})
532+
w.RegisterActivityWithOptions(nha_activities.NewUpdateHARIActivity(h).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateHARIActivityName})
533+
w.RegisterActivityWithOptions(nha_activities.NewUpdateProductionSystemActivity(h).Execute, temporalsdk_activity.RegisterOptions{Name: nha_activities.UpdateProductionSystemActivityName})
534+
535+
w.RegisterWorkflowWithOptions(collection.BulkWorkflow, temporalsdk_workflow.RegisterOptions{Name: collection.BulkWorkflowName})
536+
w.RegisterActivityWithOptions(collection.NewBulkActivity(colsvc).Execute, temporalsdk_activity.RegisterOptions{Name: collection.BulkActivityName})
537+
538+
w.RegisterWorkflowWithOptions(batch.BatchWorkflow, temporalsdk_workflow.RegisterOptions{Name: batch.BatchWorkflowName})
539+
w.RegisterActivityWithOptions(batch.NewBatchActivity(batchsvc).Execute, temporalsdk_activity.RegisterOptions{Name: batch.BatchActivityName})
540+
541+
g.Add(
542+
func() error {
543+
if err := w.Start(); err != nil {
544+
return err
545+
}
546+
<-done
547+
return nil
548+
},
549+
func(err error) {
550+
w.Stop()
551+
close(done)
552+
},
553+
)
554+
}

0 commit comments

Comments
 (0)