Skip to content

Commit aaf845d

Browse files
author
carpentry-heartbeat[bot]
committed
Add non-blocking I/O support to UnixStream
Add close!, set-nonblocking, send-nb, read-append-nb, read-blocked, send-len, and prn to UnixStream, mirroring the non-blocking API that TcpStream already provides. Includes C implementations and tests for non-blocking read/write, send-len, and close-by-reference.
1 parent 842c7db commit aaf845d

3 files changed

Lines changed: 215 additions & 2 deletions

File tree

src/unix_stream.carp

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,18 @@
6767
(let [n (send-bytes- stream data)]
6868
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
6969

70+
(private send-len-)
71+
(hidden send-len-)
72+
(register send-len-
73+
(Fn [&UnixStream &String Int] Int)
74+
"UnixStream_send_MINUS_len_")
75+
76+
(doc send-len
77+
"sends a string with known length (avoids strlen). Returns bytes sent or an error.")
78+
(defn send-len [stream msg len]
79+
(let [n (send-len- stream msg len)]
80+
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
81+
7082
(doc read
7183
"reads up to 4096 bytes from the stream. Returns the data as a string, or an error.
7284
Returns an empty string on connection close.")
@@ -83,7 +95,11 @@ Returns bytes read (0 = connection closed), or an error.")
8395
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
8496

8597
(register close (Fn [UnixStream] ()))
86-
(doc close "closes the stream, releasing the file descriptor.")
98+
(doc close "closes the stream, consuming it.")
99+
100+
(doc close! "closes the stream by reference, for use when the stream lives
101+
in a collection. Sets the fd to -1 to prevent double-close.")
102+
(register close! (Fn [&UnixStream] ()) "UnixStream_close_MINUS_ref")
87103

88104
(doc shutdown "shuts down the stream. 0 = reads, 1 = writes, 2 = both.")
89105
(register shutdown- (Fn [&UnixStream Int] ()) "UnixStream_shutdown_")
@@ -98,12 +114,59 @@ Returns bytes read (0 = connection closed), or an error.")
98114
(doc set-timeout "sets read and write timeouts in seconds.")
99115
(register set-timeout (Fn [&UnixStream Int] ()))
100116

117+
(doc set-nonblocking "puts the socket into non-blocking mode. After this
118+
call, `send-nb` and `read-append-nb` are the appropriate I/O entry points;
119+
the blocking variants will return `EAGAIN` instead of waiting.")
120+
(register set-nonblocking (Fn [&UnixStream] ()))
121+
122+
(private send-nb-)
123+
(hidden send-nb-)
124+
(register send-nb-
125+
(Fn [&UnixStream &(Array Byte) Int] Int)
126+
"UnixStream_send_MINUS_nb_")
127+
128+
(doc send-nb "non-blocking send. Sends as many bytes as the kernel will
129+
accept right now from `data`, starting at `offset`.
130+
131+
Returns `(Result Int String)`. The `Int` is the number of bytes actually
132+
written, which may be `0` if the socket is not currently writable. Re-arm
133+
write interest on the next event-loop iteration in that case.")
134+
(defn send-nb [stream data offset]
135+
(let [n (send-nb- stream data offset)]
136+
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
137+
138+
(private read-append-nb-)
139+
(hidden read-append-nb-)
140+
(register read-append-nb-
141+
(Fn [&UnixStream &(Array Byte)] Int)
142+
"UnixStream_read_MINUS_append_MINUS_nb_")
143+
144+
(doc read-blocked "sentinel returned by `read-append-nb` when no data is
145+
currently available on a non-blocking socket.")
146+
(def read-blocked -2)
147+
148+
(doc read-append-nb "non-blocking append-read. Reads whatever the kernel
149+
has buffered into `buf`, growing it as needed.
150+
151+
Returns `(Result Int String)`. The `Int` is one of:
152+
153+
- `> 0` bytes appended to `buf`,
154+
- `0` peer closed cleanly (EOF),
155+
- `read-blocked` (-2) socket has no data right now, retry on the next
156+
readable event.")
157+
(defn read-append-nb [stream buf]
158+
(let [n (read-append-nb- stream buf)]
159+
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))
160+
101161
(doc peer-path "returns the path of the remote peer socket.")
102162
(register peer-path (Fn [&UnixStream] String))
103163

104164
(register copy (Fn [&UnixStream] UnixStream))
105165
(implements copy UnixStream.copy)
106166

167+
(register prn (Fn [UnixStream] String) "UnixStream_prn_")
168+
(implements prn UnixStream.prn)
169+
107170
(defn poll-fd [s] (fd- s))
108171
(implements poll-fd UnixStream.poll-fd)
109172

src/unix_stream.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,48 @@ String UnixStream_peer_MINUS_path(UnixStream* s) {
100100
return str;
101101
}
102102

103+
void UnixStream_close_MINUS_ref(UnixStream* s) {
104+
if (s->fd >= 0) { close(s->fd); s->fd = -1; }
105+
}
106+
107+
void UnixStream_set_MINUS_nonblocking(UnixStream* s) {
108+
int flags = fcntl(s->fd, F_GETFL, 0);
109+
if (flags >= 0) fcntl(s->fd, F_SETFL, flags | O_NONBLOCK);
110+
}
111+
112+
int UnixStream_send_MINUS_len_(UnixStream* s, String* msg, int len) {
113+
return (int)send_all(s->fd, *msg, (size_t)len);
114+
}
115+
116+
int UnixStream_send_MINUS_nb_(UnixStream* s, Array* data, int offset) {
117+
if (offset >= data->len) return 0;
118+
ssize_t n = send(s->fd, (char*)data->data + offset,
119+
(size_t)(data->len - offset), 0);
120+
if (n >= 0) return (int)n;
121+
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0;
122+
return -1;
123+
}
124+
125+
int UnixStream_read_MINUS_append_MINUS_nb_(UnixStream* s, Array* buf) {
126+
if ((int)(buf->capacity - buf->len) < SOCK_BUF_SIZE) {
127+
int new_cap = (buf->len + SOCK_BUF_SIZE) * 2;
128+
buf->data = CARP_REALLOC(buf->data, new_cap);
129+
buf->capacity = new_cap;
130+
}
131+
ssize_t r = read(s->fd, (char*)buf->data + buf->len, SOCK_BUF_SIZE);
132+
if (r > 0) { buf->len += (int)r; return (int)r; }
133+
if (r == 0) return 0;
134+
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return -2;
135+
return -1;
136+
}
137+
138+
String UnixStream_prn_(UnixStream s) {
139+
size_t len = (size_t)snprintf(NULL, 0, "UnixStream(%d)", s.fd);
140+
String r = CARP_MALLOC(len + 1);
141+
snprintf(r, len + 1, "UnixStream(%d)", s.fd);
142+
return r;
143+
}
144+
103145
UnixStream UnixStream_copy(UnixStream* s) {
104146
UnixStream c;
105147
c.fd = s->fd;

test/unix_test.carp

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,104 @@
3333
(UnixListener.close listener)
3434
false))))))))
3535

36+
(defn unix-nb-roundtrip []
37+
(let [path @"/tmp/carp_test_unix_nb.sock"]
38+
(match (UnixListener.bind &path)
39+
(Result.Error _) false
40+
(Result.Success listener)
41+
(match (UnixStream.connect &path)
42+
(Result.Error _) (do (UnixListener.close listener) false)
43+
(Result.Success client)
44+
(match (UnixListener.accept &listener)
45+
(Result.Error _)
46+
(do
47+
(UnixStream.close client)
48+
(UnixListener.close listener)
49+
false)
50+
(Result.Success conn)
51+
(let-do [buf (the (Array Byte) (Array.allocate 1024))
52+
payload (the (Array Byte)
53+
[(Char.to-byte \h) (Char.to-byte \i)])
54+
ok true]
55+
(UnixStream.set-nonblocking &client)
56+
; nothing to read yet -> read-blocked sentinel
57+
(match (UnixStream.read-append-nb &client &buf)
58+
(Result.Success n)
59+
(when (/= n UnixStream.read-blocked) (set! ok false))
60+
(Result.Error _) (set! ok false))
61+
; peer writes, we should be able to read it
62+
(ignore (UnixStream.send &conn "ping"))
63+
(System.sleep-micros 50000)
64+
(match (UnixStream.read-append-nb &client &buf)
65+
(Result.Success n) (when (/= n 4) (set! ok false))
66+
(Result.Error _) (set! ok false))
67+
; non-blocking send returns bytes written
68+
(match (UnixStream.send-nb &client &payload 0)
69+
(Result.Success n) (when (/= n 2) (set! ok false))
70+
(Result.Error _) (set! ok false))
71+
(UnixStream.close client)
72+
(UnixStream.close conn)
73+
(UnixListener.close listener)
74+
ok))))))
75+
76+
(defn unix-send-len-test []
77+
(let [path @"/tmp/carp_test_unix_sendlen.sock"]
78+
(match (UnixListener.bind &path)
79+
(Result.Error _) false
80+
(Result.Success listener)
81+
(match (UnixStream.connect &path)
82+
(Result.Error _) (do (UnixListener.close listener) false)
83+
(Result.Success client)
84+
(match (UnixListener.accept &listener)
85+
(Result.Error _)
86+
(do
87+
(UnixStream.close client)
88+
(UnixListener.close listener)
89+
false)
90+
(Result.Success conn)
91+
(do
92+
; send only first 3 bytes of "hello"
93+
(ignore (UnixStream.send-len &client "hello" 3))
94+
(match (the (Result String String) (UnixStream.read &conn))
95+
(Result.Success msg)
96+
(let-do [ok (= &msg "hel")]
97+
(UnixStream.close conn)
98+
(UnixStream.close client)
99+
(UnixListener.close listener)
100+
ok)
101+
_
102+
(do
103+
(UnixStream.close conn)
104+
(UnixStream.close client)
105+
(UnixListener.close listener)
106+
false))))))))
107+
108+
(defn unix-close-ref-test []
109+
(let [path @"/tmp/carp_test_unix_closeref.sock"]
110+
(match (UnixListener.bind &path)
111+
(Result.Error _) false
112+
(Result.Success listener)
113+
(match (UnixStream.connect &path)
114+
(Result.Error _) (do (UnixListener.close listener) false)
115+
(Result.Success client)
116+
(match (UnixListener.accept &listener)
117+
(Result.Error _)
118+
(do
119+
(UnixStream.close client)
120+
(UnixListener.close listener)
121+
false)
122+
(Result.Success conn)
123+
(do
124+
; close! should work on a reference
125+
(UnixStream.close! &conn)
126+
; the stream is now closed; send should fail
127+
(let-do [ok (match (UnixStream.send &conn "test")
128+
(Result.Error _) true
129+
_ false)]
130+
(UnixStream.close client)
131+
(UnixListener.close listener)
132+
ok)))))))
133+
36134
(deftest test
37135
(assert-true test
38136
(match (UnixListener.bind "/tmp/carp_test_unix2.sock")
@@ -46,4 +144,14 @@
46144
_ false)
47145
"UnixStream.connect fails on nonexistent path")
48146

49-
(assert-true test (unix-roundtrip) "Unix socket roundtrip: send and receive"))
147+
(assert-true test (unix-roundtrip) "Unix socket roundtrip: send and receive")
148+
149+
(assert-true test
150+
(unix-nb-roundtrip)
151+
"non-blocking I/O: read-blocked, then read 4 bytes, then send 2 bytes")
152+
153+
(assert-true test
154+
(unix-send-len-test)
155+
"send-len sends only the specified number of bytes")
156+
157+
(assert-true test (unix-close-ref-test) "close! closes stream by reference"))

0 commit comments

Comments
 (0)