Skip to content

Commit 70ae9ab

Browse files
committed
initial socket servers
1 parent ab7012c commit 70ae9ab

9 files changed

+384
-1
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
11
*.fasl
2+
.DS_Store
3+
hypergeometrica-worker
4+
hypergeometrica-manager

Makefile

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
.PHONY: all
2+
all: hypergeometrica-manager hypergeometrica-worker
3+
4+
# FIXME: this seems to depend on quicklisp being loaded, probably so
5+
# ASDF can know about the paths.
6+
hypergeometrica-manager:
7+
sbcl --non-interactive --eval '(asdf:make "hypergeometrica-manager")'
8+
mv src-manager/hypergeometrica-manager .
9+
10+
hypergeometrica-worker:
11+
sbcl --non-interactive --eval '(asdf:make "hypergeometrica-worker")'
12+
mv src-worker/hypergeometrica-worker .
13+
14+
.PHONY: clean
15+
clean:
16+
rm -f hypergeometrica-manager hypergeometrica-worker

hypergeometrica-manager.asd

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
;;;; hypergeometrica-manager.asd
2+
;;;;
3+
;;;; Copyright (c) 2024 Robert Smith
4+
5+
(asdf:defsystem #:hypergeometrica-manager
6+
:description "Manager process for running large Hypergeometrica calculations."
7+
:author "Robert Smith <[email protected]>"
8+
:license "BSD 3-clause (See LICENSE.txt)"
9+
:depends-on (#:clingon #:uiop #:bordeaux-threads #:sb-bsd-sockets)
10+
; :in-order-to ((asdf:test-op (asdf:test-op #:hypergeometrica-manager/tests)))
11+
:around-compile (lambda (compile)
12+
(let (#+sbcl (sb-ext:*derive-function-types* t))
13+
(funcall compile)))
14+
:pathname "src-manager/"
15+
:serial t
16+
:components ((:file "package")
17+
(:file "main"))
18+
:build-operation "program-op"
19+
:build-pathname "hypergeometrica-manager"
20+
:entry-point "hypergeometrica-manager:main")
21+
22+
(asdf:defsystem #:hypergeometrica-manager/tests
23+
:description "Tests for HYPERGEOMETRICA-MANAGER."
24+
:author "Robert Smith <[email protected]>"
25+
:license "BSD 3-clause (See LICENSE.txt)"
26+
:defsystem-depends-on (#:uiop)
27+
:depends-on (#:hypergeometrica-manager
28+
#:fiasco)
29+
:perform (asdf:test-op (o s)
30+
#+ignore
31+
(uiop:symbol-call '#:hypergeometrica-tests
32+
'#:test-hypergeometrica))
33+
:serial t
34+
:components ())

hypergeometrica-worker.asd

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
;;;; hypergeometrica-worker.asd
2+
;;;;
3+
;;;; Copyright (c) 2024 Robert Smith
4+
5+
(asdf:defsystem #:hypergeometrica-worker
6+
:description "Worker process for running large Hypergeometrica calculations."
7+
:author "Robert Smith <[email protected]>"
8+
:license "BSD 3-clause (See LICENSE.txt)"
9+
:depends-on (#:clingon #:uiop #:bordeaux-threads #:sb-bsd-sockets)
10+
; :in-order-to ((asdf:test-op (asdf:test-op #:hypergeometrica-worker/tests)))
11+
:around-compile (lambda (compile)
12+
(let (#+sbcl (sb-ext:*derive-function-types* t))
13+
(funcall compile)))
14+
:pathname "src-worker/"
15+
:serial t
16+
:components ((:file "package")
17+
(:file "main"))
18+
:build-operation "program-op"
19+
:build-pathname "hypergeometrica-worker"
20+
:entry-point "hypergeometrica-worker:main")
21+
22+
(asdf:defsystem #:hypergeometrica-worker/tests
23+
:description "Tests for HYPERGEOMETRICA-WORKER."
24+
:author "Robert Smith <[email protected]>"
25+
:license "BSD 3-clause (See LICENSE.txt)"
26+
:defsystem-depends-on (#:uiop)
27+
:depends-on (#:hypergeometrica-worker
28+
#:fiasco)
29+
:perform (asdf:test-op (o s)
30+
#+ignore
31+
(uiop:symbol-call '#:hypergeometrica-tests
32+
'#:test-hypergeometrica))
33+
:serial t
34+
:components ())

hypergeometrica.asd

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
;;;; hypergeometrica.asd
22
;;;;
3-
;;;; Copyright (c) 2019-2023 Robert Smith
3+
;;;; Copyright (c) 2019-2024 Robert Smith
44

55
(asdf:defsystem #:hypergeometrica
66
:description "Calculate lots of digits of things."

src-manager/main.lisp

+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
;;;; Copyright (c) 2024 Robert Smith
2+
3+
(in-package #:hypergeometrica-manager)
4+
5+
(sb-ext:defglobal **socket** nil)
6+
(sb-ext:defglobal **socket-node** nil)
7+
(sb-ext:defglobal **socket-thread** nil)
8+
9+
;;; Socket
10+
11+
(defun write-form (stream form)
12+
(prin1 form stream)
13+
(terpri stream)
14+
(finish-output stream))
15+
16+
(defun make-socket-listener ()
17+
(let* ((server **socket**))
18+
(lambda ()
19+
(unwind-protect
20+
(loop
21+
(let* ((client (sb-bsd-sockets:socket-accept server))
22+
(stream (sb-bsd-sockets:socket-make-stream
23+
client
24+
:input t
25+
:output t
26+
:element-type 'character
27+
:buffering :line
28+
:timeout 5))
29+
(message (read stream nil '(:eof))))
30+
(handle-worker-message message stream)
31+
(sb-bsd-sockets:socket-close client)))
32+
(sb-bsd-sockets:socket-close **socket**)
33+
(delete-file **socket-node**)
34+
(setf **socket** nil
35+
**socket-node** nil
36+
**socket-thread** nil)))))
37+
38+
(defun start-socket-thread ()
39+
(when **socket-thread**
40+
(warn "Socket thread already started.")
41+
(bt:destroy-thread **socket-thread**))
42+
(setf **socket-thread** (bt:make-thread
43+
(make-socket-listener)
44+
:name "Hypergeometrica Socket Server")))
45+
46+
;;; Worker Tracking
47+
48+
(defclass worker-status ()
49+
((id :accessor worker-status-id
50+
:initarg :id)
51+
(last-heartbeat :accessor last-heartbeat
52+
:initarg :last-heartbeat)))
53+
54+
(sb-ext:defglobal **max-workers** 1)
55+
(sb-ext:defglobal **workers-lock** (bt:make-lock "**workers**"))
56+
(sb-ext:defglobal **workers** nil)
57+
58+
(defun make-id ()
59+
(sleep 1.5)
60+
(get-universal-time))
61+
62+
(defun check-worker (id)
63+
(bt:with-lock-held (**workers-lock**)
64+
(let ((status (find id **workers** :key #'worker-status-id)))
65+
(cond
66+
(status
67+
(setf (last-heartbeat status) (get-internal-real-time))
68+
id)
69+
(t
70+
(warn "Unknown worker identified as #~D" id)
71+
nil)))))
72+
73+
(defun make-heartbeat-checker (&optional (timeout 10))
74+
(lambda ()
75+
(loop
76+
(sleep timeout)
77+
(bt:with-lock-held (**workers-lock**)
78+
(loop :for status :in **workers**
79+
:if (< timeout (/ (- (get-internal-real-time)
80+
(last-heartbeat status))
81+
internal-time-units-per-second))
82+
:collect status :into evict
83+
:else
84+
:collect status :into renew
85+
:finally (progn
86+
(setf **workers** renew)
87+
(dolist (status evict)
88+
(warn "Evicting ~A due to timeout." (worker-status-id status)))))))))
89+
90+
(sb-ext:defglobal **heartbeat-checker-thread** nil)
91+
(defun start-heartbeat-checker-thread ()
92+
(setf **heartbeat-checker-thread**
93+
(bt:make-thread (make-heartbeat-checker) :name "Heartbeat Checker")))
94+
95+
;;; Worker Message Handling
96+
97+
(defun handle-unknown-message (message)
98+
(warn "Unknown message received: ~A" (prin1-to-string message))
99+
nil)
100+
101+
(defun handle-worker-message (message stream)
102+
(typecase message
103+
(atom
104+
(handle-unknown-message message))
105+
((cons keyword)
106+
(alexandria:destructuring-case message
107+
((:eof)
108+
(warn "Received EOF from client."))
109+
((:join)
110+
(bt:with-lock-held (**workers-lock**)
111+
(cond
112+
((> **max-workers** (length **workers**))
113+
(let ((new-id (make-id)))
114+
(push
115+
(make-instance 'worker-status
116+
:id new-id
117+
:last-heartbeat (get-internal-real-time))
118+
**workers**)
119+
(write-form stream `(:welcome :id ,new-id))))
120+
(t
121+
(write-form stream '(:no-vacancy))))))
122+
((:status)
123+
nil)
124+
((t &rest rest)
125+
(declare (ignore rest))
126+
(handle-unknown-message message))))
127+
(t
128+
(let ((from (car message)))
129+
(when (check-worker from)
130+
(alexandria:destructuring-case (cdr message)
131+
((:ping)
132+
(format t "Ping from client ~D~%" from)
133+
(write-form stream '(:pong))
134+
(finish-output stream))
135+
((:heartbeat)
136+
(format t "Heartbeat from worker #~D~%" from))
137+
((t &rest rest)
138+
(declare (ignore rest))
139+
(handle-unknown-message message))))))))
140+
141+
;;; CLI
142+
143+
(defun cli-options ()
144+
(list
145+
(clingon:make-option
146+
:integer
147+
:required t
148+
:description "maximum number of workers"
149+
:long-name "max-workers"
150+
:key :max-workers)))
151+
152+
(defun cli-command ()
153+
(clingon:make-command
154+
:name "hypergeometrica-manager"
155+
:options (cli-options)
156+
:handler #'cli-handler))
157+
158+
(defun cli-handler (cmd)
159+
(let ((pid (sb-posix:getpid)))
160+
(setf **max-workers** (clingon:getopt cmd ':max-workers)
161+
**socket** (make-instance 'sb-bsd-sockets:local-socket
162+
:type :stream)
163+
**socket-node** (merge-pathnames
164+
(format nil "manager-~D" pid)
165+
"/tmp/"))
166+
(sb-bsd-sockets:socket-bind **socket** (namestring **socket-node**))
167+
(sb-bsd-sockets:socket-listen **socket** 8)
168+
(start-heartbeat-checker-thread)
169+
(start-socket-thread)
170+
(format t "Started socket on: ~A~%" **socket-node**)
171+
(format t "Waiting for socket thread to end.~%")
172+
(finish-output)
173+
(bt:join-thread **socket-thread**)))
174+
175+
(defun main ()
176+
(sb-ext:disable-debugger)
177+
(let ((app (cli-command)))
178+
(clingon:run app)))

src-manager/package.lisp

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
;;;; Copyright (c) 2024 Robert Smith
2+
3+
(defpackage #:hypergeometrica-manager
4+
(:use #:cl)
5+
(:export #:main))

src-worker/main.lisp

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
;;;; Copyright (c) 2024 Robert Smith
2+
3+
(in-package #:hypergeometrica-worker)
4+
5+
(sb-ext:defglobal **id** nil)
6+
(sb-ext:defglobal **manager-address** nil)
7+
(sb-ext:defglobal **manager-lock** (bt:make-lock))
8+
9+
(defun cli-options ()
10+
(list
11+
(clingon:make-option
12+
:string
13+
:description "Address of open socket."
14+
:short-name #\s
15+
:long-name "socket-address"
16+
:key :socket-address)
17+
))
18+
19+
(defun cli-command ()
20+
(clingon:make-command
21+
:name "hypergeometrica-worker"
22+
:options (cli-options)
23+
:handler #'cli-handler))
24+
25+
(defmacro with-manager-io ((stream) &body body)
26+
(alexandria:with-gensyms (manager)
27+
`(bt:with-lock-held (**manager-lock**)
28+
(let ((,manager (make-instance 'sb-bsd-sockets:local-socket
29+
:type :stream)))
30+
(unwind-protect
31+
(progn
32+
(sb-bsd-sockets:socket-connect ,manager **manager-address**)
33+
(let ((,stream (sb-bsd-sockets:socket-make-stream
34+
,manager
35+
:element-type 'character
36+
:input t
37+
:output t
38+
:buffering ':line)))
39+
,@body))
40+
(when (sb-bsd-sockets:socket-open-p ,manager)
41+
(sb-bsd-sockets:socket-close ,manager)))))))
42+
43+
(defun write-form (stream form)
44+
(prin1 form stream)
45+
(terpri stream)
46+
(finish-output stream))
47+
48+
(defun read-form (stream)
49+
(read stream nil '(:eof)))
50+
51+
;;; Heartbeat
52+
53+
(defun make-heartbeat (id &optional (period 5))
54+
(let ((heartbeat `(,id :heartbeat)))
55+
(lambda ()
56+
(loop
57+
(with-manager-io (stream)
58+
(write-form stream heartbeat)
59+
(finish-output stream))
60+
(sleep period)))))
61+
62+
(sb-ext:defglobal **heartbeat-thread** nil)
63+
(defun start-heartbeat-thread ()
64+
(setf **heartbeat-thread** (bt:make-thread (make-heartbeat **id**)
65+
:name "Heartbeat Thread")))
66+
67+
;;; Request ID
68+
69+
(defun request-id (stream)
70+
(write-form stream '(:join))
71+
(alexandria:destructuring-ecase (read-form stream)
72+
((:welcome &key id)
73+
(format t "Got an ID: ~A~%" id)
74+
id)
75+
((:no-vacancy)
76+
(format t "No vacancy.~%")
77+
nil)))
78+
79+
;;; Main
80+
81+
(defun cli-handler (cmd)
82+
(let ((manager-address (clingon:getopt cmd ':socket-address)))
83+
;; Set the manager address.
84+
(unless (probe-file manager-address)
85+
(error "Manager address ~A not found." manager-address))
86+
(setf **manager-address** manager-address)
87+
88+
;; Get an ID.
89+
(with-manager-io (stream)
90+
(format t "Requesting ID.~%")
91+
(let ((id? (request-id stream)))
92+
(cond
93+
(id?
94+
(setf **id** id?))
95+
(t
96+
(uiop:quit 1)))))
97+
98+
(start-heartbeat-thread)
99+
(with-manager-io (stream)
100+
(format t "#~D connected to socket.~%" **id**)
101+
(write-form stream `(,**id** :ping))
102+
(format t "#~D: received ~S~%" **id** (read-form stream))
103+
(finish-output))
104+
(bt:join-thread **heartbeat-thread**)))
105+
106+
(defun main ()
107+
(let ((app (cli-command)))
108+
(clingon:run app)))

src-worker/package.lisp

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
;;;; Copyright (c) 2024 Robert Smith
2+
3+
(defpackage #:hypergeometrica-worker
4+
(:use #:cl)
5+
(:export #:main))

0 commit comments

Comments
 (0)