Skip to content

Commit da62876

Browse files
authored
Merge pull request #2 from carpentry-org/claude/unix-stream-nonblocking
Add non-blocking I/O support to UnixStream
2 parents 842c7db + e4c4536 commit da62876

3 files changed

Lines changed: 225 additions & 2 deletions

File tree

src/unix_stream.carp

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,19 @@
6767
(let [n (send-bytes- stream data)]
6868
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
6969

70+
(private send-len-)
71+
(hidden send-len-)
72+
(register send-len-
73+
(Fn [&UnixStream &String Int] Int)
74+
"UnixStream_send_MINUS_len_")
75+
76+
(doc send-len "sends a string with known length (avoids strlen). `len` must be
77+
between 0 and the string’s length; returns an error otherwise.
78+
Returns bytes sent or an error.")
79+
(defn send-len [stream msg len]
80+
(let [n (send-len- stream msg len)]
81+
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
82+
7083
(doc read
7184
"reads up to 4096 bytes from the stream. Returns the data as a string, or an error.
7285
Returns an empty string on connection close.")
@@ -83,7 +96,11 @@ Returns bytes read (0 = connection closed), or an error.")
8396
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
8497

8598
(register close (Fn [UnixStream] ()))
86-
(doc close "closes the stream, releasing the file descriptor.")
99+
(doc close "closes the stream, consuming it.")
100+
101+
(doc close! "closes the stream by reference, for use when the stream lives
102+
in a collection. Sets the fd to -1 to prevent double-close.")
103+
(register close! (Fn [&UnixStream] ()) "UnixStream_close_MINUS_ref")
87104

88105
(doc shutdown "shuts down the stream. 0 = reads, 1 = writes, 2 = both.")
89106
(register shutdown- (Fn [&UnixStream Int] ()) "UnixStream_shutdown_")
@@ -98,12 +115,59 @@ Returns bytes read (0 = connection closed), or an error.")
98115
(doc set-timeout "sets read and write timeouts in seconds.")
99116
(register set-timeout (Fn [&UnixStream Int] ()))
100117

118+
(doc set-nonblocking "puts the socket into non-blocking mode. After this
119+
call, `send-nb` and `read-append-nb` are the appropriate I/O entry points;
120+
the blocking variants will return `EAGAIN` instead of waiting.")
121+
(register set-nonblocking (Fn [&UnixStream] ()))
122+
123+
(private send-nb-)
124+
(hidden send-nb-)
125+
(register send-nb-
126+
(Fn [&UnixStream &(Array Byte) Int] Int)
127+
"UnixStream_send_MINUS_nb_")
128+
129+
(doc send-nb "non-blocking send. Sends as many bytes as the kernel will
130+
accept right now from `data`, starting at `offset`.
131+
132+
Returns `(Result Int String)`. The `Int` is the number of bytes actually
133+
written, which may be `0` if the socket is not currently writable. Re-arm
134+
write interest on the next event-loop iteration in that case.")
135+
(defn send-nb [stream data offset]
136+
(let [n (send-nb- stream data offset)]
137+
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
138+
139+
(private read-append-nb-)
140+
(hidden read-append-nb-)
141+
(register read-append-nb-
142+
(Fn [&UnixStream &(Array Byte)] Int)
143+
"UnixStream_read_MINUS_append_MINUS_nb_")
144+
145+
(doc read-blocked "sentinel returned by `read-append-nb` when no data is
146+
currently available on a non-blocking socket.")
147+
(def read-blocked -2)
148+
149+
(doc read-append-nb "non-blocking append-read. Reads whatever the kernel
150+
has buffered into `buf`, growing it as needed.
151+
152+
Returns `(Result Int String)`. The `Int` is one of:
153+
154+
- `> 0` bytes appended to `buf`,
155+
- `0` peer closed cleanly (EOF),
156+
- `read-blocked` (-2) socket has no data right now, retry on the next
157+
readable event.")
158+
(defn read-append-nb [stream buf]
159+
(let [n (read-append-nb- stream buf)]
160+
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
161+
101162
(doc peer-path "returns the path of the remote peer socket.")
102163
(register peer-path (Fn [&UnixStream] String))
103164

104165
(register copy (Fn [&UnixStream] UnixStream))
105166
(implements copy UnixStream.copy)
106167

168+
(register prn (Fn [UnixStream] String) "UnixStream_prn_")
169+
(implements prn UnixStream.prn)
170+
107171
(defn poll-fd [s] (fd- s))
108172
(implements poll-fd UnixStream.poll-fd)
109173

src/unix_stream.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,49 @@ String UnixStream_peer_MINUS_path(UnixStream* s) {
100100
return str;
101101
}
102102

103+
void UnixStream_close_MINUS_ref(UnixStream* s) {
104+
if (s->fd >= 0) { close(s->fd); s->fd = -1; }
105+
}
106+
107+
void UnixStream_set_MINUS_nonblocking(UnixStream* s) {
108+
int flags = fcntl(s->fd, F_GETFL, 0);
109+
if (flags >= 0) fcntl(s->fd, F_SETFL, flags | O_NONBLOCK);
110+
}
111+
112+
int UnixStream_send_MINUS_len_(UnixStream* s, String* msg, int len) {
113+
if (len < 0 || len > (int)strlen(*msg)) return -1;
114+
return (int)send_all(s->fd, *msg, (size_t)len);
115+
}
116+
117+
int UnixStream_send_MINUS_nb_(UnixStream* s, Array* data, int offset) {
118+
if (offset < 0 || offset >= data->len) return 0;
119+
ssize_t n = send(s->fd, (char*)data->data + offset,
120+
(size_t)(data->len - offset), 0);
121+
if (n >= 0) return (int)n;
122+
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0;
123+
return -1;
124+
}
125+
126+
int UnixStream_read_MINUS_append_MINUS_nb_(UnixStream* s, Array* buf) {
127+
if ((int)(buf->capacity - buf->len) < SOCK_BUF_SIZE) {
128+
int new_cap = (buf->len + SOCK_BUF_SIZE) * 2;
129+
buf->data = CARP_REALLOC(buf->data, new_cap);
130+
buf->capacity = new_cap;
131+
}
132+
ssize_t r = read(s->fd, (char*)buf->data + buf->len, SOCK_BUF_SIZE);
133+
if (r > 0) { buf->len += (int)r; return (int)r; }
134+
if (r == 0) return 0;
135+
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return -2;
136+
return -1;
137+
}
138+
139+
String UnixStream_prn_(UnixStream s) {
140+
size_t len = (size_t)snprintf(NULL, 0, "UnixStream(%d)", s.fd);
141+
String r = CARP_MALLOC(len + 1);
142+
snprintf(r, len + 1, "UnixStream(%d)", s.fd);
143+
return r;
144+
}
145+
103146
UnixStream UnixStream_copy(UnixStream* s) {
104147
UnixStream c;
105148
c.fd = s->fd;

test/unix_test.carp

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,112 @@
3333
(UnixListener.close listener)
3434
false))))))))
3535

36+
(defn unix-nb-roundtrip []
37+
(let [path @"/tmp/carp_test_unix_nb.sock"]
38+
(match (UnixListener.bind &path)
39+
(Result.Error _) false
40+
(Result.Success listener)
41+
(match (UnixStream.connect &path)
42+
(Result.Error _) (do (UnixListener.close listener) false)
43+
(Result.Success client)
44+
(match (UnixListener.accept &listener)
45+
(Result.Error _)
46+
(do
47+
(UnixStream.close client)
48+
(UnixListener.close listener)
49+
false)
50+
(Result.Success conn)
51+
(let-do [buf (the (Array Byte) (Array.allocate 1024))
52+
payload (the (Array Byte)
53+
[(Char.to-byte \h) (Char.to-byte \i)])
54+
ok true]
55+
(UnixStream.set-nonblocking &client)
56+
; nothing to read yet -> read-blocked sentinel
57+
(match (UnixStream.read-append-nb &client &buf)
58+
(Result.Success n)
59+
(when (/= n UnixStream.read-blocked) (set! ok false))
60+
(Result.Error _) (set! ok false))
61+
; peer writes, we should be able to read it
62+
(ignore (UnixStream.send &conn "ping"))
63+
(System.sleep-micros 50000)
64+
(match (UnixStream.read-append-nb &client &buf)
65+
(Result.Success n) (when (/= n 4) (set! ok false))
66+
(Result.Error _) (set! ok false))
67+
; non-blocking send returns bytes written
68+
(match (UnixStream.send-nb &client &payload 0)
69+
(Result.Success n) (when (/= n 2) (set! ok false))
70+
(Result.Error _) (set! ok false))
71+
; send-nb with offset skips leading bytes
72+
(match (UnixStream.send-nb &client &payload 1)
73+
(Result.Success n) (when (/= n 1) (set! ok false))
74+
(Result.Error _) (set! ok false))
75+
; send-nb with negative offset returns 0 (guard)
76+
(match (UnixStream.send-nb &client &payload -1)
77+
(Result.Success n) (when (/= n 0) (set! ok false))
78+
(Result.Error _) (set! ok false))
79+
(UnixStream.close client)
80+
(UnixStream.close conn)
81+
(UnixListener.close listener)
82+
ok))))))
83+
84+
(defn unix-send-len-test []
85+
(let [path @"/tmp/carp_test_unix_sendlen.sock"]
86+
(match (UnixListener.bind &path)
87+
(Result.Error _) false
88+
(Result.Success listener)
89+
(match (UnixStream.connect &path)
90+
(Result.Error _) (do (UnixListener.close listener) false)
91+
(Result.Success client)
92+
(match (UnixListener.accept &listener)
93+
(Result.Error _)
94+
(do
95+
(UnixStream.close client)
96+
(UnixListener.close listener)
97+
false)
98+
(Result.Success conn)
99+
(do
100+
; send only first 3 bytes of "hello"
101+
(ignore (UnixStream.send-len &client "hello" 3))
102+
(match (the (Result String String) (UnixStream.read &conn))
103+
(Result.Success msg)
104+
(let-do [ok (= &msg "hel")]
105+
(UnixStream.close conn)
106+
(UnixStream.close client)
107+
(UnixListener.close listener)
108+
ok)
109+
_
110+
(do
111+
(UnixStream.close conn)
112+
(UnixStream.close client)
113+
(UnixListener.close listener)
114+
false))))))))
115+
116+
(defn unix-close-ref-test []
117+
(let [path @"/tmp/carp_test_unix_closeref.sock"]
118+
(match (UnixListener.bind &path)
119+
(Result.Error _) false
120+
(Result.Success listener)
121+
(match (UnixStream.connect &path)
122+
(Result.Error _) (do (UnixListener.close listener) false)
123+
(Result.Success client)
124+
(match (UnixListener.accept &listener)
125+
(Result.Error _)
126+
(do
127+
(UnixStream.close client)
128+
(UnixListener.close listener)
129+
false)
130+
(Result.Success conn)
131+
(do
132+
; close! should work on a reference
133+
(UnixStream.close! &conn)
134+
; the stream is now closed; send should fail
135+
(let-do [ok (match (UnixStream.send &conn "test")
136+
(Result.Error _) true
137+
_ false)]
138+
(UnixStream.close client)
139+
(UnixListener.close listener)
140+
ok)))))))
141+
36142
(deftest test
37143
(assert-true test
38144
(match (UnixListener.bind "/tmp/carp_test_unix2.sock")
@@ -46,4 +152,14 @@
46152
_ false)
47153
"UnixStream.connect fails on nonexistent path")
48154

49-
(assert-true test (unix-roundtrip) "Unix socket roundtrip: send and receive"))
155+
(assert-true test (unix-roundtrip) "Unix socket roundtrip: send and receive")
156+
157+
(assert-true test
158+
(unix-nb-roundtrip)
159+
"non-blocking I/O: read-blocked, then read 4 bytes, then send 2 bytes")
160+
161+
(assert-true test
162+
(unix-send-len-test)
163+
"send-len sends only the specified number of bytes")
164+
165+
(assert-true test (unix-close-ref-test) "close! closes stream by reference"))

0 commit comments

Comments
 (0)