Skip to content

Commit 7717b7b

Browse files
Merge pull request #149 from kaleido-io/fswatcher-reconcile
[fswatcher] File Reconciler w/ Resync Interval
2 parents bc46928 + 16921a8 commit 7717b7b

File tree

2 files changed

+47
-10
lines changed

2 files changed

+47
-10
lines changed

pkg/fswatcher/fswatcher.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2023 Kaleido, Inc.
1+
// Copyright © 2024 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"os"
2222
"path"
23+
"time"
2324

2425
"github.com/fsnotify/fsnotify"
2526
"github.com/hyperledger/firefly-common/pkg/fftypes"
@@ -35,9 +36,19 @@ import (
3536
// - Only fires if the data in the file is different to the last notification
3637
// - Does not reload the config - that's the caller's responsibility
3738
func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) error {
39+
return sync(ctx, fullFilePath, onChange, onClose, nil, nil)
40+
}
41+
42+
// Reconcile behaves the same as Watch, except it allows for running the onSync func on a provided
43+
// interval. The default re-sync internal is 1m.
44+
func Reconcile(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration) error {
45+
return sync(ctx, fullFilePath, onChange, onClose, onSync, resyncInterval)
46+
}
47+
48+
func sync(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration) error {
3849
filePath := path.Dir(fullFilePath)
3950
fileName := path.Base(fullFilePath)
40-
log.L(ctx).Debugf("Starting file listener for '%s' in directory '%s'", fileName, filePath)
51+
log.L(ctx).Debugf("Starting file reconciler for '%s' in directory '%s'", fileName, filePath)
4152

4253
watcher, err := fsnotify.NewWatcher()
4354
if err == nil {
@@ -46,7 +57,7 @@ func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) e
4657
if onClose != nil {
4758
onClose()
4859
}
49-
}, watcher.Events, watcher.Errors)
60+
}, onSync, resyncInterval, watcher.Events, watcher.Errors)
5061
err = watcher.Add(filePath)
5162
}
5263
if err != nil {
@@ -56,9 +67,18 @@ func Watch(ctx context.Context, fullFilePath string, onChange, onClose func()) e
5667
return nil
5768
}
5869

59-
func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose func(), events chan fsnotify.Event, errors chan error) {
70+
func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose, onSync func(), resyncInterval *time.Duration, events chan fsnotify.Event, errors chan error) {
6071
defer onClose()
6172

73+
timeout := resyncInterval
74+
if timeout == nil {
75+
timeout = func() *time.Duration {
76+
defaultTimeout := time.Minute
77+
return &defaultTimeout
78+
}()
79+
}
80+
log.L(ctx).Debugf("re-sync interval set to '%s'", *timeout)
81+
6282
var lastHash *fftypes.Bytes32
6383
for {
6484
select {
@@ -83,6 +103,15 @@ func fsListenerLoop(ctx context.Context, fullFilePath string, onChange, onClose
83103
lastHash = dataHash
84104
}
85105
}
106+
case <-time.After(*timeout):
107+
if onSync != nil {
108+
data, err := os.ReadFile(fullFilePath)
109+
if err == nil {
110+
dataHash := fftypes.HashString(string(data))
111+
log.L(ctx).Infof("Config file re-sync. Event=Resync Name=%s Size=%d Hash=%s", fullFilePath, len(data), dataHash)
112+
onSync()
113+
}
114+
}
86115
case err, ok := <-errors:
87116
if ok {
88117
log.L(ctx).Errorf("FSEvent error: %s", err)

pkg/fswatcher/fswatcher_test.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2022 Kaleido, Inc.
1+
// Copyright © 2024 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -21,14 +21,15 @@ import (
2121
"fmt"
2222
"os"
2323
"testing"
24+
"time"
2425

2526
"github.com/fsnotify/fsnotify"
2627
"github.com/sirupsen/logrus"
2728
"github.com/spf13/viper"
2829
"github.com/stretchr/testify/assert"
2930
)
3031

31-
func TestFileListenerE2E(t *testing.T) {
32+
func TestFileReconcilerE2E(t *testing.T) {
3233

3334
logrus.SetLevel(logrus.DebugLevel)
3435
tmpDir := t.TempDir()
@@ -47,14 +48,20 @@ func TestFileListenerE2E(t *testing.T) {
4748
// Start listener on config file
4849
fsListenerDone := make(chan struct{})
4950
fsListenerFired := make(chan bool)
51+
reSyncFired := make(chan bool)
52+
reSyncInterval := 1 * time.Second
5053
ctx, cancelCtx := context.WithCancel(context.Background())
51-
err := Watch(ctx, filePath, func() {
54+
err := Reconcile(ctx, filePath, func() {
5255
err := viper.ReadInConfig()
5356
assert.NoError(t, err)
5457
fsListenerFired <- true
5558
}, func() {
5659
close(fsListenerDone)
57-
})
60+
}, func() {
61+
err := viper.ReadInConfig()
62+
assert.NoError(t, err)
63+
reSyncFired <- true
64+
}, &reSyncInterval)
5865
assert.NoError(t, err)
5966

6067
// Delete and rename in another file
@@ -63,6 +70,7 @@ func TestFileListenerE2E(t *testing.T) {
6370
os.Rename(fmt.Sprintf("%s/another.yaml", tmpDir), fmt.Sprintf("%s/test.yaml", tmpDir))
6471
<-fsListenerFired
6572
assert.Equal(t, "two", viper.Get("ut_conf"))
73+
<-reSyncFired
6674

6775
defer func() {
6876
cancelCtx()
@@ -74,7 +82,7 @@ func TestFileListenerE2E(t *testing.T) {
7482

7583
}
7684

77-
func TestFileListenerFail(t *testing.T) {
85+
func TestFileWatcherFail(t *testing.T) {
7886

7987
logrus.SetLevel(logrus.DebugLevel)
8088
tmpDir := t.TempDir()
@@ -95,7 +103,7 @@ func TestFileListenerLogError(t *testing.T) {
95103
defer cancelCtx()
96104
errors := make(chan error)
97105
fsListenerDone := make(chan struct{})
98-
go fsListenerLoop(ctx, "somefile", func() {}, func() { close(fsListenerDone) }, make(chan fsnotify.Event), errors)
106+
go fsListenerLoop(ctx, "somefile", func() {}, func() { close(fsListenerDone) }, nil, nil, make(chan fsnotify.Event), errors)
99107

100108
errors <- fmt.Errorf("pop")
101109
cancelCtx()

0 commit comments

Comments
 (0)