-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathpoll_select.go
More file actions
156 lines (127 loc) Β· 3.25 KB
/
poll_select.go
File metadata and controls
156 lines (127 loc) Β· 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
//go:build solaris || darwin || freebsd || netbsd || openbsd || dragonfly
// +build solaris darwin freebsd netbsd openbsd dragonfly
package uv
import (
"errors"
"fmt"
"io"
"os"
"strings"
"sync"
"time"
"golang.org/x/sys/unix"
)
// newSelectPollReader creates a new SelectReader for the given io.Reader.
func newSelectPollReader(reader io.Reader) (pollReader, error) {
file, ok := reader.(File)
if !ok || file.Fd() >= unix.FD_SETSIZE {
return newFallbackReader(reader)
}
r := &selectReader{
reader: reader,
file: file,
}
var err error
r.cancelSignalReader, r.cancelSignalWriter, err = os.Pipe()
if err != nil {
return nil, err
}
return r, nil
}
// selectReader implements pollReader using the POSIX select API.
type selectReader struct {
reader io.Reader
file File
cancelSignalReader *os.File
cancelSignalWriter *os.File
mu sync.Mutex
canceled bool
}
// Read reads data from the underlying reader.
func (r *selectReader) Read(p []byte) (int, error) {
r.mu.Lock()
if r.canceled {
r.mu.Unlock()
return 0, ErrCanceled
}
r.mu.Unlock()
return r.reader.Read(p)
}
// Poll waits for data to be available to read with the given timeout.
func (r *selectReader) Poll(timeout time.Duration) (bool, error) {
r.mu.Lock()
if r.canceled {
r.mu.Unlock()
return false, ErrCanceled
}
r.mu.Unlock()
for {
readerFd := int(r.file.Fd())
abortFd := int(r.cancelSignalReader.Fd())
maxFd := readerFd
if abortFd > maxFd {
maxFd = abortFd
}
// this is a limitation of the select syscall
if maxFd >= unix.FD_SETSIZE {
return false, fmt.Errorf("cannot select on file descriptor %d which is larger than %d", maxFd, unix.FD_SETSIZE)
}
fdSet := &unix.FdSet{}
fdSet.Set(readerFd)
fdSet.Set(abortFd)
var tv *unix.Timeval
if timeout >= 0 {
t := unix.NsecToTimeval(timeout.Nanoseconds())
tv = &t
}
n, err := unix.Select(maxFd+1, fdSet, nil, nil, tv)
if errors.Is(err, unix.EINTR) {
continue // try again if the syscall was interrupted
}
if err != nil {
return false, fmt.Errorf("select: %w", err)
}
if n == 0 {
return false, nil // timeout
}
if fdSet.IsSet(abortFd) {
// remove signal from pipe
var b [1]byte
_, readErr := r.cancelSignalReader.Read(b[:])
if readErr != nil {
return false, fmt.Errorf("reading cancel signal: %w", readErr)
}
return false, ErrCanceled
}
if fdSet.IsSet(readerFd) {
return true, nil
}
return false, fmt.Errorf("select returned without setting a file descriptor")
}
}
// Cancel cancels any ongoing poll or read operations.
func (r *selectReader) Cancel() bool {
r.mu.Lock()
r.canceled = true
r.mu.Unlock()
// send cancel signal
_, err := r.cancelSignalWriter.Write([]byte{'c'})
return err == nil
}
// Close closes the reader and releases any resources.
func (r *selectReader) Close() error {
var errMsgs []string
// close pipe
err := r.cancelSignalWriter.Close()
if err != nil {
errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal writer: %v", err))
}
err = r.cancelSignalReader.Close()
if err != nil {
errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal reader: %v", err))
}
if len(errMsgs) > 0 {
return fmt.Errorf("%s", strings.Join(errMsgs, ", "))
}
return nil
}