Skip to content

Commit 0e40b03

Browse files
committed
Add file watching functionality to FileInput
- Introduced FileInputConfig to manage new configuration options for file watching. - Implemented watchForNewFiles and checkForNewFiles methods to monitor and process newly created files. - Added a new test case to validate the functionality of watching for new files. - Updated settings to include options for enabling file watching and setting the watch interval.
1 parent 251e45a commit 0e40b03

File tree

3 files changed

+205
-23
lines changed

3 files changed

+205
-23
lines changed

input_file.go

Lines changed: 145 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -202,42 +202,156 @@ func newFileInputReader(path string, readDepth int, dryRun bool) *fileInputReade
202202

203203
// FileInput can read requests generated by FileOutput
204204
type FileInput struct {
205-
mu sync.Mutex
206-
data chan []byte
207-
exit chan bool
208-
path string
209-
readers []*fileInputReader
210-
speedFactor float64
211-
loop bool
212-
readDepth int
213-
dryRun bool
214-
maxWait time.Duration
205+
mu sync.Mutex
206+
data chan []byte
207+
exit chan bool
208+
path string
209+
readers []*fileInputReader
210+
processedFiles map[string]bool
211+
speedFactor float64
212+
loop bool
213+
readDepth int
214+
dryRun bool
215+
maxWait time.Duration
216+
watchInterval time.Duration
217+
watching bool
215218

216219
stats *expvar.Map
217220
}
218221

222+
// FileInputConfig configuration for the FileInput
223+
type FileInputConfig struct {
224+
Loop bool
225+
ReadDepth int
226+
MaxWait time.Duration
227+
DryRun bool
228+
WatchNewFiles bool // Whether to watch for new files matching the pattern
229+
WatchInterval time.Duration // Interval to check for new files
230+
}
231+
219232
// NewFileInput constructor for FileInput. Accepts file path as argument.
220233
func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) {
234+
config := &FileInputConfig{
235+
Loop: loop,
236+
ReadDepth: readDepth,
237+
MaxWait: maxWait,
238+
DryRun: dryRun,
239+
WatchNewFiles: Settings.InputFileWatch,
240+
WatchInterval: Settings.InputFileWatchInterval,
241+
}
242+
return NewFileInputWithConfig(path, config)
243+
}
244+
245+
// NewFileInputWithConfig constructor for FileInput with detailed configuration.
246+
func NewFileInputWithConfig(path string, config *FileInputConfig) (i *FileInput) {
221247
i = new(FileInput)
222248
i.data = make(chan []byte, 1000)
223249
i.exit = make(chan bool)
224250
i.path = path
225251
i.speedFactor = 1
226-
i.loop = loop
227-
i.readDepth = readDepth
252+
i.loop = config.Loop
253+
i.readDepth = config.ReadDepth
228254
i.stats = expvar.NewMap("file-" + path)
229-
i.dryRun = dryRun
230-
i.maxWait = maxWait
255+
i.dryRun = config.DryRun
256+
i.maxWait = config.MaxWait
257+
i.processedFiles = make(map[string]bool)
258+
i.watching = config.WatchNewFiles
259+
260+
if config.WatchInterval > 0 {
261+
i.watchInterval = config.WatchInterval
262+
} else {
263+
i.watchInterval = 5 * time.Second
264+
}
231265

232266
if err := i.init(); err != nil {
233267
return
234268
}
235269

236270
go i.emit()
271+
272+
if i.watching {
273+
go i.watchForNewFiles()
274+
}
237275

238276
return
239277
}
240278

