-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathipc.c
More file actions
1446 lines (1342 loc) · 51.8 KB
/
ipc.c
File metadata and controls
1446 lines (1342 loc) · 51.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* ipc.c - IPC over TCP.
*
* Wire format (little-endian only for v1):
*
* off 0 arch u8 1 = little-endian; anything else -> close conn
* off 1 msgtype u8 0=async, 1=sync request, 2=sync response,
* 3=sync error
* off 2 version u8 1; future-proofing for protocol changes
* off 3 reserved u8 must be 0
* off 4 msglen u32 total message length INCLUDING this 8-byte header
* off 8 payload ... bd_-serialized K value; (msglen - 8) of them
*
* SYNC_ERR payloads are bd_-serialized char vectors (the error string),
* so receivers can uniformly db_ everything and only special-case the
* msgtype, not the encoding.
*
* Connections are persistent (q-style): the fd returned to k-space is the
* real socket fd; many messages can flow over it; 3:w closes it.
*
* Portability:
* The same body of code below compiles on POSIX and Win32. Platform
* differences are isolated to the small shim block right under the
* includes (sock_*, ipc_init, ipc_shutdown). On Windows the server
* half is reachable, but until repl.c grows a Win32-aware poll loop
* nothing will service incoming connections during the prompt; client
* 3:/4: calls work because they drive the shim's poll directly.
*/
#include "ipc.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include "k.core/types.h"
#include "k.core/x.h"
#include "k.core/rand.h"
#include "k.h"
#include "scope.h"
#include "fn.h"
#include "p.h"
#include "b.h"
#include "tmr.h"
/* ===== platform shim ===================================================== */
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#ifdef _MSC_VER
#pragma comment(lib, "Ws2_32.lib")
typedef int ssize_t; /* recv/send return int; not in MSVC headers */
#endif
typedef SOCKET sock_t;
#define INVALID_SOCK INVALID_SOCKET
#define sock_close(fd) closesocket(fd)
#define sock_recv(fd, buf, n) recv((fd), (char*)(buf), (int)(n), 0)
#define sock_send(fd, buf, n) send((fd), (const char*)(buf), (int)(n), 0)
#define sock_lasterr() WSAGetLastError()
#define sock_would_block(e) ((e) == WSAEWOULDBLOCK)
#define sock_intr(e) (0) /* not raised on winsock */
#define sock_poll(p, n, t) WSAPoll((p), (n), (t))
static int sock_setnb(sock_t fd) {
u_long nb = 1;
return ioctlsocket(fd, FIONBIO, &nb) == 0 ? 0 : -1;
}
/* FormatMessage into a thread-local buffer; trim trailing CRLF. */
static const char *sock_errstr(int err) {
static __declspec(thread) char buf[256];
DWORD n = FormatMessageA(
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, (DWORD)err, 0, buf, sizeof buf, NULL);
while(n && (buf[n-1] == '\n' || buf[n-1] == '\r' || buf[n-1] == ' '))
buf[--n] = 0;
if(n == 0) snprintf(buf, sizeof buf, "winsock error %d", err);
return buf;
}
#else
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/tcp.h> /* TCP_NODELAY */
#include <arpa/inet.h>
#include <netdb.h>
#include <errno.h>
#include <signal.h> /* SIGCHLD reaping in fork mode */
#include <sys/wait.h> /* waitpid() in our reaper */
#include <sys/resource.h> /* getrlimit() in close_inherited_fds() */
#include <dirent.h> /* opendir() in close_inherited_fds() */
#include <ctype.h> /* isdigit() in close_inherited_fds() */
typedef int sock_t;
#define INVALID_SOCK (-1)
#define sock_close(fd) close(fd)
#define sock_recv(fd, buf, n) recv((fd), (buf), (n), 0)
#define sock_send(fd, buf, n) send((fd), (buf), (n), 0)
#define sock_lasterr() errno
#define sock_would_block(e) ((e) == EAGAIN || (e) == EWOULDBLOCK)
#define sock_intr(e) ((e) == EINTR)
#define sock_poll(p, n, t) poll((p), (n), (t))
#define sock_errstr(e) strerror(e)
static int sock_setnb(int fd) {
int fl = fcntl(fd, F_GETFL, 0);
if(fl < 0) return -1;
return fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0 ? -1 : 0;
}
#endif
/* Disable Nagle on a connected data socket. With Nagle + delayed-ACK on the
* other end, small request/response pairs stall by ~40ms per round trip
* (observed: 100 sync callbacks from a fork child over a single dedup'd
* conn ran at ~13/s instead of thousands/s). IPC traffic is interactive
* and frame-oriented, so Nagle buys nothing and hurts throughput badly.
* Fire-and-forget: a setsockopt failure is not fatal, the conn still
* works, just slower. Only call on DATA sockets (accepted/connected),
* never on listeners. */
static void sock_nodelay(sock_t fd) {
int one = 1;
(void)setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
(const char*)&one, sizeof one);
}
int ipc_init(void) {
#ifdef _WIN32
WSADATA wsa;
return WSAStartup(MAKEWORD(2,2), &wsa) == 0 ? 0 : -1;
#else
return 0;
#endif
}
/* ipc_shutdown is defined near the bottom of this file (after the static
* conn table, send_scratch, and close_conn_at it needs to touch). */
/* Connection-table sizing (used by both the multiplexer and the impl). */
#define MAX_CONNS 256
#define POLL_BATCH (1 + MAX_CONNS) /* listen + all conns */
/* Hoisted up so the win32 multiplexer can distinguish the listen sockets
* (which want FD_ACCEPT) from accepted/client conns (which want FD_READ
* + FD_CLOSE). The full impl that mutates these lives further down.
*
* Two independent slots so `\m i N` and `\m f M` can both be active at
* once (matching k's behavior). Each slot is rebound by `\m i/f PORT`
* and torn down by `\m i/f 0`. */
static int listen_iter_fd = -1;
static int listen_iter_port = 0;
static int listen_fork_fd = -1;
static int listen_fork_port = 0;
#ifndef _WIN32
/* SIGCHLD handler used in fork mode. Reaps any number of exited children
* non-blockingly; safe to leave installed even if no children are pending.
* Preserves errno so it won't poison whatever syscall it interrupted. */
static void sigchld_reap(int sig) {
(void)sig;
int saved = errno;
while(waitpid(-1, NULL, WNOHANG) > 0) {}
errno = saved;
}
/* Close every fd we inherited from the parent except 0/1/2 and `keep`.
*
* Why: fork can fire mid-script-load (the load() loop in repl.c holds an
* open FILE* on the script, and pgreduce can call w 4:msg which parks
* ipc_send_sync in poll, which calls handle_accept). The child inherits
* that FILE*'s fd; if the child ever read/wrote it, the shared kernel
* offset would corrupt the parent's parse. We don't do that today, but
* also: leaving the fd open is a leak and a footgun for future code.
*
* Also covers any other live FILE* from io.c (`0:`, `2:`, etc.) that
* happens to be open at fork time.
*
* Strategy: prefer /proc/self/fd (Linux) or /dev/fd (macOS) for an exact
* list of currently open fds. Fall back to a getrlimit-bounded loop. */
static void close_inherited_fds(int keep) {
const char *paths[] = { "/proc/self/fd", "/dev/fd", NULL };
for(int p = 0; paths[p]; p++) {
DIR *d = opendir(paths[p]);
if(!d) continue;
int dfd = dirfd(d);
/* Snapshot fds first so we don't perturb the iteration by closing. */
int buf[256];
int bn = 0;
struct dirent *e;
while((e = readdir(d))) {
if(!isdigit((unsigned char)e->d_name[0])) continue;
int fd = atoi(e->d_name);
if(fd <= 2 || fd == keep || fd == dfd) continue;
if(bn < (int)(sizeof buf / sizeof buf[0])) buf[bn++] = fd;
}
closedir(d);
for(int i = 0; i < bn; i++) (void)close(buf[i]);
return;
}
/* Fallback: brute-force range. */
struct rlimit rl;
int max = 1024;
if(getrlimit(RLIMIT_NOFILE, &rl) == 0 && rl.rlim_cur != RLIM_INFINITY)
max = (int)rl.rlim_cur;
for(int fd = 3; fd < max; fd++)
if(fd != keep) (void)close(fd);
}
#endif
/* ===== Windows stdin multiplexer ========================================
*
* Goal: let the repl block on stdin AND on IPC sockets at the same time,
* the way poll() does on POSIX. WSAPoll only works on sockets - and any
* loopback bind+listen "self-pipe" pair triggers the Windows Firewall
* "allow public/private network access" prompt at startup, which is
* unfriendly when the user hasn't asked to listen on anything yet.
*
* So we use a pure Win32 setup with NO sockets until the user explicitly
* runs \m i PORT or passes -i PORT:
*
* reader thread repl_getc / WaitForMultipleObjects
* | |
* ReadFile(stdin) wait set: [stdin_event,
* | WSAEvent(listen_iter_fd),
* ring_push(bytes) WSAEvent(conn_fd)...]
* | |
* SetEvent(stdin_event) stdin_event fires -> drain ring;
* socket events -> WSAEnumNetworkEvents,
* translate to revents, ipc_handle_pollfd.
*
* stdin_event is a manual-reset Win32 Event:
* - reader thread sets it whenever bytes are pushed or EOF is hit;
* - ring_pop resets it once it's drained the ring AND eof isn't set.
* The IPC socket events are created/torn down per wait iteration with
* WSAEventSelect, which is slightly wasteful but keeps the code small
* and avoids per-conn lifecycle bookkeeping. */
#ifdef _WIN32
#define STDIN_WAIT_BUDGET (MAXIMUM_WAIT_OBJECTS - 1) /* leave slot for stdin */
static HANDLE stdin_event = NULL; /* manual-reset */
static HANDLE reader_th = NULL;
static CRITICAL_SECTION ring_cs;
static unsigned char *ring = NULL;
static size_t ring_cap = 0;
static size_t ring_head = 0; /* next byte to read */
static size_t ring_tail = 0; /* one past last byte */
static int stdin_eof = 0;
/* Recompute stdin_event state from ring contents. Caller holds ring_cs. */
static void update_stdin_event_locked(void) {
if(ring_head < ring_tail || stdin_eof) SetEvent(stdin_event);
else ResetEvent(stdin_event);
}
static void ring_push(const unsigned char *p, size_t n) {
EnterCriticalSection(&ring_cs);
/* compact if there's slack at the front */
if(ring_head > 0) {
memmove(ring, ring + ring_head, ring_tail - ring_head);
ring_tail -= ring_head;
ring_head = 0;
}
if(ring_tail + n > ring_cap) {
size_t newcap = ring_cap ? ring_cap * 2 : 4096;
while(newcap < ring_tail + n) newcap *= 2;
ring = xrealloc(ring, newcap);
ring_cap = newcap;
}
memcpy(ring + ring_tail, p, n);
ring_tail += n;
update_stdin_event_locked();
LeaveCriticalSection(&ring_cs);
}
/* 1=byte returned, 0=empty (caller must wait), -1=eof and empty */
static int ring_pop(unsigned char *out) {
int rc;
EnterCriticalSection(&ring_cs);
if(ring_head < ring_tail) { *out = ring[ring_head++]; rc = 1; }
else if(stdin_eof) { rc = -1; }
else { rc = 0; }
update_stdin_event_locked();
LeaveCriticalSection(&ring_cs);
return rc;
}
static DWORD WINAPI reader_main(LPVOID arg) {
(void)arg;
HANDLE hin = GetStdHandle(STD_INPUT_HANDLE);
unsigned char buf[4096];
for(;;) {
DWORD nread = 0;
BOOL ok = ReadFile(hin, buf, sizeof buf, &nread, NULL);
if(!ok || nread == 0) {
EnterCriticalSection(&ring_cs);
stdin_eof = 1;
update_stdin_event_locked();
LeaveCriticalSection(&ring_cs);
return 0;
}
ring_push(buf, (size_t)nread);
}
}
int ipc_stdin_init(void) {
InitializeCriticalSection(&ring_cs);
stdin_event = CreateEvent(NULL, TRUE /*manual reset*/, FALSE, NULL);
if(!stdin_event) return -1;
reader_th = CreateThread(NULL, 0, reader_main, NULL, 0, NULL);
return reader_th ? 0 : -1;
}
/* Snapshot the currently pollable IPC sockets and create a WSAEvent for
* each. socks[i] is the SOCKET; events[i] is the parallel Win32 event
* handle (which is what WaitForMultipleObjects waits on). Capped at
* STDIN_WAIT_BUDGET because WaitForMultipleObjects tops out at 64 handles
* total; if a user ever has >63 active conns at once, the older ones will
* still drain but with a slight latency. */
static int build_socket_events(WSAEVENT *events, sock_t *socks) {
struct pollfd tmp[POLL_BATCH];
int n = ipc_extra_pollfds(tmp, POLL_BATCH);
if(n > STDIN_WAIT_BUDGET) n = STDIN_WAIT_BUDGET;
for(int i = 0; i < n; i++) {
events[i] = WSACreateEvent();
socks[i] = (sock_t)tmp[i].fd;
long mask = FD_READ | FD_CLOSE;
if((int)socks[i] == listen_iter_fd ||
(int)socks[i] == listen_fork_fd) mask |= FD_ACCEPT;
/* WSAEventSelect implicitly sets the socket non-blocking, which is
* already true for ours. It does NOT auto-restore blocking on detach,
* so the un-WSAEventSelect at teardown leaves them as-is. */
WSAEventSelect(socks[i], events[i], mask);
}
return n;
}
static void teardown_socket_events(WSAEVENT *events, sock_t *socks, int n) {
for(int i = 0; i < n; i++) {
WSAEventSelect(socks[i], NULL, 0);
WSACloseEvent(events[i]);
}
}
int ipc_stdin_getc(void) {
unsigned char c;
for(;;) {
int r = ring_pop(&c);
if(r == 1) return (int)c;
if(r == -1) return -1;
HANDLE handles[MAXIMUM_WAIT_OBJECTS];
sock_t socks [STDIN_WAIT_BUDGET];
handles[0] = stdin_event;
int n_socks = build_socket_events((WSAEVENT*)(handles + 1), socks);
int tms = tmr_timeout_ms();
DWORD wms = (tms < 0) ? INFINITE : (DWORD)tms;
DWORD wr = WaitForMultipleObjects((DWORD)(1 + n_socks), handles,
FALSE, wms);
if(wr == WAIT_TIMEOUT) {
teardown_socket_events((WSAEVENT*)(handles + 1), socks, n_socks);
tmr_maybe_fire();
continue;
}
if(wr >= WAIT_OBJECT_0 && wr < WAIT_OBJECT_0 + (DWORD)(1 + n_socks)) {
DWORD idx = wr - WAIT_OBJECT_0;
if(idx == 0) {
/* stdin event - ring_pop will service it on the next iteration */
} else {
int si = (int)(idx - 1);
WSANETWORKEVENTS ev;
if(WSAEnumNetworkEvents(socks[si], handles[idx], &ev) == 0) {
short rev = 0;
if(ev.lNetworkEvents & (FD_READ|FD_ACCEPT)) rev |= POLLIN;
if(ev.lNetworkEvents & FD_CLOSE) rev |= POLLHUP;
if(rev) ipc_handle_pollfd((int)socks[si], rev);
}
}
teardown_socket_events((WSAEVENT*)(handles + 1), socks, n_socks);
tmr_maybe_fire();
} else {
teardown_socket_events((WSAEVENT*)(handles + 1), socks, n_socks);
return -1;
}
}
}
#else
int ipc_stdin_init(void) { return 0; }
/* ipc_stdin_getc is unused on POSIX (repl uses poll_getc directly). */
#endif
/* ===== implementation (single body, both platforms) ====================== */
#define IPC_HDR_SIZE 8
#define IPC_VERSION 1
#define IPC_ARCH_LE 1
/* Hard cap per message (header + payload). The real ceiling is set by:
* - our wire framing: body_need is i32; >2^31 wraps negative
* - K's public count APIs (tn/tnv/kresize) take i32, so a single K
* vector tops out at ~2.1G elements anyway
* Senders that exceed this raise 'length; receivers that see a larger
* msglen drop the conn (only reachable from a non-conforming peer at
* this point). */
#define IPC_MAX_MSG ((u32)0x7FFFFFFF) /* INT32_MAX */
#define MSG_ASYNC 0
#define MSG_SYNC_REQ 1
#define MSG_SYNC_RSP 2
#define MSG_SYNC_ERR 3
/* Per-connection receive state.
*
* The state machine runs in two phases:
* 1. accumulating header (hdr_have < IPC_HDR_SIZE)
* 2. accumulating body (body != NULL && body_have < body_need)
* On completion we hand the message to deliver_message() and reset
* for the next message.
*
* is_client is set on conns we opened via ipc_open(); cleared on conns
* we accept()ed. waiting_sync/sync_have/sync_response/sync_is_err form
* the rendezvous slot for an in-progress ipc_send_sync(): when a
* SYNC_RSP/SYNC_ERR arrives on a conn whose waiting_sync is set, the
* dispatcher drops the result into the slot rather than logging it. */
typedef struct {
int fd;
/* receive */
u8 hdr[IPC_HDR_SIZE];
i32 hdr_have;
u8 *body; /* xmalloc'd; persists across messages until close */
i32 body_cap; /* current backing allocation; >= body_need on a parsed header */
i32 body_have;
i32 body_need; /* bytes of bd payload expected (msglen - 8) */
u8 msgtype; /* parsed from header; valid once body!=NULL */
/* role / sync rendezvous */
u8 is_client; /* 1 if we opened it (vs accepted) */
u8 waiting_sync; /* 1 while ipc_send_sync is parked on this fd */
u8 sync_have; /* 1 once sync_response is filled */
u8 sync_is_err; /* 1 if it arrived as MSG_SYNC_ERR */
u8 in_dispatch; /* 1 while a handler (.m.s/.m.g/.m.c) runs for this
conn. nested sync waits exclude this fd from
their poll set so more data on the same conn
queues in the kernel instead of re-entering
dispatch. */
K sync_response; /* deserialized payload; valid iff sync_have */
/* client-only: dedup key. matches q's behavior of returning the same
* handle for repeated 3:(host;port) on the same target rather than
* burning a new socket each time. */
char host[256]; /* normalized host string, valid iff is_client */
i32 port; /* tcp port, valid iff is_client */
} ipc_conn;
/* listen_{iter,fork}_{fd,port} are defined above the win32 multiplexer block. */
static ipc_conn conns[MAX_CONNS];
static int conn_count = 0;
/* Report the bound port for one slot, 0 if inactive. The two slots
* are independent; callers that want to summarize both query each. */
int ipc_listen_port_for(int mode) {
if(mode == IPC_MODE_FORK) return listen_fork_fd >= 0 ? listen_fork_port : 0;
return listen_iter_fd >= 0 ? listen_iter_port : 0;
}
/* Fuzz builds short-circuit every IPC entry point that touches the
* outside world (sockets, DNS, fork, blocking reads). Rationale:
* - bind/listen on random ports collides across forks and leaks fds
* - getaddrinfo on arbitrary hostnames can stall on DNS
* - connect to localhost may hit unrelated services (ssh, postgres, ...)
* - ipc_send_sync would block indefinitely waiting for a reply that
* never arrives, hanging the fuzzer
* ipc_init_ns is intentionally NOT stubbed: it only mutates K state
* (.m.{s,g,c}) and gives the fuzzer something to exercise. */
/* ---- connection table helpers ---- */
static int find_conn(int fd) {
for(int i = 0; i < conn_count; i++) if(conns[i].fd == fd) return i;
return -1;
}
/* Reset for the next message. Keeps c->body / c->body_cap so subsequent
* messages on the same conn don't pay the alloc + page-fault cost again. */
static void reset_recv(ipc_conn *c) {
c->hdr_have = 0;
c->body_have = 0;
c->body_need = 0;
c->msgtype = 0;
}
/* Forward decl for the close callback. */
static void fire_close_handler(int fd);
/* Forward decl: wrap a strerror as a kerror with a leading prefix
* (e.g. "bind"). Defined later, near the client open/close path. */
static K cerror(const char *prefix, int err);
static void close_conn_at(int idx) {
ipc_conn *c = &conns[idx];
int fd = c->fd;
/* If a sync caller is parked on this fd, hand them an error so they
* don't block forever. We set sync_have here; ipc_send_sync notices
* the conn vanished and synthesizes a 'conn' error. */
u8 was_waiter = c->waiting_sync && !c->sync_have;
sock_close(fd);
reset_recv(c);
/* reset_recv keeps the body buffer alive across messages; on close
* we must free it ourselves before the slot is overwritten below. */
if(c->body) { xfree(c->body); c->body = NULL; c->body_cap = 0; }
if(c->sync_have) {
/* leftover response we never delivered */
if(c->sync_response && !E(c->sync_response)) _k(c->sync_response);
c->sync_have = 0;
c->sync_response = 0;
}
conns[idx] = conns[--conn_count];
/* Fire .m.c for every closed handle, regardless of whether we accepted
* it (server side) or opened it (client side). */
fire_close_handler(fd);
(void)was_waiter; /* sync caller will detect missing conn on next iter */
}
/* ---- listening / accepting ---- */
K ipc_listen(int port, int mode) {
#ifdef FUZZING
/* fuzz: pretend success without binding; \m / \m i N stay quiet */
(void)port; (void)mode; return 0;
#else
#ifdef _WIN32
if(mode == IPC_MODE_FORK) return kerror("fork: not supported on windows");
#endif
if(mode != IPC_MODE_ITER && mode != IPC_MODE_FORK) return KERR_DOMAIN;
/* Each mode owns an independent listener slot; rebinding or stopping
* (port==0) only affects the slot for the requested mode. */
int *slot_fd = (mode == IPC_MODE_FORK) ? &listen_fork_fd : &listen_iter_fd;
int *slot_port = (mode == IPC_MODE_FORK) ? &listen_fork_port : &listen_iter_port;
/* port==0 just stops this slot. */
if(port == 0) {
if(*slot_fd >= 0) { sock_close(*slot_fd); *slot_fd = -1; *slot_port = 0; }
return 0;
}
if(port < 0 || port > 65535) return KERR_DOMAIN;
/* Reject rebinding to the port this slot is already bound to — it's
* redundant, and silently re-creating a socket hides bugs. User must
* `\m i 0` / `\m f 0` first to explicitly stop, then rebind. */
if(*slot_fd >= 0 && *slot_port == port) return kerror("bind: Address already in use");
/* Do NOT close the old slot yet. We'll open the new socket and bind it
* first; only tear the old one down on success. That way a failed bind
* (e.g. the requested port is owned by the other slot, or by a process
* outside gk) leaves the original listener intact. */
sock_t fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == INVALID_SOCK) return cerror("socket", sock_lasterr());
int one = 1;
(void)setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
(const char*)&one, sizeof one);
struct sockaddr_in sa;
memset(&sa, 0, sizeof sa);
sa.sin_family = AF_INET;
sa.sin_addr.s_addr = htonl(INADDR_ANY);
sa.sin_port = htons((unsigned short)port);
if(bind(fd, (struct sockaddr*)&sa, sizeof sa) < 0) {
K e = cerror("bind", sock_lasterr());
sock_close(fd);
return e;
}
if(listen(fd, 16) < 0) {
K e = cerror("listen", sock_lasterr());
sock_close(fd);
return e;
}
#ifndef _WIN32
/* In fork mode, install a real SIGCHLD handler that reaps exited
* children non-blockingly. We deliberately DON'T use SIG_IGN here:
* POSIX says with SIG_IGN, waitpid() blocks until ALL children exit
* and then fails with ECHILD, which would break `4: and `8: (the
* shell-out verbs in io.c that fork+waitpid for a specific pid).
* Idempotent: re-installing the same handler is a no-op. */
if(mode == IPC_MODE_FORK) {
struct sigaction sa;
memset(&sa, 0, sizeof sa);
sa.sa_handler = sigchld_reap;
sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
sigemptyset(&sa.sa_mask);
sigaction(SIGCHLD, &sa, NULL);
}
#endif
/* New socket is bound+listening. Now safe to retire the old slot (if
* any) and publish the new one. */
if(*slot_fd >= 0) { sock_close(*slot_fd); *slot_fd = -1; *slot_port = 0; }
*slot_fd = (int)fd;
*slot_port = port;
return 0;
#endif
}
int ipc_extra_pollfds(struct pollfd *fds, int max) {
#ifdef FUZZING
/* fuzz: no listener, no conns -> nothing to poll */
(void)fds; (void)max; return 0;
#else
int n = 0;
if(listen_iter_fd >= 0 && n < max) {
fds[n].fd = listen_iter_fd; fds[n].events = POLLIN; fds[n].revents = 0; n++;
}
if(listen_fork_fd >= 0 && n < max) {
fds[n].fd = listen_fork_fd; fds[n].events = POLLIN; fds[n].revents = 0; n++;
}
for(int i = 0; i < conn_count && n < max; i++) {
/* Skip conns whose handler is currently running - this call is
* reached from inside a sync wait nested in that handler. Pulling
* more data off the conn now would re-enter dispatch_async on the
* same fd, and the inner handler's ipc_send_sync (if it does one)
* would collide with the outer pending sync handle via dedup and
* fail as "sync re-entry on same handle". Leaving the fd out of the
* poll set lets new messages queue in the kernel buffer; the outer
* loop picks them up after the handler returns. */
if(conns[i].in_dispatch) continue;
fds[n].fd = conns[i].fd; fds[n].events = POLLIN; fds[n].revents = 0; n++;
}
return n;
#endif
}
#ifndef _WIN32
/* Run a child server process: dispatch messages on its single accepted
* connection until the peer disconnects, then _exit. The child inherits
* all K state from the parent via copy-on-write, so .m.{s,g,c} and any
* user-defined globals are already in place; we just need to drain the
* one fd we kept and let the existing recv_step / deliver_message machine
* do its thing.
*
* Uses _exit (not exit) so atexit handlers don't interleave stdio with
* the parent. The shared stdout/stderr is intentional: it matches q's
* forking server behavior. */
static void ipc_child_loop(int cfd) {
for(;;) {
if(find_conn(cfd) < 0) break; /* close_conn_at fired .m.c */
struct pollfd pfd = { cfd, POLLIN, 0 };
int pr = sock_poll(&pfd, 1, -1);
if(pr < 0) {
if(sock_intr(sock_lasterr())) continue;
break;
}
if(pfd.revents) ipc_handle_pollfd(cfd, pfd.revents);
}
_exit(0);
}
#endif
static void handle_accept(int lfd, int lmode) {
struct sockaddr_in sa;
socklen_t sl = sizeof sa;
sock_t cfd = accept(lfd, (struct sockaddr*)&sa, &sl);
if(cfd == INVALID_SOCK) return; /* transient: would-block or kernel hiccup */
if(conn_count >= MAX_CONNS) { sock_close(cfd); return; }
#ifndef _WIN32
if(lmode == IPC_MODE_FORK) {
/* Flush both stdio streams BEFORE forking. Otherwise any data buffered
* in the parent's FILE* gets duplicated into the child's copy and emits
* twice the next time someone flushes. stdout is _IONBF in main.c so
* usually empty, but stderr is line-buffered by default and a partial
* line would re-emit. Cheap insurance. */
fflush(stdout);
fflush(stderr);
pid_t pid = fork();
if(pid < 0) { /* fork failed: drop the conn */
sock_close(cfd);
return;
}
if(pid > 0) { /* parent: hand off and resume */
sock_close(cfd);
return;
}
/* child: tear down inherited parent state we don't own. Close BOTH
* listener slots: the child serves a single accepted conn and has no
* business accepting new ones on either port. */
if(listen_iter_fd >= 0) { sock_close(listen_iter_fd); listen_iter_fd = -1; listen_iter_port = 0; }
if(listen_fork_fd >= 0) { sock_close(listen_fork_fd); listen_fork_fd = -1; listen_fork_port = 0; }
for(int i = 0; i < conn_count; i++) sock_close(conns[i].fd);
conn_count = 0;
/* drop any inherited timer schedule. We don't want a child
* staying alive past its work just because the parent had a
* recurring timer set. */
tmr_fork_clear();
/* Drop any other fds the parent had open (script files mid-\l, any
* io.c FILE* still live at fork time, etc). Keeps the new cfd. */
close_inherited_fds((int)cfd);
scope_refresh_pid(); /* .z.P -> child pid */
/* Reseed the xorshift RNG so siblings don't generate identical
* sequences. Mix in pid + wall-clock so two children forked in the
* same second still diverge (different pids). */
rand_reseed((unsigned long)getpid() ^ ((unsigned long)time(NULL) << 16));
/* register the one fd we DO own and dispatch on it. */
(void)sock_setnb(cfd);
sock_nodelay(cfd);
ipc_conn *c = &conns[conn_count++];
memset(c, 0, sizeof *c);
c->fd = (int)cfd;
c->is_client = 0;
ipc_child_loop((int)cfd); /* never returns */
}
#endif
/* Non-blocking so partial reads in recv_step return would-block cleanly. */
(void)sock_setnb(cfd);
sock_nodelay(cfd);
ipc_conn *c = &conns[conn_count++];
memset(c, 0, sizeof *c);
c->fd = (int)cfd;
c->is_client = 0;
}
/* ---- frame parsing ---- */
/* Validate a fully-received 8-byte header and populate body_need / msgtype.
* Returns 0 on ok, -1 on protocol error (caller closes the conn). */
static int parse_header(ipc_conn *c) {
u8 arch = c->hdr[0];
u8 msgtype = c->hdr[1];
u8 version = c->hdr[2];
u8 reserved = c->hdr[3];
u32 msglen;
memcpy(&msglen, c->hdr + 4, 4); /* little-endian on supported archs */
/* Any framing problem -> drop the connection. The caller (recv_step)
* closes the fd, which in turn signals any sync waiter via the existing
* close-with-waiter path; async senders just observe the closed peer. */
if(arch != IPC_ARCH_LE) return -1;
if(version != IPC_VERSION) return -1;
if(reserved != 0) return -1;
if(msgtype > MSG_SYNC_ERR) return -1;
if(msglen < IPC_HDR_SIZE) return -1;
if(msglen > IPC_MAX_MSG) return -1;
c->body_need = (i32)(msglen - IPC_HDR_SIZE);
c->msgtype = msgtype;
if(c->body_need > c->body_cap) {
c->body = xrealloc(c->body, (size_t)c->body_need);
c->body_cap = c->body_need;
}
return 0;
}
/* ---- send side ---- */
/* Write n bytes from buf to fd, retrying short writes and EINTR.
* For non-blocking fds (clients + accepted), poll for writability on
* EAGAIN. Returns 0 on success, -1 on error (caller should close). */
static int write_all(int fd, const void *buf, size_t n) {
const char *p = buf;
while(n > 0) {
ssize_t w = sock_send(fd, p, n);
if(w < 0) {
int e = sock_lasterr();
if(sock_intr(e)) continue;
if(sock_would_block(e)) {
struct pollfd pf = { fd, POLLOUT, 0 };
int pr = sock_poll(&pf, 1, -1);
if(pr < 0) {
if(sock_intr(sock_lasterr())) continue;
return -1;
}
if(pf.revents & (POLLERR|POLLHUP|POLLNVAL)) return -1;
continue;
}
return -1;
}
p += w; n -= (size_t)w;
}
return 0;
}
/* Frame and send a message. payload may be NULL iff plen==0.
* Returns 0 on success, -1 on error. */
static int send_frame(int fd, u8 msgtype, const void *payload, u32 plen) {
u8 hdr[IPC_HDR_SIZE];
u32 total = IPC_HDR_SIZE + plen;
hdr[0] = IPC_ARCH_LE;
hdr[1] = msgtype;
hdr[2] = IPC_VERSION;
hdr[3] = 0;
memcpy(hdr + 4, &total, 4);
if(write_all(fd, hdr, IPC_HDR_SIZE) < 0) return -1;
if(plen && write_all(fd, payload, plen) < 0) return -1;
return 0;
}
/* True iff a payload of plen bytes plus our header would exceed the cap.
* Use u64 arithmetic so the IPC_HDR_SIZE add can't itself wrap. */
static int over_ipclimit(u64 plen) {
return (plen + IPC_HDR_SIZE) > IPC_MAX_MSG;
}
/* Scratch buffer reused across all bd_-into-socket sends. Never freed;
* grows monotonically. Eliminates the per-call alloc + page-fault cost
* on the serialize output (the dominant overhead for large messages).
* Single-threaded: the IPC layer runs on the main REPL thread on POSIX,
* and forked children get their own copy via COW. */
static char *send_scratch;
static u64 send_scratch_cap;
/* Send the value `r` framed as msgtype. Consumes nothing. Returns 0/-1.
* On bd_ failure or oversize payload, falls back to sending a sync-err
* (the receiver will surface it as a kerror). */
static int send_value(int fd, u8 msgtype, K r) {
u64 len = 0;
K e = bd_into(r, &send_scratch, &send_scratch_cap, &len);
/* bd_ can legitimately fail; over_ipclimit fires when the serialized
* payload + 8-byte header would exceed our IPC frame cap. Both cases
* are surfaced as a bd_-wrapped char-vector SYNC_ERR so the client
* can uniformly db_ every payload regardless of msgtype. We rewrite
* 'wsfull -> 'length on the IPC send path so any over-2GB payload
* raises 'length. The recursive send_value is safe - it overwrites
* the scratch with the small error string and we no longer need
* the partial output. */
if(E(e) || over_ipclimit(len)) {
const char *m;
if(!E(e)) m = E[KERR_LENGTH];
else if(e == KERR_WSFULL) m = E[KERR_LENGTH];
else if(e < EMAX) m = E[e];
else m = "type";
K str = tn(3, (i32)strlen(m));
memcpy(px(str), m, strlen(m));
int rc = send_value(fd, MSG_SYNC_ERR, str);
_k(str);
return rc;
}
return send_frame(fd, msgtype, send_scratch, (u32)len);
}
/* Format an error K into a plain C string, wrap it as a K char vector,
* bd_-encode that, and send as MSG_SYNC_ERR. Receivers can therefore
* uniformly db_ every payload regardless of msgtype. */
static int send_error(int fd, K err) {
const char *m;
if(err < EMAX) m = E[err];
else m = sk(err);
size_t mlen = strlen(m);
K str = tn(3, (i32)mlen);
memcpy(px(str), m, mlen);
int rc = send_value(fd, MSG_SYNC_ERR, str);
_k(str);
return rc;
}
/* ---- dispatch ---- */
/* Look up .m.s / .m.g / .m.c, returning a fresh reference (or an error K). */
static K get_handler(const char *name) {
K nm = t(4, sp((char*)name));
return scope_get(gs, nm);
}
/* Apply handler `h` to single argument `arg`. Consumes both. fne() takes
* its arg as a *list of args*, so we wrap atomics/values in a 1-element
* list before calling.
*
* We honor the user's \e setting so that handler errors behave exactly
* like errors at the top-level REPL:
*
* \e 1 pgreduce_ prints the error and drops into a debug sub-REPL.
* IPC dispatch pauses for all peers until `\` exits the sub-REPL.
* \e 0 fne returns the error silently; we print it here to match the
* REPL's "print and continue" behavior (the REPL's own call in
* repl.c does the same kprint after pgreduce returns).
*
* For .m.g (sync) the caller also gets the error via SYNC_ERR, so the
* server-side print is in addition to the remote notification - same as
* how the REPL would report an error that simultaneously got caught by
* a trap handler. We still save+restore EFLAG so a handler that toggles
* \e itself can't persistently change it. */
static K apply1(K h, K arg) {
/* Guard against non-callable handlers. The user can put any K
* value in .m.{s,g} (`.m.g:"{1+x}"` is a real footgun: the default
* .m.g is `{. x}` which evals string *args*, but a string in the
* handler slot itself isn't callable). fne() on a non-callable
* crashes -- val() returns an error K for the same set, so we
* use that as a fast and uniform check. Note: callable-but-wrong-
* valence handlers (e.g. `.m.g:{x+y}`) project rather than crash;
* we let those through and the peer receives the projection. */
K vv = val(h);
if(E(vv)) {
_k(h);
_k(arg);
/* Match the post-fne print rule below: under \e 0, surface the
* problem on the server too. Under \e 1 we stay silent (no
* sub-repl: the offending dispatch was driven by an inbound
* message, not a top-level expression, and we still need to
* answer the peer with SYNC_ERR before unwinding). */
if(!EFLAG) {
K p = (vv < EMAX) ? kerror(E[vv]) : k_(vv);
kprint(p, "", "\n", "");
}
return vv;
}
_k(vv);
K args = tn(0, 1);
K *pa = px(args);
pa[0] = arg;
n(args) = 1;
int e0 = EFLAG;
K r = fne(h, args, "");
EFLAG = e0;
if(!e0 && E(r)) {
K p = (r < EMAX) ? kerror(E[r]) : r;
kprint(p, "", "\n", "");
if(p != r) _k(p);
}
return r;
}
/* Mark the fd's conn as currently running a handler. Nested sync waits
* exclude busy conns from their poll set so further data on the same
* conn queues in the kernel until the handler returns. */
static void mark_in_dispatch(int fd, int v) {
int ix = find_conn(fd);
if(ix >= 0) conns[ix].in_dispatch = (u8)(v ? 1 : 0);
}
static void dispatch_async(int fd, K msg) {
/* Async dispatch has no remote caller waiting on the result, so
* handler errors can't be surfaced to the peer. Under \e 0 they're
* silently dropped here; under \e 1 the user gets a debug sub-REPL
* inside apply1 and the eventual return value is dropped on the
* floor either way. */
mark_in_dispatch(fd, 1);
scope_set_z_w(fd);
K h = get_handler(".m.s");
if(E(h)) { scope_set_z_w(0); mark_in_dispatch(fd, 0); _k(msg); return; }
K r = apply1(h, msg);
scope_set_z_w(0);
mark_in_dispatch(fd, 0);
if(E(r)) { if(r >= EMAX) _k(r); }
else _k(r);
}
static void dispatch_sync(int fd, K msg) {
mark_in_dispatch(fd, 1);
scope_set_z_w(fd);
K h = get_handler(".m.g");
if(E(h)) {
scope_set_z_w(0);
mark_in_dispatch(fd, 0);
send_error(fd, h);
if(h >= EMAX) _k(h);
_k(msg);
return;
}
K r = apply1(h, msg);
scope_set_z_w(0);
mark_in_dispatch(fd, 0);
/* "abort" out of the handler means the user typed `\` to exit a
* debug sub-REPL that was opened (under \e 1) for an error inside
* the handler. The error itself was already printed and dismissed
* by the user; forwarding the literal "abort" string would land
* on the peer's top-level REPL where an abort-as-expression-value
* fires help(0) (see repl.c). Treat it as "discarded" and answer
* the peer with null instead. */
if(E(r) && r >= EMAX && sk(r) == sp("abort")) {
_k(r);
send_value(fd, MSG_SYNC_RSP, null);
return;
}
if(E(r)) {
send_error(fd, r);
if(r >= EMAX) _k(r);
}
else {
send_value(fd, MSG_SYNC_RSP, r);
_k(r);
}
}
static void fire_close_handler(int fd) {
mark_in_dispatch(fd, 1);
scope_set_z_w(fd);
K h = get_handler(".m.c");
if(E(h)) { scope_set_z_w(0); mark_in_dispatch(fd, 0); return; }
/* k semantics: .m.c is a string of K code that gets eval'd on close.
* Anything that isn't a non-empty char vector (a lambda, sym, int,
* list, the default "", ...) is silently ignored. The closed fd is
* not passed; if the user wants it, they can capture it elsewhere
* (e.g., maintain a global list inside .m.s). */
if(T(h) != -3 || n(h) == 0) { scope_set_z_w(0); mark_in_dispatch(fd, 0); _k(h); return; }
/* Evaluate by applying the canonical {. x} eval-lambda. Built fresh
* each time (only fires on disconnect, so the cost is irrelevant)
* so the user redefining .m.s can't affect us. */
K eval_lam = fnnew("{. x}");