Skip to content

Commit 579ce11

Browse files
[chore][stanza/namedpipe] Refactor the namedpipe input to remove the fsnotify (#39529)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.-->
#### Description

In the current logic, the input starts two goroutines: one watches for write events, and the other listens on `watcher.C` and kicks off processing. In other words, it relies on a write event to start reading from the pipe.

When an active writer is writing to the pipe and the collector exits for whatever reason (or simply can't consume fast enough), the pipe buffer can fill up so that no further writes can come in. When no write happens, the watcher doesn't receive any write event and won't kick off the process that reads the pipe. This is effectively a deadlock: the writer can't write to the pipe because the pipe buffer is full, and the collector won't read from the pipe because no write event was triggered.

This change refactors the namedpipe input to get rid of the watcher. The input now keeps reading from the pipe. If there is no data or no external writer, it blocks at `scan.Scan()`, waiting for new data or for an error when the pipe is closed.

#### How to reproduce the bug

A simple Python script that writes to a named pipe periodically:

```python
import os
import time

PIPE_PATH = "/tmp/my_named_pipe"

try:
    fd = os.open(PIPE_PATH, os.O_RDWR | os.O_APPEND | os.O_NONBLOCK)
except OSError as e:
    print(f"Failed to open pipe: {e}")
    exit(1)

msg = b"x" * 1024 * 10 + b"\n"  # 10 KB message with a newline at the end

while True:
    try:
        os.write(fd, msg)
        print("write ok")
    except BlockingIOError:
        print("PIPE FULL - write blocked")
    time.sleep(1)
```

The minimal OTel Collector config:

```yaml
receivers:
  namedpipe:
    path: /tmp/my_named_pipe
    mode: 0600

exporters:
  debug:
    verbosity: basic

service:
  pipelines:
    logs:
      receivers: [namedpipe]
      processors: []
      exporters: [debug]
```

1. Start the Python script and let it fill the pipe (until you see "PIPE FULL - write blocked"). This usually happens on the 7th write if the pipe buffer is 64K (the default value).
2. Start the OTel Collector and observe that it doesn't process anything from the pipe.

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Passed all current unit tests. Also tested locally with the steps above to confirm that it reads from the pipe from the start.

---------

Signed-off-by: Mengnan Gong <namco1992@gmail.com>
Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
1 parent edc64d2 commit 579ce11

File tree

6 files changed

+94
-105
lines changed

6 files changed

+94
-105
lines changed

pkg/stanza/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ require (
66
github.com/bmatcuk/doublestar/v4 v4.8.1
77
github.com/cespare/xxhash/v2 v2.3.0
88
github.com/expr-lang/expr v1.17.2
9-
github.com/fsnotify/fsnotify v1.9.0
109
github.com/goccy/go-json v0.10.5
1110
github.com/jonboulle/clockwork v0.5.0
1211
github.com/jpillora/backoff v1.0.0
@@ -45,6 +44,7 @@ require (
4544
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
4645
github.com/davecgh/go-spew v1.1.1 // indirect
4746
github.com/elastic/lunes v0.1.0 // indirect
47+
github.com/fsnotify/fsnotify v1.9.0 // indirect
4848
github.com/go-logr/logr v1.4.2 // indirect
4949
github.com/go-logr/stdr v1.2.2 // indirect
5050
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect

pkg/stanza/operator/input/namedpipe/input.go

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"os"
1313
"sync"
14+
"time"
1415

1516
"go.uber.org/zap"
1617
"golang.org/x/sys/unix"
@@ -20,6 +21,8 @@ import (
2021
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
2122
)
2223

24+
const ReadTimeout = 2 * time.Second
25+
2326
type Input struct {
2427
helper.InputOperator
2528

@@ -55,59 +58,61 @@ func (i *Input) Start(_ operator.Persister) error {
5558
return fmt.Errorf("failed to chmod named pipe: %w", chmodErr)
5659
}
5760

58-
watcher, err := NewWatcher(i.path)
59-
if err != nil {
60-
return fmt.Errorf("failed to create watcher: %w", err)
61-
}
62-
61+
// Open the pipe with O_RDWR so it won't block on opening the pipe when there is no writer.
62+
// The current process is both a writer and reader, which prevents the read from receiving
63+
// EOF because there is always a writer (the process itself) connected to the pipe.
6364
pipe, err := os.OpenFile(i.path, os.O_RDWR, os.ModeNamedPipe)
6465
if err != nil {
6566
return fmt.Errorf("failed to open named pipe: %w", err)
6667
}
67-
6868
i.pipe = pipe
6969

7070
ctx, cancel := context.WithCancel(context.Background())
7171
i.cancel = cancel
7272

73-
i.wg.Add(2)
74-
go func() {
75-
defer i.wg.Done()
76-
if err := watcher.Watch(ctx); err != nil {
77-
i.Logger().Error("failed to watch named pipe", zap.Error(err))
78-
}
79-
}()
80-
81-
go func() {
82-
defer i.wg.Done()
83-
for {
84-
select {
85-
case <-watcher.C:
86-
if err := i.process(ctx, pipe); err != nil {
87-
i.Logger().Error("failed to process named pipe", zap.Error(err))
88-
}
89-
case <-ctx.Done():
90-
return
91-
}
92-
}
93-
}()
73+
i.wg.Add(1)
74+
go i.readLoop(ctx)
9475

9576
return nil
9677
}
9778

9879
func (i *Input) Stop() error {
99-
if i.pipe != nil {
100-
i.pipe.Close()
101-
}
102-
10380
if i.cancel != nil {
10481
i.cancel()
10582
}
10683

84+
if i.pipe != nil {
85+
i.pipe.Close()
86+
}
87+
10788
i.wg.Wait()
10889
return nil
10990
}
11091

92+
func (i *Input) readLoop(ctx context.Context) {
93+
defer i.wg.Done()
94+
pipe := i.pipe
95+
96+
for {
97+
select {
98+
case <-ctx.Done():
99+
return
100+
default:
101+
if err := i.process(ctx, pipe); err != nil {
102+
i.Logger().Error("error processing named pipe", zap.Error(err))
103+
}
104+
105+
// process returned for whatever reason; wait for ReadTimeout and then try again.
106+
select {
107+
case <-ctx.Done():
108+
return
109+
case <-time.After(ReadTimeout):
110+
i.Logger().Warn("processing named pipe is interrupted, retrying the process now", zap.String("path", i.path))
111+
}
112+
}
113+
}
114+
}
115+
111116
func (i *Input) process(ctx context.Context, pipe *os.File) error {
112117
scan := bufio.NewScanner(pipe)
113118
scan.Split(i.splitFunc)

pkg/stanza/operator/input/namedpipe/input_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,3 +150,60 @@ func TestPipeWrites(t *testing.T) {
150150
}
151151
}
152152
}
153+
154+
// TestPipeHasDataAtStartup will test if the receiver can consume from a named
155+
// pipe that has buffered data before startup.
156+
func TestPipeHasDataAtStartup(t *testing.T) {
157+
fake := testutil.NewFakeOutput(t)
158+
159+
conf := NewConfig()
160+
conf.Path = filename(t)
161+
conf.Permissions = 0o666
162+
conf.OutputIDs = []string{fake.ID()}
163+
164+
set := componenttest.NewNopTelemetrySettings()
165+
op, err := conf.Build(set)
166+
require.NoError(t, err)
167+
ops := []operator.Operator{op, fake}
168+
169+
// create pipe
170+
require.NoError(t, unix.Mkfifo(conf.Path, conf.Permissions))
171+
172+
pipe, err := os.OpenFile(conf.Path, os.O_RDWR|os.O_APPEND, os.ModeNamedPipe)
173+
require.NoError(t, err)
174+
defer func() {
175+
require.NoError(t, pipe.Close())
176+
}()
177+
178+
logs := []string{"log1\n", "log2\n"}
179+
180+
for _, log := range logs {
181+
_, err = pipe.WriteString(log)
182+
require.NoError(t, err)
183+
}
184+
185+
p, err := pipeline.NewDirectedPipeline(ops)
186+
require.NoError(t, err)
187+
188+
// start receiver
189+
require.NoError(t, p.Start(testutil.NewUnscopedMockPersister()))
190+
defer func() {
191+
require.NoError(t, p.Stop())
192+
}()
193+
194+
for _, log := range logs {
195+
expect := &entry.Entry{
196+
Body: strings.TrimSpace(log),
197+
}
198+
199+
select {
200+
case e := <-fake.Received:
201+
obs := time.Now()
202+
expect.ObservedTimestamp = obs
203+
e.ObservedTimestamp = obs
204+
require.Equal(t, expect, e)
205+
case <-time.After(time.Second):
206+
t.Fatal("timed-out waiting for log entry")
207+
}
208+
}
209+
}

pkg/stanza/operator/input/namedpipe/watcher.go

Lines changed: 0 additions & 70 deletions
This file was deleted.

receiver/namedpipereceiver/go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ require (
3131
github.com/davecgh/go-spew v1.1.1 // indirect
3232
github.com/elastic/lunes v0.1.0 // indirect
3333
github.com/expr-lang/expr v1.17.2 // indirect
34-
github.com/fsnotify/fsnotify v1.9.0 // indirect
3534
github.com/go-logr/logr v1.4.2 // indirect
3635
github.com/go-logr/stdr v1.2.2 // indirect
3736
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect

receiver/namedpipereceiver/go.sum

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)