|
4 | 4 | [babashka.fs :as fs]
|
5 | 5 | [cheshire.core :as json]
|
6 | 6 | [clojure.core.async :as async]
|
7 |
| - [clojure.java.io :as io] |
8 | 7 | [clojure.pprint :refer [pprint]]
|
9 | 8 | [clojure.string :as string]
|
10 | 9 | [creds]
|
|
109 | 108 | ;; Tty wraps the process in a pseudo terminal
|
110 | 109 | {:StdinOnce true
|
111 | 110 | :OpenStdin true}
|
112 |
| -(defn create-container [{:keys [image entrypoint workdir command host-dir env thread-id opts mounts volumes] :or {opts {:Tty true}} :as m}] |
| 111 | +(defn create-container [{:keys [image entrypoint workdir command host-dir env thread-id opts mounts volumes] :or {opts {:Tty true}}}] |
113 | 112 | #_(jsonrpc/notify :message {:content (str m)})
|
114 | 113 | (let [payload (json/generate-string
|
115 | 114 | (merge
|
|
167 | 166 | {:raw-args ["--unix-socket" "/var/run/docker.sock"]
|
168 | 167 | :throw false}))
|
169 | 168 |
|
| 169 | +;; container was created with a Terminal (output is not-multiplexed) |
| 170 | +;; use after the container has stopped (not streaming) |
170 | 171 | (defn attach-container [{:keys [Id]}]
|
171 | 172 | ;; logs is true (as opposed to stream=true) so we run this after the container has stopped
|
172 | 173 | ;; TTY is true above so this is the just the raw data sent to the PTY (not multiplexed)
|
|
175 | 176 | {:raw-args ["--unix-socket" "/var/run/docker.sock"]
|
176 | 177 | :throw false}))
|
177 | 178 |
|
| 179 | +;; container was created without a Terminal (output is multiplexed so read as bytes) |
178 | 180 | (defn attach-container-stdout-logs [{:keys [Id]}]
|
179 | 181 | ;; this assumes no Tty so the output will be multiplexed back
|
180 | 182 | (curl/post
|
|
183 | 185 | :as :bytes
|
184 | 186 | :throw false}))
|
185 | 187 |
|
| 188 | +;; container must be created with a Terminal (so we can stream with a Reader) |
186 | 189 | (defn attach-container-stream-stdout [{:keys [Id]}]
|
187 |
| - ;; this assumes no Tty so the output will be multiplexed back |
188 | 190 | (curl/post
|
189 | 191 | (format "http://localhost/containers/%s/attach?stderr=false&stdout=true&stream=true" Id)
|
190 | 192 | {:raw-args ["--unix-socket" "/var/run/docker.sock"]
|
|
256 | 258 | [m cb]
|
257 | 259 | (when (not (has-image? (:image m)))
|
258 | 260 | (-pull m))
|
259 |
| - (let [x (create m) |
| 261 | + (let [x (-> m |
| 262 | + (update :opts |
| 263 | + (fnil merge {}) |
| 264 | + {:Tty true |
| 265 | + :StdinOnce false |
| 266 | + :OpenStdin false |
| 267 | + :AttachStdin false}) |
| 268 | + (create)) |
260 | 269 | finished-channel (async/promise-chan)]
|
261 | 270 | (start x)
|
262 | 271 |
|
263 | 272 | (async/go
|
264 | 273 | (try
|
265 | 274 | (let [s (:body (attach-container-stream-stdout x))]
|
266 |
| - (println s) |
267 | 275 | (doseq [line (line-seq (java.io.BufferedReader. (java.io.InputStreamReader. s)))]
|
268 | 276 | (cb line)))
|
269 | 277 | (catch Throwable e
|
270 | 278 | (println e))))
|
271 | 279 |
|
272 |
| -;; watch the container |
| 280 | + ;; watch the container |
273 | 281 | (async/go
|
274 | 282 | (wait x)
|
275 | 283 | (async/>! finished-channel {:done :exited}))
|
276 | 284 |
|
277 |
| -;; body is raw PTY output |
278 |
| - (let [finish-reason (async/<!! finished-channel) |
279 |
| - s (:body (attach x)) |
280 |
| - info (inspect x)] |
281 |
| - (delete x) |
282 |
| - (merge |
283 |
| - finish-reason |
284 |
| - {:pty-output s |
285 |
| - :exit-code (-> info :State :ExitCode) |
286 |
| - :info info})))) |
| 285 | + {:container x |
| 286 | + ;; stopped channel |
| 287 | + :stopped (async/go |
| 288 | + ;; body is raw PTY output |
| 289 | + (let [finish-reason (async/<!! finished-channel) |
| 290 | + s (:body (attach x)) |
| 291 | + info (inspect x)] |
| 292 | + (delete x) |
| 293 | + (merge |
| 294 | + finish-reason |
| 295 | + {:pty-output s |
| 296 | + :exit-code (-> info :State :ExitCode) |
| 297 | + :info info})))})) |
287 | 298 |
|
288 | 299 | (comment
|
289 | 300 | (async/thread
|
290 | 301 | (run-streaming-function-with-no-stdin
|
291 |
| - {:image "vonwig/inotifywait:latest" |
292 |
| - :volumes ["docker-prompts:/prompts"] |
293 |
| - :command ["-e" "create" "-e" "modify" "-e" "delete" "-q" "-m" "/prompts"] |
294 |
| - :opts {:Tty true |
295 |
| - :StdinOnce false |
296 |
| - :OpenStdin false |
297 |
| - :AttachStdin false}} |
298 |
| - println))) |
| 302 | + {:image "vonwig/inotifywait:latest" |
| 303 | + :volumes ["docker-prompts:/prompts"] |
| 304 | + :command ["-e" "create" "-e" "modify" "-e" "delete" "-q" "-m" "/prompts"] |
| 305 | + :opts {:Tty true |
| 306 | + :StdinOnce false |
| 307 | + :OpenStdin false |
| 308 | + :AttachStdin false}} |
| 309 | + println))) |
299 | 310 |
|
300 | 311 | (defn run-function
|
301 | 312 | "run container function with no stdin, and no streaming output"
|
|
327 | 338 | :exit-code (-> info :State :ExitCode)
|
328 | 339 | :info info}))))
|
329 | 340 |
|
| 341 | +;; not curl - opens a real socket to write to the container stdin |
| 342 | +;; returns the open socket - closing the socket will signal the program to finish |
| 343 | +;; if it's waiting on input |
330 | 344 | (defn write-stdin [container-id content]
|
331 | 345 | (let [buf (ByteBuffer/allocate (* 1024 20))
|
332 | 346 | address (UnixDomainSocketAddress/of "/var/run/docker.sock")
|
|
362 | 376 | (println t)
|
363 | 377 | "")))
|
364 | 378 |
|
365 |
| -(defn function-call-with-stdin [m] |
| 379 | +(defn function-call-with-stdin |
| 380 | + "creates and starts container, then writes to stdin process |
| 381 | + returns container map with Id, and socket - socket is open socket to stdin" |
| 382 | + [m] |
366 | 383 | (when (not (has-image? (:image m)))
|
367 | 384 | (-pull m))
|
368 | 385 | (let [x (merge
|
|
390 | 407 | (async/go
|
391 | 408 | (wait x)
|
392 | 409 | (async/>! finished-channel {:done :exited}))
|
393 |
| - ;; body is raw PTY output |
| 410 | + ;; body is raw PTY output - could we |
| 411 | + ;; have just been reading from the socket since it's two way? |
394 | 412 | (try
|
395 | 413 | (let [finish-reason (async/<!! finished-channel)
|
396 | 414 | s (docker-stream-format->stdout
|
|
0 commit comments