Skip to content

Commit 64e36c9

Browse files
committed
workflow: make retentionPeriod optional
With this commit, we support three different scenarios: * `retentionPeriod` is undefined - original never removed, * `retentionPeriod` is zero - original is removed immediately, * `retentionPeriod` is >0 (e.g. "1s") - timer kicks in. This fixes #10.
1 parent 416f3fe commit 64e36c9

File tree

4 files changed

+8
-8
lines changed

4 files changed

+8
-8
lines changed

internal/watcher/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type FilesystemConfig struct {
1414
Inotify bool
1515

1616
Pipeline string
17-
RetentionPeriod time.Duration
17+
RetentionPeriod *time.Duration
1818
}
1919

2020
// See minio.go for more.
@@ -31,5 +31,5 @@ type MinioConfig struct {
3131
Token string
3232

3333
Pipeline string
34-
RetentionPeriod time.Duration
34+
RetentionPeriod *time.Duration
3535
}

internal/watcher/event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type BlobEvent struct {
2323
PipelineName string
2424

2525
// Retention period for this blob.
26-
RetentionPeriod time.Duration
26+
RetentionPeriod *time.Duration
2727

2828
// Key of the blob.
2929
Key string

internal/watcher/watcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ type Watcher interface {
2323
// Every watcher targets a pipeline.
2424
Pipeline() string
2525

26-
RetentionPeriod() time.Duration
26+
RetentionPeriod() *time.Duration
2727

2828
fmt.Stringer // It should return the name of the watcher.
2929
}
3030

3131
type commonWatcherImpl struct {
3232
name string
3333
pipeline string
34-
retentionPeriod time.Duration
34+
retentionPeriod *time.Duration
3535
}
3636

3737
func (w *commonWatcherImpl) String() string {
@@ -42,7 +42,7 @@ func (w *commonWatcherImpl) Pipeline() string {
4242
return w.pipeline
4343
}
4444

45-
func (w *commonWatcherImpl) RetentionPeriod() time.Duration {
45+
func (w *commonWatcherImpl) RetentionPeriod() *time.Duration {
4646
return w.retentionPeriod
4747
}
4848

internal/workflow/processing.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ func (w *ProcessingWorkflow) Execute(ctx workflow.Context, req *collection.Proce
176176

177177
// Schedule deletion of the original in the watched data source.
178178
var deletionTimer workflow.Future
179-
if tinfo.Status == collection.StatusDone {
180-
deletionTimer = workflow.NewTimer(ctx, tinfo.Event.RetentionPeriod)
179+
if tinfo.Status == collection.StatusDone && tinfo.Event.RetentionPeriod != nil {
180+
deletionTimer = workflow.NewTimer(ctx, *tinfo.Event.RetentionPeriod)
181181
}
182182

183183
// Activities that we want to run within the session regardless the

0 commit comments

Comments
 (0)