Skip to content

Commit e3553d7

Browse files
committed
Refactor file watcher
1 parent 29b1bed commit e3553d7

File tree

2 files changed

+65
-122
lines changed

2 files changed

+65
-122
lines changed

internal/watcher/file/file_watcher_service.go

Lines changed: 38 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package file
88
import (
99
"context"
1010
"errors"
11-
"fmt"
1211
"log/slog"
1312
"os"
1413
"path/filepath"
@@ -34,13 +33,12 @@ type FileUpdateMessage struct {
3433
}
3534

3635
type FileWatcherService struct {
37-
agentConfig *config.Config
38-
watcher *fsnotify.Watcher
39-
directoriesBeingWatched *sync.Map
40-
filesChanged *atomic.Bool
41-
enabled *atomic.Bool
42-
directoriesThatDontExistYet *sync.Map
43-
mu sync.Mutex
36+
agentConfig *config.Config
37+
watcher *fsnotify.Watcher
38+
filesChanged *atomic.Bool
39+
enabled *atomic.Bool
40+
directoriesToWatch map[string]struct{}
41+
mu sync.Mutex
4442
}
4543

4644
func NewFileWatcherService(agentConfig *config.Config) *FileWatcherService {
@@ -51,11 +49,10 @@ func NewFileWatcherService(agentConfig *config.Config) *FileWatcherService {
5149
filesChanged.Store(false)
5250

5351
return &FileWatcherService{
54-
agentConfig: agentConfig,
55-
directoriesBeingWatched: &sync.Map{},
56-
directoriesThatDontExistYet: &sync.Map{},
57-
enabled: enabled,
58-
filesChanged: filesChanged,
52+
agentConfig: agentConfig,
53+
directoriesToWatch: make(map[string]struct{}),
54+
enabled: enabled,
55+
filesChanged: filesChanged,
5956
}
6057
}
6158

@@ -114,25 +111,21 @@ func (fws *FileWatcherService) Update(ctx context.Context, nginxConfigContext *m
114111
directoriesToWatch[filepath.Dir(file)] = struct{}{}
115112
}
116113

117-
// If watcher does not exist yet add directories to directoriesThatDontExistYet so that watchers can be created
118-
// at the next file watcher monitoring period
119-
if fws.watcher == nil {
120-
for dir := range directoriesToWatch {
121-
fws.directoriesThatDontExistYet.Store(dir, struct{}{})
122-
}
123-
} else {
114+
fws.directoriesToWatch = directoriesToWatch
115+
116+
if fws.watcher != nil {
124117
slog.InfoContext(ctx, "Updating file watcher", "allowed", fws.agentConfig.AllowedDirectories)
125118

126119
// Start watching new directories
127-
fws.addWatchers(ctx, directoriesToWatch)
120+
fws.addWatchers(ctx)
128121

129122
// Check if directories no longer need to be watched
130-
fws.removeWatchers(ctx, directoriesToWatch)
123+
fws.removeWatchers(ctx)
131124
}
132125
}
133126

134-
func (fws *FileWatcherService) addWatchers(ctx context.Context, directoriesToWatch map[string]struct{}) {
135-
for directory := range directoriesToWatch {
127+
func (fws *FileWatcherService) addWatchers(ctx context.Context) {
128+
for directory := range fws.directoriesToWatch {
136129
if !fws.agentConfig.IsDirectoryAllowed(directory) {
137130
slog.WarnContext(
138131
ctx,
@@ -142,45 +135,26 @@ func (fws *FileWatcherService) addWatchers(ctx context.Context, directoriesToWat
142135

143136
continue
144137
}
145-
if fws.addWatcher(ctx, directory) {
146-
fws.directoriesThatDontExistYet.Delete(directory)
147-
} else {
148-
fws.directoriesThatDontExistYet.Store(directory, struct{}{})
149-
}
138+
139+
fws.addWatcher(ctx, directory)
150140
}
151141
}
152142

153-
func (fws *FileWatcherService) removeWatchers(ctx context.Context, directoriesToWatch map[string]struct{}) {
154-
fws.directoriesBeingWatched.Range(func(key, value interface{}) bool {
155-
directory, ok := key.(string)
156-
157-
if _, exists := directoriesToWatch[directory]; !exists && ok {
158-
fws.removeWatcher(ctx, directory)
159-
fws.directoriesBeingWatched.Delete(directory)
143+
func (fws *FileWatcherService) removeWatchers(ctx context.Context) {
144+
for _, directoryBeingWatched := range fws.watcher.WatchList() {
145+
if _, ok := fws.directoriesToWatch[directoryBeingWatched]; !ok {
146+
fws.removeWatcher(ctx, directoryBeingWatched)
160147
}
161-
162-
return true
163-
})
148+
}
164149
}
165150

166151
func (fws *FileWatcherService) handleEvent(ctx context.Context, event fsnotify.Event) {
167-
fmt.Printf("Processing FSNotify event %v \n", event)
168-
169152
if fws.enabled.Load() {
170153
if fws.isEventSkippable(event) {
171154
return
172155
}
173156

174157
slog.DebugContext(ctx, "Processing FSNotify event", "event", event)
175-
176-
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
177-
if _, ok := fws.directoriesBeingWatched.Load(event.Name); ok {
178-
fws.directoriesBeingWatched.Delete(event.Name)
179-
}
180-
181-
fws.directoriesThatDontExistYet.Store(event.Name, struct{}{})
182-
}
183-
184158
fws.filesChanged.Store(true)
185159
}
186160
}
@@ -201,18 +175,11 @@ func (fws *FileWatcherService) checkForUpdates(ctx context.Context, ch chan<- Fi
201175
fws.watcher = watcher
202176
}
203177

204-
fws.directoriesThatDontExistYet.Range(func(key, value interface{}) bool {
205-
directory, ok := key.(string)
206-
if !ok {
207-
return true
208-
}
209-
210-
if fws.addWatcher(ctx, directory) {
211-
fws.directoriesThatDontExistYet.Delete(directory)
212-
}
178+
// Start watching new directories
179+
fws.addWatchers(ctx)
213180

214-
return true
215-
})
181+
// Check if directories no longer need to be watched
182+
fws.removeWatchers(ctx)
216183

217184
if fws.filesChanged.Load() {
218185
newCtx := context.WithValue(
@@ -227,39 +194,21 @@ func (fws *FileWatcherService) checkForUpdates(ctx context.Context, ch chan<- Fi
227194
}
228195
}
229196

230-
func (fws *FileWatcherService) addWatcher(ctx context.Context, directory string) bool {
197+
func (fws *FileWatcherService) addWatcher(ctx context.Context, directory string) {
231198
slog.DebugContext(ctx, "Checking if file watcher needs to be added", "directory", directory)
232199

233-
if _, ok := fws.directoriesBeingWatched.Load(directory); !ok {
234-
if _, err := os.Stat(directory); errors.Is(err, os.ErrNotExist) {
235-
slog.DebugContext(
236-
ctx, "Unable to watch directory that does not exist",
237-
"directory", directory, "error", err,
238-
)
239-
240-
return false
241-
}
242-
243-
slog.DebugContext(ctx, "Adding watcher", "directory", directory)
244-
245-
if err := fws.watcher.Add(directory); err != nil {
246-
slog.ErrorContext(ctx, "Failed to add file watcher", "directory", directory, "error", err)
247-
removeError := fws.watcher.Remove(directory)
248-
if removeError != nil {
249-
slog.ErrorContext(
250-
ctx,
251-
"Failed to remove file watcher",
252-
"directory", directory, "error", removeError,
253-
)
254-
}
255-
256-
return false
257-
}
200+
if _, err := os.Stat(directory); errors.Is(err, os.ErrNotExist) {
201+
slog.DebugContext(
202+
ctx, "Unable to watch directory that does not exist",
203+
"directory", directory, "error", err,
204+
)
258205
}
259206

260-
fws.directoriesBeingWatched.Store(directory, struct{}{})
207+
slog.DebugContext(ctx, "Adding watcher", "directory", directory)
261208

262-
return true
209+
if err := fws.watcher.Add(directory); err != nil {
210+
slog.ErrorContext(ctx, "Failed to add file watcher", "directory", directory, "error", err)
211+
}
263212
}
264213

265214
func (fws *FileWatcherService) removeWatcher(ctx context.Context, path string) {

internal/watcher/file/file_watcher_service_test.go

Lines changed: 27 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package file
88
import (
99
"bytes"
1010
"context"
11-
"log/slog"
1211
"os"
1312
"path"
1413
"path/filepath"
@@ -34,8 +33,7 @@ const (
3433
func TestFileWatcherService_NewFileWatcherService(t *testing.T) {
3534
fileWatcherService := NewFileWatcherService(types.AgentConfig())
3635

37-
assert.Empty(t, fileWatcherService.directoriesBeingWatched)
38-
assert.Empty(t, fileWatcherService.directoriesThatDontExistYet)
36+
assert.Empty(t, fileWatcherService.directoriesToWatch)
3937
assert.True(t, fileWatcherService.enabled.Load())
4038
assert.False(t, fileWatcherService.filesChanged.Load())
4139
}
@@ -66,8 +64,9 @@ func TestFileWatcherService_addWatcher(t *testing.T) {
6664

6765
fileWatcherService.addWatcher(ctx, testDirectory)
6866

69-
_, ok := fileWatcherService.directoriesBeingWatched.Load(testDirectory)
70-
assert.True(t, ok)
67+
directoriesBeingWatched := fileWatcherService.watcher.WatchList()
68+
assert.Len(t, directoriesBeingWatched, 1)
69+
assert.Equal(t, testDirectory, directoriesBeingWatched[0])
7170
}
7271

7372
func TestFileWatcherService_addWatcher_Error(t *testing.T) {
@@ -80,11 +79,10 @@ func TestFileWatcherService_addWatcher_Error(t *testing.T) {
8079
tempDir := t.TempDir()
8180
testDirectory := path.Join(tempDir, "test_dir")
8281

83-
success := fileWatcherService.addWatcher(ctx, testDirectory)
84-
assert.False(t, success)
82+
fileWatcherService.addWatcher(ctx, testDirectory)
8583

86-
_, ok := fileWatcherService.directoriesBeingWatched.Load(testDirectory)
87-
assert.False(t, ok)
84+
directoriesBeingWatched := fileWatcherService.watcher.WatchList()
85+
assert.Empty(t, directoriesBeingWatched)
8886
}
8987

9088
func TestFileWatcherService_removeWatcher(t *testing.T) {
@@ -158,11 +156,10 @@ func TestFileWatcherService_Update(t *testing.T) {
158156
Includes: []string{filepath.Join(testDirectory, "*.conf")},
159157
})
160158

161-
_, ok := fileWatcherService.directoriesThatDontExistYet.Load(testDirectory)
159+
_, ok := fileWatcherService.directoriesToWatch[testDirectory]
162160
assert.True(t, ok)
163161

164-
_, ok = fileWatcherService.directoriesBeingWatched.Load(testDirectory)
165-
assert.False(t, ok)
162+
assert.Nil(t, fileWatcherService.watcher)
166163
})
167164

168165
t.Run("Test 2: watcher initialized", func(t *testing.T) {
@@ -175,23 +172,23 @@ func TestFileWatcherService_Update(t *testing.T) {
175172
Includes: []string{filepath.Join(testDirectory, "*.conf")},
176173
})
177174

178-
_, ok := fileWatcherService.directoriesThatDontExistYet.Load(testDirectory)
179-
assert.False(t, ok)
180-
181-
_, ok = fileWatcherService.directoriesBeingWatched.Load(testDirectory)
175+
_, ok := fileWatcherService.directoriesToWatch[testDirectory]
182176
assert.True(t, ok)
177+
178+
directoriesBeingWatched := fileWatcherService.watcher.WatchList()
179+
assert.Len(t, directoriesBeingWatched, 1)
180+
assert.Equal(t, testDirectory, directoriesBeingWatched[0])
183181
})
184182

185183
t.Run("Test 3: remove watchers", func(t *testing.T) {
186184
fileWatcherService.Update(ctx, &model.NginxConfigContext{
187185
Includes: []string{},
188186
})
189187

190-
_, ok := fileWatcherService.directoriesThatDontExistYet.Load(testDirectory)
191-
assert.False(t, ok)
188+
assert.Empty(t, fileWatcherService.directoriesToWatch)
192189

193-
_, ok = fileWatcherService.directoriesBeingWatched.Load(testDirectory)
194-
assert.False(t, ok)
190+
directoriesBeingWatched := fileWatcherService.watcher.WatchList()
191+
assert.Empty(t, directoriesBeingWatched)
195192
})
196193

197194
t.Run("Test 4: not allowed directory", func(t *testing.T) {
@@ -205,21 +202,18 @@ func TestFileWatcherService_Update(t *testing.T) {
205202
},
206203
})
207204

208-
_, ok := fileWatcherService.directoriesThatDontExistYet.Load("/unknown/location/test.conf")
205+
_, ok := fileWatcherService.directoriesToWatch["/unknown/location/test.conf"]
209206
assert.False(t, ok)
210207

211-
_, ok = fileWatcherService.directoriesBeingWatched.Load("/unknown/location/test.conf")
212-
assert.False(t, ok)
208+
directoriesBeingWatched := fileWatcherService.watcher.WatchList()
209+
assert.Empty(t, directoriesBeingWatched)
213210
})
214211
}
215212

216213
func TestFileWatcherService_Watch(t *testing.T) {
217214
ctx, cancel := context.WithCancel(context.Background())
218215
defer cancel()
219216

220-
slog.SetLogLoggerLevel(slog.LevelDebug)
221-
defer slog.SetLogLoggerLevel(slog.LevelInfo)
222-
223217
tempDir := t.TempDir()
224218
testDirectory := path.Join(tempDir, "test_dir")
225219
err := os.Mkdir(testDirectory, directoryPermissions)
@@ -248,13 +242,13 @@ func TestFileWatcherService_Watch(t *testing.T) {
248242
t.Run("Test 1: File updated", func(t *testing.T) {
249243
// Check that directory is being watched
250244
assert.Eventually(t, func() bool {
251-
_, ok := fileWatcherService.directoriesThatDontExistYet.Load(testDirectory)
252-
return !ok
245+
_, ok := fileWatcherService.directoriesToWatch[testDirectory]
246+
return ok
253247
}, 1*time.Second, 100*time.Millisecond)
254248

255249
assert.Eventually(t, func() bool {
256-
_, ok := fileWatcherService.directoriesBeingWatched.Load(testDirectory)
257-
return ok
250+
directoriesBeingWatched := fileWatcherService.watcher.WatchList()
251+
return len(directoriesBeingWatched) == 1
258252
}, 1*time.Second, 100*time.Millisecond)
259253

260254
select {
@@ -286,13 +280,13 @@ func TestFileWatcherService_Watch(t *testing.T) {
286280

287281
// Check that directory is no longer being watched
288282
assert.Eventually(t, func() bool {
289-
_, ok := fileWatcherService.directoriesThatDontExistYet.Load(testDirectory)
283+
_, ok := fileWatcherService.directoriesToWatch[testDirectory]
290284
return ok
291285
}, 1*time.Second, 100*time.Millisecond)
292286

293287
assert.Eventually(t, func() bool {
294-
_, ok := fileWatcherService.directoriesBeingWatched.Load(testDirectory)
295-
return !ok
288+
directoriesBeingWatched := fileWatcherService.watcher.WatchList()
289+
return len(directoriesBeingWatched) == 0
296290
}, 1*time.Second, 100*time.Millisecond)
297291
})
298292
}

0 commit comments

Comments
 (0)