Skip to content

Commit bf557ee

Browse files
committed
Merge remote-tracking branch 'origin/master'
* origin/master: Restore CARP_MALLOC NULL checks in kqueue and epoll Poll_wait_ Add NULL check after CARP_MALLOC in Poll_copy Guard CARP_MALLOC returns in Poll_wait_ to prevent NULL dereference Add CARP_FORCE_POLL override, CARP_MALLOC NULL checks, and poll2 test wrapper Implement POSIX poll(2) fallback for non-epoll/kqueue platforms Make Poll.wait propagate errors from epoll_wait/kevent Fix send_nb_ negative offset UB and send_len_ OOB read Add non-blocking I/O support to UnixStream
2 parents 8f65643 + ee600a9 commit bf557ee

8 files changed

Lines changed: 432 additions & 6 deletions

File tree

src/poll.carp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ Check `readable`, `writable`, and `error?` to see what happened on `fd`.")
3737
(register-type Poll)
3838

3939
(doc Poll "is an I/O event multiplexer that monitors multiple file descriptors
40-
for readiness. Uses kqueue on macOS/BSD and epoll on Linux.
40+
for readiness. Uses kqueue on macOS/BSD, epoll on Linux, and POSIX poll(2)
41+
as a fallback on other platforms.
4142

4243
## Usage
4344
```
@@ -77,6 +78,9 @@ for readiness. Uses kqueue on macOS/BSD and epoll on Linux.
7778
(private wait-)
7879
(hidden wait-)
7980
(register wait- (Fn [&Poll Int] (Array PollEvent)) "Poll_wait_")
81+
(private wait-failed?)
82+
(hidden wait-failed?)
83+
(register wait-failed? (Fn [&(Array PollEvent)] Bool) "Poll_wait_failed_")
8084

8185
; -- public API --
8286

@@ -121,7 +125,11 @@ Use `poll-read`, `poll-write`, or `poll-read-write`.")
121125

122126
(doc wait "waits for events, blocking up to `timeout-ms` milliseconds.
123127
Pass -1 to block indefinitely. Returns `(Result (Array PollEvent) String)`.")
124-
(defn wait [poll timeout-ms] (Result.Success (wait- poll timeout-ms)))
128+
(defn wait [poll timeout-ms]
129+
(let [events (wait- poll timeout-ms)]
130+
(if (wait-failed? &events)
131+
(Result.Error (System.error-text))
132+
(Result.Success events))))
125133

126134
(register close (Fn [Poll] ()))
127135
(doc close "closes the poll instance.")

src/poll.h

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ PollEvent PollEvent_copy(PollEvent* e) {
3535
* Platform-specific Poll implementation
3636
* -------------------------------------------------------------------------- */
3737

38-
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
38+
#if defined(CARP_FORCE_POLL)
39+
#define CARP_USE_POLL 1
40+
#include <poll.h>
41+
#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
3942
#define CARP_USE_KQUEUE 1
4043
#include <sys/event.h>
4144
#elif defined(__linux__)
@@ -120,7 +123,16 @@ Array Poll_wait_(Poll* p, int timeout_ms) {
120123

121124
if (n > 0) {
122125
fds = (int*)CARP_MALLOC(n * sizeof(int));
126+
if (!fds) {
127+
result.len = 0; result.capacity = 0; result.data = NULL;
128+
return result;
129+
}
123130
flags = (int*)CARP_MALLOC(n * sizeof(int));
131+
if (!flags) {
132+
CARP_FREE(fds);
133+
result.len = 0; result.capacity = 0; result.data = NULL;
134+
return result;
135+
}
124136

125137
for (int i = 0; i < n; i++) {
126138
int fd = (int)events[i].ident;
@@ -147,6 +159,12 @@ Array Poll_wait_(Poll* p, int timeout_ms) {
147159
result.len = unique;
148160
result.capacity = unique > 0 ? unique : 1;
149161
result.data = CARP_MALLOC(result.capacity * sizeof(PollEvent));
162+
if (!result.data) {
163+
if (fds) CARP_FREE(fds);
164+
if (flags) CARP_FREE(flags);
165+
result.len = 0; result.capacity = 0;
166+
return result;
167+
}
150168

151169
for (int i = 0; i < unique; i++) {
152170
PollEvent* e = &((PollEvent*)result.data)[i];
@@ -216,6 +234,10 @@ Array Poll_wait_(Poll* p, int timeout_ms) {
216234
result.len = n;
217235
result.capacity = n > 0 ? n : 1;
218236
result.data = CARP_MALLOC(result.capacity * sizeof(PollEvent));
237+
if (!result.data) {
238+
result.len = 0; result.capacity = 0;
239+
return result;
240+
}
219241

220242
for (int i = 0; i < n; i++) {
221243
PollEvent* e = &((PollEvent*)result.data)[i];
@@ -233,12 +255,146 @@ void Poll_close(Poll p) {
233255

234256
#endif /* CARP_USE_EPOLL */
235257

258+
#ifdef CARP_USE_POLL
259+
260+
typedef struct {
261+
struct pollfd *fds;
262+
int count;
263+
int capacity;
264+
} Poll;
265+
266+
Poll Poll_create_() {
267+
Poll p;
268+
p.capacity = 16;
269+
p.count = 0;
270+
p.fds = (struct pollfd*)CARP_MALLOC(p.capacity * sizeof(struct pollfd));
271+
if (!p.fds) {
272+
p.capacity = 0;
273+
}
274+
return p;
275+
}
276+
277+
int Poll_fd_(Poll* p) {
278+
/* No kernel fd — return 0 if valid, -1 if not */
279+
return (p->fds != NULL) ? 0 : -1;
280+
}
281+
282+
int Poll_add_(Poll* p, int fd, int interest) {
283+
if (p->count >= p->capacity) {
284+
int new_cap = p->capacity * 2;
285+
struct pollfd *new_fds = (struct pollfd*)CARP_MALLOC(new_cap * sizeof(struct pollfd));
286+
if (!new_fds) return -1;
287+
memcpy(new_fds, p->fds, (size_t)p->count * sizeof(struct pollfd));
288+
CARP_FREE(p->fds);
289+
p->fds = new_fds;
290+
p->capacity = new_cap;
291+
}
292+
struct pollfd *pfd = &p->fds[p->count];
293+
pfd->fd = fd;
294+
pfd->events = 0;
295+
if (interest & POLL_READ) pfd->events |= POLLIN;
296+
if (interest & POLL_WRITE) pfd->events |= POLLOUT;
297+
pfd->revents = 0;
298+
p->count++;
299+
return 0;
300+
}
301+
302+
int Poll_modify_(Poll* p, int fd, int interest) {
303+
for (int i = 0; i < p->count; i++) {
304+
if (p->fds[i].fd == fd) {
305+
p->fds[i].events = 0;
306+
if (interest & POLL_READ) p->fds[i].events |= POLLIN;
307+
if (interest & POLL_WRITE) p->fds[i].events |= POLLOUT;
308+
return 0;
309+
}
310+
}
311+
errno = ENOENT;
312+
return -1;
313+
}
314+
315+
int Poll_remove_(Poll* p, int fd) {
316+
for (int i = 0; i < p->count; i++) {
317+
if (p->fds[i].fd == fd) {
318+
p->fds[i] = p->fds[p->count - 1];
319+
p->count--;
320+
return 0;
321+
}
322+
}
323+
errno = ENOENT;
324+
return -1;
325+
}
326+
327+
Array Poll_wait_(Poll* p, int timeout_ms) {
328+
Array result;
329+
if (!p->fds) {
330+
result.len = 0; result.capacity = 0; result.data = NULL;
331+
return result;
332+
}
333+
334+
int n = poll(p->fds, (nfds_t)p->count, timeout_ms);
335+
if (n < 0) {
336+
result.len = 0; result.capacity = 0; result.data = NULL;
337+
return result;
338+
}
339+
340+
int ready = 0;
341+
for (int i = 0; i < p->count; i++) {
342+
if (p->fds[i].revents != 0) ready++;
343+
}
344+
345+
result.len = ready;
346+
result.capacity = ready > 0 ? ready : 1;
347+
result.data = CARP_MALLOC(result.capacity * sizeof(PollEvent));
348+
if (!result.data) {
349+
result.len = 0; result.capacity = 0;
350+
return result;
351+
}
352+
353+
int j = 0;
354+
for (int i = 0; i < p->count && j < ready; i++) {
355+
if (p->fds[i].revents == 0) continue;
356+
PollEvent* e = &((PollEvent*)result.data)[j];
357+
e->fd = p->fds[i].fd;
358+
e->readable = (p->fds[i].revents & POLLIN) != 0;
359+
e->writable = (p->fds[i].revents & POLLOUT) != 0;
360+
e->error = (p->fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
361+
j++;
362+
}
363+
364+
result.len = j;
365+
return result;
366+
}
367+
368+
void Poll_close(Poll p) {
369+
if (p.fds) CARP_FREE(p.fds);
370+
}
371+
372+
#endif /* CARP_USE_POLL */
373+
374+
bool Poll_wait_failed_(Array* a) {
375+
return a->data == NULL;
376+
}
377+
236378
Poll Poll_copy(Poll* p) {
237379
Poll c;
238380
#ifdef CARP_USE_KQUEUE
239381
c.kq = p->kq;
240382
#elif defined(CARP_USE_EPOLL)
241383
c.epfd = p->epfd;
384+
#elif defined(CARP_USE_POLL)
385+
c.count = p->count;
386+
c.capacity = p->capacity;
387+
if (p->fds && p->capacity > 0) {
388+
c.fds = (struct pollfd*)CARP_MALLOC((size_t)p->capacity * sizeof(struct pollfd));
389+
if (!c.fds) {
390+
c.count = 0;
391+
c.capacity = 0;
392+
} else {
393+
memcpy(c.fds, p->fds, (size_t)p->count * sizeof(struct pollfd));
394+
}
395+
} else {
396+
c.fds = NULL;
397+
}
242398
#endif
243399
return c;
244400
}

src/unix_stream.carp

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,19 @@
6767
(let [n (send-bytes- stream data)]
6868
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
6969

70+
(private send-len-)
71+
(hidden send-len-)
72+
(register send-len-
73+
(Fn [&UnixStream &String Int] Int)
74+
"UnixStream_send_MINUS_len_")
75+
76+
(doc send-len "sends a string with known length (avoids strlen). `len` must be
77+
between 0 and the string’s length; returns an error otherwise.
78+
Returns bytes sent or an error.")
79+
(defn send-len [stream msg len]
80+
(let [n (send-len- stream msg len)]
81+
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
82+
7083
(doc read
7184
"reads up to 4096 bytes from the stream. Returns the data as a string, or an error.
7285
Returns an empty string on connection close.")
@@ -83,7 +96,11 @@ Returns bytes read (0 = connection closed), or an error.")
8396
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
8497

8598
(register close (Fn [UnixStream] ()))
86-
(doc close "closes the stream, releasing the file descriptor.")
99+
(doc close "closes the stream, consuming it.")
100+
101+
(doc close! "closes the stream by reference, for use when the stream lives
102+
in a collection. Sets the fd to -1 to prevent double-close.")
103+
(register close! (Fn [&UnixStream] ()) "UnixStream_close_MINUS_ref")
87104

88105
(doc shutdown "shuts down the stream. 0 = reads, 1 = writes, 2 = both.")
89106
(register shutdown- (Fn [&UnixStream Int] ()) "UnixStream_shutdown_")
@@ -98,12 +115,59 @@ Returns bytes read (0 = connection closed), or an error.")
98115
(doc set-timeout "sets read and write timeouts in seconds.")
99116
(register set-timeout (Fn [&UnixStream Int] ()))
100117

118+
(doc set-nonblocking "puts the socket into non-blocking mode. After this
119+
call, `send-nb` and `read-append-nb` are the appropriate I/O entry points;
120+
the blocking variants will return `EAGAIN` instead of waiting.")
121+
(register set-nonblocking (Fn [&UnixStream] ()))
122+
123+
(private send-nb-)
124+
(hidden send-nb-)
125+
(register send-nb-
126+
(Fn [&UnixStream &(Array Byte) Int] Int)
127+
"UnixStream_send_MINUS_nb_")
128+
129+
(doc send-nb "non-blocking send. Sends as many bytes as the kernel will
130+
accept right now from `data`, starting at `offset`.
131+
132+
Returns `(Result Int String)`. The `Int` is the number of bytes actually
133+
written, which may be `0` if the socket is not currently writable. Re-arm
134+
write interest on the next event-loop iteration in that case.")
135+
(defn send-nb [stream data offset]
136+
(let [n (send-nb- stream data offset)]
137+
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
138+
139+
(private read-append-nb-)
140+
(hidden read-append-nb-)
141+
(register read-append-nb-
142+
(Fn [&UnixStream &(Array Byte)] Int)
143+
"UnixStream_read_MINUS_append_MINUS_nb_")
144+
145+
(doc read-blocked "sentinel returned by `read-append-nb` when no data is
146+
currently available on a non-blocking socket.")
147+
(def read-blocked -2)
148+
149+
(doc read-append-nb "non-blocking append-read. Reads whatever the kernel
150+
has buffered into `buf`, growing it as needed.
151+
152+
Returns `(Result Int String)`. The `Int` is one of:
153+
154+
- `> 0` bytes appended to `buf`,
155+
- `0` peer closed cleanly (EOF),
156+
- `read-blocked` (-2) socket has no data right now, retry on the next
157+
readable event.")
158+
(defn read-append-nb [stream buf]
159+
(let [n (read-append-nb- stream buf)]
160+
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
161+
101162
(doc peer-path "returns the path of the remote peer socket.")
102163
(register peer-path (Fn [&UnixStream] String))
103164

104165
(register copy (Fn [&UnixStream] UnixStream))
105166
(implements copy UnixStream.copy)
106167

168+
(register prn (Fn [UnixStream] String) "UnixStream_prn_")
169+
(implements prn UnixStream.prn)
170+
107171
(defn poll-fd [s] (fd- s))
108172
(implements poll-fd UnixStream.poll-fd)
109173

src/unix_stream.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,49 @@ String UnixStream_peer_MINUS_path(UnixStream* s) {
100100
return str;
101101
}
102102

103+
void UnixStream_close_MINUS_ref(UnixStream* s) {
104+
if (s->fd >= 0) { close(s->fd); s->fd = -1; }
105+
}
106+
107+
void UnixStream_set_MINUS_nonblocking(UnixStream* s) {
108+
int flags = fcntl(s->fd, F_GETFL, 0);
109+
if (flags >= 0) fcntl(s->fd, F_SETFL, flags | O_NONBLOCK);
110+
}
111+
112+
int UnixStream_send_MINUS_len_(UnixStream* s, String* msg, int len) {
113+
if (len < 0 || len > (int)strlen(*msg)) return -1;
114+
return (int)send_all(s->fd, *msg, (size_t)len);
115+
}
116+
117+
int UnixStream_send_MINUS_nb_(UnixStream* s, Array* data, int offset) {
118+
if (offset < 0 || offset >= data->len) return 0;
119+
ssize_t n = send(s->fd, (char*)data->data + offset,
120+
(size_t)(data->len - offset), 0);
121+
if (n >= 0) return (int)n;
122+
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0;
123+
return -1;
124+
}
125+
126+
int UnixStream_read_MINUS_append_MINUS_nb_(UnixStream* s, Array* buf) {
127+
if ((int)(buf->capacity - buf->len) < SOCK_BUF_SIZE) {
128+
int new_cap = (buf->len + SOCK_BUF_SIZE) * 2;
129+
buf->data = CARP_REALLOC(buf->data, new_cap);
130+
buf->capacity = new_cap;
131+
}
132+
ssize_t r = read(s->fd, (char*)buf->data + buf->len, SOCK_BUF_SIZE);
133+
if (r > 0) { buf->len += (int)r; return (int)r; }
134+
if (r == 0) return 0;
135+
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return -2;
136+
return -1;
137+
}
138+
139+
String UnixStream_prn_(UnixStream s) {
140+
size_t len = (size_t)snprintf(NULL, 0, "UnixStream(%d)", s.fd);
141+
String r = CARP_MALLOC(len + 1);
142+
snprintf(r, len + 1, "UnixStream(%d)", s.fd);
143+
return r;
144+
}
145+
103146
UnixStream UnixStream_copy(UnixStream* s) {
104147
UnixStream c;
105148
c.fd = s->fd;

0 commit comments

Comments
 (0)