279+
// watchForNewFiles periodically checks for new files matching the path pattern
280+
func (i *FileInput) watchForNewFiles() {
281+
ticker := time.NewTicker(i.watchInterval)
282+
defer ticker.Stop()
283+
284+
for {
285+
select {
286+
case <-i.exit:
287+
return
288+
case <-ticker.C:
289+
i.checkForNewFiles()
290+
}
291+
}
292+
}
293+
294+
// checkForNewFiles looks for new files that match the pattern and adds them to readers
295+
func (i *FileInput) checkForNewFiles() {
296+
defer i.mu.Unlock()
297+
i.mu.Lock()
298+
299+
var matches []string
300+
var err error
301+
302+
if strings.HasPrefix(i.path, "s3://") {
303+
sess := session.Must(session.NewSession(awsConfig()))
304+
svc := s3.New(sess)
305+
306+
bucket, key := parseS3Url(i.path)
307+
308+
params := &s3.ListObjectsInput{
309+
Bucket: aws.String(bucket),
310+
Prefix: aws.String(key),
311+
}
312+
313+
resp, err := svc.ListObjects(params)
314+
if err != nil {
315+
Debug(2, "[INPUT-FILE] Error while retrieving list of files from S3", i.path, err)
316+
return
317+
}
318+
319+
for _, c := range resp.Contents {
320+
path := "s3://" + bucket + "/" + (*c.Key)
321+
matches = append(matches, path)
322+
}
323+
} else if matches, err = filepath.Glob(i.path); err != nil {
324+
Debug(2, "[INPUT-FILE] Wrong file pattern", i.path, err)
325+
return
326+
}
327+
328+
if len(matches) == 0 {
329+
return
330+
}
331+
332+
newFilesFound := false
333+
334+
// Check for new files that haven't been processed yet
335+
for _, path := range matches {
336+
if i.processedFiles[path] {
337+
continue
338+
}
339+
340+
Debug(2, fmt.Sprintf("[INPUT-FILE] Found new file: %s", path))
341+
reader := newFileInputReader(path, i.readDepth, i.dryRun)
342+
if reader != nil {
343+
i.readers = append(i.readers, reader)
344+
i.processedFiles[path] = true
345+
newFilesFound = true
346+
}
347+
}
348+
349+
if newFilesFound {
350+
i.stats.Add("reader_count", int64(len(i.readers)))
351+
i.stats.Add("new_files_found", 1)
352+
}
353+
}
354+
241355
func parseS3Url(path string) (bucket, key string) {
242356
path = path[5:] // stripping `s3://`
243357
sep := strings.IndexByte(path, '/')
@@ -272,7 +386,9 @@ func (i *FileInput) init() (err error) {
272386
}
273387

274388
for _, c := range resp.Contents {
275-
matches = append(matches, "s3://"+bucket+"/"+(*c.Key))
389+
path := "s3://" + bucket + "/" + (*c.Key)
390+
matches = append(matches, path)
391+
i.processedFiles[path] = true
276392
}
277393
} else if matches, err = filepath.Glob(i.path); err != nil {
278394
Debug(2, "[INPUT-FILE] Wrong file pattern", i.path, err)
@@ -288,6 +404,7 @@ func (i *FileInput) init() (err error) {
288404

289405
for idx, p := range matches {
290406
i.readers[idx] = newFileInputReader(p, i.readDepth, i.dryRun)
407+
i.processedFiles[p] = true
291408
}
292409

293410
i.stats.Add("reader_count", int64(len(matches)))
@@ -341,6 +458,7 @@ func (i *FileInput) emit() {
341458
minWait = math.MaxInt64
342459

343460
i.stats.Add("negative_wait", 0)
461+
i.stats.Add("watch_pauses", 0)
344462

345463
for {
346464
select {
@@ -356,7 +474,14 @@ func (i *FileInput) emit() {
356474
i.init()
357475
lastTime = -1
358476
continue
477+
} else if i.watching {
478+
// When watching for new files, we just wait and continue
479+
i.stats.Add("watch_pauses", 1)
480+
Debug(2, fmt.Sprintf("[INPUT-FILE] No active readers, waiting for new files matching pattern '%s'", i.path))
481+
time.Sleep(i.watchInterval)
482+
continue
359483
} else {
484+
// If not watching, we break out and exit
360485
break
361486
}
362487
}
@@ -420,7 +545,11 @@ func (i *FileInput) emit() {
420545
i.stats.Set("max_wait", time.Duration(maxWait))
421546
i.stats.Set("min_wait", time.Duration(minWait))
422547

423-
Debug(2, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))
548+
if i.watching {
549+
Debug(2, fmt.Sprintf("[INPUT-FILE] No more active readers. Will continue watching for new files matching '%s'\n", i.path))
550+
} else {
551+
Debug(2, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))
552+
}
424553

425554
if i.dryRun {
426555
fmt.Printf("Records found: %v\nFiles processed: %v\nBytes processed: %v\nMax wait: %v\nMin wait: %v\nFirst wait: %v\nIt will take `%v` to replay at current speed.\nFound %v records with out of order timestamp\n",

input_file_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,55 @@ func TestInputFileCompressed(t *testing.T) {
235235
os.Remove(name2)
236236
}
237237

238+
func TestInputFileWatchForNewFiles(t *testing.T) {
239+
rnd := rand.Int63()
240+
basePath := fmt.Sprintf("/tmp/%d", rnd)
241+
242+
// Create first file
243+
file1, _ := os.OpenFile(fmt.Sprintf("%s_1", basePath), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
244+
file1.Write([]byte("1 1 1\ntest1"))
245+
file1.Write([]byte(payloadSeparator))
246+
file1.Close()
247+
248+
// Initialize input with watching enabled and short watch interval
249+
config := &FileInputConfig{
250+
Loop: false,
251+
ReadDepth: 100,
252+
MaxWait: 0,
253+
DryRun: false,
254+
WatchNewFiles: true,
255+
WatchInterval: 300 * time.Millisecond, // Faster interval for testing
256+
}
257+
258+
input := NewFileInputWithConfig(fmt.Sprintf("%s_*", basePath), config)
259+
260+
// Read the first message
261+
msg1, err := input.PluginRead()
262+
if err != nil || string(msg1.Data) != "test1" {
263+
t.Error("Should read first file correctly:", err)
264+
}
265+
266+
// Add a second file while input is running
267+
file2, _ := os.OpenFile(fmt.Sprintf("%s_2", basePath), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
268+
file2.Write([]byte("1 1 2\ntest2"))
269+
file2.Write([]byte(payloadSeparator))
270+
file2.Close()
271+
272+
// Wait for file discovery and processing (at least 2 watch intervals)
273+
time.Sleep(700 * time.Millisecond)
274+
275+
// Should be able to read from the newly added file
276+
msg2, err := input.PluginRead()
277+
if err != nil || string(msg2.Data) != "test2" {
278+
t.Error("Should read newly added file correctly:", err)
279+
}
280+
281+
// Clean up
282+
input.Close()
283+
os.Remove(file1.Name())
284+
os.Remove(file2.Name())
285+
}
286+
238287
type CaptureFile struct {
239288
msgs []*Message
240289
file *os.File

settings.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,15 @@ type AppSettings struct {
8787
OutputWebSocketConfig WebSocketOutputConfig
8888
OutputWebSocketStats bool `json:"output-ws-stats"`
8989

90-
InputFile []string `json:"input-file"`
91-
InputFileLoop bool `json:"input-file-loop"`
92-
InputFileReadDepth int `json:"input-file-read-depth"`
93-
InputFileDryRun bool `json:"input-file-dry-run"`
94-
InputFileMaxWait time.Duration `json:"input-file-max-wait"`
95-
OutputFile []string `json:"output-file"`
96-
OutputFileConfig FileOutputConfig
90+
InputFile []string `json:"input-file"`
91+
InputFileLoop bool `json:"input-file-loop"`
92+
InputFileReadDepth int `json:"input-file-read-depth"`
93+
InputFileDryRun bool `json:"input-file-dry-run"`
94+
InputFileMaxWait time.Duration `json:"input-file-max-wait"`
95+
InputFileWatch bool `json:"input-file-watch"`
96+
InputFileWatchInterval time.Duration `json:"input-file-watch-interval"`
97+
OutputFile []string `json:"output-file"`
98+
OutputFileConfig FileOutputConfig
9799

98100
InputRAW []string `json:"input_raw"`
99101
InputRAWConfig RAWInputConfig
@@ -167,6 +169,8 @@ func init() {
167169
flag.IntVar(&Settings.InputFileReadDepth, "input-file-read-depth", 100, "GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance")
168170
flag.BoolVar(&Settings.InputFileDryRun, "input-file-dry-run", false, "Simulate reading from the data source without replaying it. You will get information about expected replay time, number of found records etc.")
169171
flag.DurationVar(&Settings.InputFileMaxWait, "input-file-max-wait", 0, "Set the maximum time between requests. Can help in situations when you have too long periods between request, and you want to skip them. Example: --input-raw-max-wait 1s")
172+
flag.BoolVar(&Settings.InputFileWatch, "input-file-watch", true, "Watch for new files matching pattern. When turned on, Gor will continue running after processing all existing files, watching for new ones.")
173+
flag.DurationVar(&Settings.InputFileWatchInterval, "input-file-watch-interval", 5*time.Second, "Interval for checking for new files. Example: --input-file-watch-interval 10s")
170174

171175
flag.Var(&MultiOption{&Settings.OutputFile}, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor")
172176
flag.DurationVar(&Settings.OutputFileConfig.FlushInterval, "output-file-flush-interval", time.Second, "Interval for forcing buffer flush to the file, default: 1s.")

0 commit comments

Comments
 (0)