Skip to content

Commit b1ac85b

Browse files
committed
configurate the vault srorage Postprocessing
1 parent 1d4f542 commit b1ac85b

3 files changed

Lines changed: 22 additions & 3 deletions

File tree

pkg/events/postprocessing.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ type PostprocessingStepFinished struct {
103103
UploadID string
104104
ExecutingUser *user.User
105105
Filename string
106+
ResourceID *provider.ResourceId
106107

107108
FinishedStep Postprocessingstep // name of the step
108109
Result interface{} // result information see VirusscanResult for example
@@ -145,6 +146,7 @@ type VirusscanResult struct {
145146
type PostprocessingFinished struct {
146147
UploadID string
147148
Filename string
149+
ResourceID *provider.ResourceId
148150
SpaceOwner *user.UserId
149151
ExecutingUser *user.User
150152
Result map[Postprocessingstep]interface{} // it is a map[step]Event

pkg/storage/utils/decomposedfs/decomposedfs.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func New(o *options.Options, aspects aspects.Aspects, log *zerolog.Logger) (stor
258258
return nil, errors.New("need nats for async file processing")
259259
}
260260

261-
ch, err := events.Consume(fs.stream, "dcfs", _registeredEvents...)
261+
ch, err := events.Consume(fs.stream, o.Events.ConsumerGroup, _registeredEvents...)
262262
if err != nil {
263263
return nil, err
264264
}
@@ -285,6 +285,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
285285
switch ev := event.Event.(type) {
286286
case events.PostprocessingFinished:
287287
sublog := log.With().Str("event", "PostprocessingFinished").Str("uploadid", ev.UploadID).Logger()
288+
if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID {
289+
sublog.Debug().Msg("ignoring event for different storage")
290+
continue
291+
}
288292
session, err := fs.sessionStore.Get(ctx, ev.UploadID)
289293
if err != nil {
290294
sublog.Error().Err(err).Msg("Failed to get upload")
@@ -450,6 +454,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
450454
session.Cleanup(true, !ev.KeepUpload, !ev.KeepUpload, true)
451455
case events.RevertRevision:
452456
sublog := log.With().Str("event", "RevertRevision").Interface("nodeid", ev.ResourceID).Logger()
457+
if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID {
458+
sublog.Debug().Msg("ignoring event for different storage")
459+
continue
460+
}
453461
n, err := fs.lu.NodeFromID(ctx, ev.ResourceID)
454462
if err != nil {
455463
sublog.Error().Err(err).Msg("Failed to get node")
@@ -462,6 +470,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
462470
}
463471
case events.PostprocessingStepFinished:
464472
sublog := log.With().Str("event", "PostprocessingStepFinished").Str("uploadid", ev.UploadID).Logger()
473+
if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID {
474+
sublog.Debug().Msg("ignoring event for different storage")
475+
continue
476+
}
465477
if ev.FinishedStep != events.PPStepAntivirus {
466478
// atm we are only interested in antivirus results
467479
continue

pkg/storage/utils/decomposedfs/options/options.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import (
2323
"strings"
2424
"time"
2525

26+
"github.com/mitchellh/mapstructure"
2627
"github.com/owncloud/reva/v2/pkg/rgrpc/todo/pool"
2728
"github.com/owncloud/reva/v2/pkg/sharedconf"
2829
"github.com/owncloud/reva/v2/pkg/storage/cache"
29-
"github.com/mitchellh/mapstructure"
3030
"github.com/pkg/errors"
3131
)
3232

@@ -103,7 +103,8 @@ type AsyncPropagatorOptions struct {
103103

104104
// EventOptions are the configurable options for events
105105
type EventOptions struct {
106-
NumConsumers int `mapstructure:"numconsumers"`
106+
NumConsumers int `mapstructure:"numconsumers"`
107+
ConsumerGroup string `mapstructure:"consumer_group"`
107108
}
108109

109110
// TokenOptions are the configurable option for tokens
@@ -172,5 +173,9 @@ func New(m map[string]interface{}) (*Options, error) {
172173
o.UploadDirectory = filepath.Join(o.Root, "uploads")
173174
}
174175

176+
if o.Events.ConsumerGroup == "" {
177+
o.Events.ConsumerGroup = "dcfs"
178+
}
179+
175180
return o, nil
176181
}

0 commit comments

Comments
 (0)