architecture.org
Help me think of a good architecture for a project similar to pipecat but written in the Clojure ecosystem.

A Clojure framework for building real-time voice-enabled AI applications. voice-fn handles the orchestration of speech recognition, audio processing, and AI service integration with the elegance of functional programming.

## Features

  • Real-time speech recognition with configurable backends
  • Clean functional interface for audio stream processing
  • Seamless integration with popular AI models and services
  • Built-in support for conversation state management
  • Efficient audio processing with core.async channels
  • Hot-reloadable pipeline components
  • Extensible architecture with composable transforms

My problems:

  • I tried to replicate the same system from pipecat, but it doesn't map well onto the Clojure way of doing things, since pipecat structures everything in a very OOP manner.
  • I want to use core.async, and I was thinking that all processors have an in and an out channel. They take a frame from in, call process-frame on it, and immediately or at a later time put the resulting frames onto the out channel.
  • The way pipecat does it, with classes for processors and transports, works well, but it doesn't feel very Clojure-like or data-oriented.

    I welcome any ideas you have. Here are some of my ideas in Clojure so far (I am not very happy with them, since I tried to port the OOP model to protocols and records and it doesn't hold up well: you need maker functions to simulate async work done in a constructor):

    (ns voice-fn.processors.protocols)

(defprotocol FrameProcessor
  "A frame processor is one of the pieces of the pipeline. Examples:

   - `TranscriptionProcessor`: processes `RawAudioInputFrames` and outputs `TranscriptionOutputFrames`
   - `TextLLMProcessor`: processes `TranscriptionOutputFrames` and outputs `LLMOutputTokenFrames`"
  (process-frame [this frame]
    "Process a frame of a given type. May output 0, 1 or multiple frames in the pipeline."))

(defprotocol ProcessorLifecycle
  (start! [this] "Start the processor. Optionally push a start frame further into the pipeline.")
  (stop! [this] "Stop the processor. Optionally push a stop frame further into the pipeline."))

(ns voice-fn.processors.deepgram
  (:require
   [hato.websocket :as ws]
   [taoensso.telemere :as t]
   [voice-fn.frames :as frames]
   [voice-fn.processors.protocols :refer [FrameProcessor ProcessorLifecycle stop!]]
   [voice-fn.utils.core :as u])
  (:import
   (java.nio HeapCharBuffer)))

(def ^:private deepgram-url "wss://api.deepgram.com/v1/listen")

(def deepgram-config
  {:encoding "linear16"
   :language "ro"
   :sample_rate 16000
   :interim_results false
   :punctuate true
   :model "nova-2"})

(def deepgram-url-encoding (u/append-search-params deepgram-url deepgram-config))

(defn transcript? [m] (= (:event m) "transcript"))

(defn- transcript [m] (-> m :channel :alternatives first :transcript))

(defn final-transcription? [m] (and (transcript? m) (= (:type m) "final")))

(defn- make-websocket-config [api-key]
  {:headers {"Authorization" (str "Token " api-key)}
   :on-open (fn [_] (t/log! :info "Deepgram websocket connection open"))
   :on-message (fn [_ws ^HeapCharBuffer data _last?]
                 (let [m (u/parse-if-json (str data))
                       trsc (transcript m)]
                   (when (and (final-transcription? m) (not= trsc ""))
                     (t/log! :info ["Transcript" trsc]))))
   :on-error (fn [_ e] (t/log! :error ["Error" e]))
   :on-close (fn [_ code reason]
               (t/log! :info ["Deepgram websocket connection closed" "Code:" code "Reason" reason]))})

(defn close-connection-payload [] (u/json-str {:type "CloseStream"}))

(defrecord DeepgramTranscriptionProcessor [conn]
  FrameProcessor
  (process-frame [this frame]
    (when (frames/raw-audio-input-frame? frame)
      (ws/send! conn (:raw frame) false)))
  ProcessorLifecycle
  (start! [this]
    (t/log! :info "Deepgram transcription started"))
  (stop! [this]
    (t/log! :info "Closing Deepgram connection")
    (ws/send! conn (close-connection-payload))
    (ws/close! conn)))

(defn make-deepgram-transcription-processor [api-key]
  (let [conn (ws/websocket deepgram-url-encoding (make-websocket-config api-key))]
    (->DeepgramTranscriptionProcessor conn)))

(comment
  (def processor (make-deepgram-transcription-processor api-key))
  (stop! processor))

Let me guide you towards a more Clojure-centric design. Let’s start with some high-level hints:

  1. Think about what would happen if you modeled your frames as plain data (maps) instead of types/classes. What benefits might this give you?
  2. Consider how core.async’s transducers could relate to your frame processing pipeline. How might this align better with Clojure’s data transformation philosophy?
  3. Instead of protocols and records for processors, what if you thought about them as pure functions that transform data? What would that look like?

Would you like to explore any of these directions further? Which aspect interests you most?
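
To make hint 2 concrete: a processing step can literally be a transducer attached to a core.async channel. A minimal sketch (the frame keys and type names here are only illustrative):

(require '[clojure.core.async :as a]
         '[clojure.string :as str])

;; A processing step as a plain transducer over frame maps:
;; drop blank transcriptions and normalize them into :text/input frames.
(def transcript->text-frame
  (comp (filter #(and (= (:type %) :transcription/result)
                      (not (str/blank? (:text %)))))
        (map (fn [frame]
               {:type :text/input
                :text (str/trim (:text frame))
                :ts (System/currentTimeMillis)}))))

;; Attach it to a channel: frames put on the channel come out transformed.
(def text-frames-ch (a/chan 16 transcript->text-frame))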

Let’s explore the model first

Sure! Let’s think about modeling frames as data. Here’s a hint to get you started:

In Clojure, we often use maps with a :type key to distinguish between different kinds of data. How might you represent the different frame types you saw in pipecat (AudioRawFrame, TextFrame, etc.) using just maps?

Try sketching out a few example frames as pure data structures. Don’t worry about the processing yet - let’s nail down the data model first.
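
For instance, two frames as plain maps might look like this (the exact keys are only a sketch; the same shape with :type, :data/:text and :ts appears later in this conversation):

;; A raw audio frame (:data would hold a byte array of PCM samples)
{:type :audio/raw-input
 :data (byte-array 0)
 :ts (System/currentTimeMillis)}

;; A transcription frame
{:type :text/input
 :text "hello there"
 :ts (System/currentTimeMillis)}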

The initial dummy way to do it is to have a map like this:

{:stt {:provider :elevenlabs
       :config {...}}
 :llm {:provider :llama
       :config {...}}
 ...}

However, this model is too static. I want users to be able to put whatever they want on the pipeline. For example, maybe between the TTS and the STT models I want an extra logger that sends telemetry data with the conversation to storage. The correct model is an array with the processors specified in order.

Good thinking! You’re moving in the right direction with the array of processors.

Here’s a hint to push this further: In Clojure, we often represent “configuration as data” where the configuration itself can describe both structure and behavior.

Think about how you might represent a pipeline like this:

[{:type :stt
  :provider :deepgram
  ...}
 {:type :telemetry
  :store-fn ...}
 {:type :llm
  :provider :openai
  ...}]

What if each processor spec in this vector could also include information about:

  • What kinds of frames it can accept
  • What kinds of frames it produces
  • Any side effects it might have

How might that help with pipeline validation and composition?
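
To make the validation idea concrete, here is a minimal sketch (the key names :accepted-frames/:generated-frames are assumptions that happen to match the spec sketched later in this conversation):

(defn validate-pipeline
  "Checks that every frame type a processor accepts is either a system
   frame or is generated by some processor earlier in the pipeline."
  [processors]
  (let [system-frames #{:system/start :system/stop :system/config}]
    (reduce (fn [available {:keys [type accepted-frames generated-frames]}]
              (let [missing (remove (into available system-frames) accepted-frames)]
                (when (seq missing)
                  (throw (ex-info "Processor accepts frames that nothing produces"
                                  {:processor type :missing (set missing)})))
                (into available generated-frames)))
            #{}
            processors)))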

I don't think we need to specify the type, as that doesn't serve any purpose. Also, the pipeline should maintain state; for example, the Deepgram transcription processor maintains a websocket connection.

Ah, now we’re getting to an interesting design question! Let me offer a hint:

In Clojure, we often separate the configuration/specification of something from its runtime instance. Consider this pattern:

;; The specification (pure data)
{:provider :deepgram
 :api-key "..."
 :config {...}}

;; The runtime instance (stateful)
{:chan-in (chan)
 :chan-out (chan)
 :conn (atom nil)  ; websocket connection
 :state (atom {})} ; other state

Think about:

  1. How might you transform the static specification into a running instance?
  2. Where would the state management code live if not in records/protocols?
  3. How could core.async channels fit into this picture?

Would you like to explore any of these aspects?
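
One possible answer to the first question, as a minimal sketch (init-processor is a hypothetical name, not something defined elsewhere in this document):

(require '[clojure.core.async :as a])

(defn init-processor
  "Turns a pure processor spec (plain data) into a running instance
   that owns its channels and mutable state."
  [spec]
  {:spec spec
   :chan-in (a/chan 1024)
   :chan-out (a/chan 1024)
   :conn (atom nil) ;; e.g. a websocket connection, opened lazily
   :state (atom {})})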

Let’s explore the data oriented specification for a simple pipeline:

  1. Capture local audio from microphone
  2. Send those to a transcription engine (deepgram)
  3. Log all of the transcriptions while they come

Good example to start with! Let’s think about how to specify this pipeline in a data-oriented way.

Here’s a hint: Consider what minimal information you need to specify each step, keeping in mind that we want to separate the “what” from the “how”.

Try writing out a data structure that describes this pipeline. Think about:

  • How to specify the audio source configuration
  • How to connect the components
  • What configuration each component needs
  • How to specify where the logs should go

Want to take a first pass at this specification?

Yes

Okay, let’s start sketching. Here’s a hint for the structure:

Think about each component in terms of:

  1. What it needs to initialize
  2. What resources it needs access to
  3. What configuration parameters it requires

Try writing a data structure that describes your pipeline. You might start with something like:

[{:source :microphone
  ...}
 {:transcribe :deepgram
  ...}
 {:log ...}]

Fill in what you think each component needs. Don’t worry about getting it perfect - we can refine it together.

No, it should be something like this:

(defn- make-websocket-config [api-key out-chan]
  {:headers {"Authorization" (str "Token " api-key)}
   :on-open (fn [_]
              (t/log! :info "Deepgram websocket connection open"))
   :on-message (fn [_ws ^HeapCharBuffer data _last?]
                 (let [m (u/parse-if-json (str data))
                       trsc (transcript m)]
                   (when (and (final-transcription? m)
                              (not= trsc ""))
                      (a/go (a/>! out-chan (frames/text-input-frame trsc))))))
   :on-error (fn [_ e]
               (t/log! :error ["Error" e]))
   :on-close (fn [_ code reason]
               (t/log! :info ["Deepgram websocket connection closed" "Code:" code "Reason" reason]))})

(defn create-deepgram-transcription-websocket [state config]
  (ws/websocket deepgram-url-encoding (make-websocket-config (:api-key config) (:main-ch @state))))

(defn get-connection! [state config]
  (if-let [conn (get-in @state [:transcription/deepgram :conn])]
    conn
    (let [conn (create-deepgram-transcription-websocket state config)]
      (swap! state assoc-in [:transcription/deepgram :conn] conn)
      conn)))

[{:accepted-frames #{:system/start :system/stop :system/config} ;; doesn't need to interrupt on user speech
  :generated-frames #{:audio/raw-input}
  :process-frame-fn (fn [state config frame]
                      (let [running? (atom false)]
                        (case (:type frame)
                          :system/start (start-capture-audio (:audio-config @state))
                          :system/stop (stop-capture-audio)
                          )))
  ;; meta is used to access processor specific state
  :type :transport/local-audio
  :direction :input}
 {:accepted-frames #{:system/start :system/stop :system/config :audio/raw-input} ;; also doesn't need to interrupt
  :generated-frames #{:text/input}
  :api-key "..."
  :process-frame-fn
  (fn [state config frame]
    (case (:type frame)
      :system/start (noop)
      :system/stop (let [conn (get-in @state [:transcription/deepgram :conn])]
                     (close-connection! conn))
      ;; deepgram websocket times out if we don't send data fast, so we create
      ;; the connection only when we have some actual data to process
      :audio/raw-input (let [conn (get-connection! state config)]
                         (ws/send! conn (:raw frame)))

      )
    )
  :type :transcription/deepgram
  }
 {:accepted-frames #{:system/start :system/stop :system/config :text/input} ;; also doesn't need to interrupt
  :generated-frames #{}
  :process-frame-fn
  (fn [state config frame]
    (case (:type frame)
      :system/start (noop)
      :system/stop (noop)
      :text/input (t/log! :info (:text frame))

      )
    )
  :type :log/text-input
  }
 ]

Ah, now I see where you’re going! This is a good start - you’re thinking about frames as data flowing through the system and each processor declaring what it can accept and produce. Let me help refine this model.

A few suggestions to make this more idiomatic Clojure:

  1. Consider separating the static configuration from the processing logic:
(def processors
  {:transport/local-audio
   {:accepted-frames #{:system/start :system/stop :system/config}
    :generated-frames #{:audio/raw-input}
    :direction :input}

   :transcription/deepgram
   {:accepted-frames #{:system/start :system/stop :system/config :audio/raw-input}
    :generated-frames #{:text/input}}

   :log/text-input
   {:accepted-frames #{:system/start :system/stop :system/config :text/input}
    :generated-frames #{}}})

(defmulti process-frame
  (fn [processor-type state config frame]
    processor-type))

(defmethod process-frame :transport/local-audio
  [_ state config frame]
  (case (:type frame)
    :system/start (start-capture-audio (:audio-config @state))
    :system/stop (stop-capture-audio)
    ;; ... etc
    ))
  2. For pipeline construction, you could have:
(defn make-pipeline [processors-config]
  (let [state (atom {})
        channels (create-channels processors-config)]
    (doseq [processor processors-config]
      (start-processor! processor state channels))
    {:state state
     :channels channels}))

;; Usage:
(def pipeline-config
  [{:type :transport/local-audio
    :config {...}}
   {:type :transcription/deepgram
    :config {:api-key "..."}}
   {:type :log/text-input}])
  3. For handling the channels:
(defn start-processor! [{:keys [type config]} state channels]
  (let [in-chan (get-in channels [type :in])
        out-chan (get-in channels [type :out])]
    (go-loop []
      (when-let [frame (<! in-chan)]
        (when-let [result (process-frame type state config frame)]
          (>! out-chan result))
        (recur)))))

What do you think about these refinements? Would you like to explore any particular aspect further?

It's interesting for sure. I was thinking that it might be better to have one main frame channel where all frames are put, and every processor subscribes to the types of frames it cares about. Your example in start-processor! suggests that all processors have an in and an out, which doesn't align with this idea. What do you think? We can use mult and tap.

Ah, that’s a great observation! Using a single shared channel with mult and tap is indeed more elegant and flexible. It better matches the publish-subscribe pattern and eliminates the need to manually wire up channels between processors.

Here’s how we might reshape this:

(defn create-pipeline [processors-config]
  (let [main-ch (chan 1024)
        main-mult (mult main-ch)
        state (atom {:main-ch main-ch
                    :main-mult main-mult})]

    ;; Start each processor
    (doseq [{:keys [type accepted-frames] :as processor} processors-config]
      (let [;; Filter to accepted frame types with a transducer on the channel
            ;; (tap itself doesn't take a predicate)
            processor-ch (chan 1024 (filter (comp accepted-frames :type)))
            _ (tap main-mult processor-ch)]

        ;; Start processor loop
        (go-loop []
          (when-let [frame (<! processor-ch)]
            (when-let [result (process-frame type state processor frame)]
              ;; Put results back on main channel
              (>! main-ch result))
            (recur)))))

    state))

This approach has several benefits:

  1. Processors don’t need to know about each other
  2. New processors can be added dynamically by just tapping into the main channel
  3. Frame routing is handled declaratively via the accepted-frames set
  4. We get automatic filtering of irrelevant frames

Would you like to explore how this might work with a concrete example?

Yes

Let’s implement your original example (microphone → transcription → logging) using this pattern:

(ns voice-fn.core
  (:require [clojure.core.async :as a :refer [chan mult tap go-loop >! <!]]))

;; Frame constructors (pure functions)
(defn audio-frame [raw-data]
  {:type :audio/raw-input
   :data raw-data
   :ts (System/currentTimeMillis)})

(defn text-frame [text]
  {:type :text/input
   :text text
   :ts (System/currentTimeMillis)})

;; Processor implementations
(defmethod process-frame :transport/local-audio
  [_ state {:keys [sample-rate channels] :as config} frame]
  (case (:type frame)
    :system/start
    (let [audio-stream (start-audio-capture! sample-rate channels)
          stop-fn (fn [] (stop-audio-capture! audio-stream))]
      ;; Store stop-fn in state for cleanup
      (swap! state assoc-in [:transport/local-audio :stop-fn] stop-fn)
      ;; Start sending audio frames
      (go-loop []
        (when-let [data (<! audio-stream)]
          (>! (:main-ch @state) (audio-frame data))
          (recur))))

    :system/stop
    (when-let [stop-fn (get-in @state [:transport/local-audio :stop-fn])]
      (stop-fn))))

(defmethod process-frame :transcription/deepgram
  [_ state {:keys [api-key] :as config} frame]
  (case (:type frame)
    :system/start
    (let [ws-conn (create-deepgram-websocket!
                   api-key
                   {:on-transcription
                    (fn [text]
                      (a/put! (:main-ch @state) (text-frame text)))})]
      (swap! state assoc-in [:transcription/deepgram :conn] ws-conn))

    :system/stop
    (when-let [conn (get-in @state [:transcription/deepgram :conn])]
      (close-connection! conn))

    :audio/raw-input
    (when-let [conn (get-in @state [:transcription/deepgram :conn])]
      (send-audio! conn (:data frame)))))

(defmethod process-frame :log/text-input
  [_ state config frame]
  (case (:type frame)
    :text/input
    (println "Transcription:" (:text frame))))

;; Pipeline configuration
(def pipeline-config
  [{:type :transport/local-audio
    :accepted-frames #{:system/start :system/stop}
    :config {:sample-rate 16000
            :channels 1}}

   {:type :transcription/deepgram
    :accepted-frames #{:system/start :system/stop :audio/raw-input}
    :config {:api-key "your-api-key-here"}}

   {:type :log/text-input
    :accepted-frames #{:text/input}
    :config {}}])

;; Usage
(defn start-pipeline! []
  (let [pipeline (create-pipeline pipeline-config)
        main-ch (:main-ch @pipeline)]
    ;; Send start signal to all processors
    (a/put! main-ch {:type :system/start})
    pipeline))

(defn stop-pipeline! [pipeline]
  (a/put! (:main-ch @pipeline) {:type :system/stop}))

(comment
  ;; Start the pipeline
  (def p (start-pipeline!))

  ;; Stop it after some time
  (stop-pipeline! p))

Key points about this design:

  1. Single shared channel (main-ch) that all frames flow through
  2. Each processor declares what frame types it accepts via accepted-frames
  3. State is managed in one place but partitioned by processor type
  4. Clean separation between configuration and runtime behavior
  5. Easy to add new processors by tapping into the main channel
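
Regarding point 3, the pipeline state atom ends up partitioned by processor type; after a :system/start it might look roughly like this (the values are placeholders, not real objects):

{:main-ch "<core.async channel all frames flow through>"
 :main-mult "<mult of main-ch that processors tap into>"
 :transport/local-audio {:stop-fn "<fn that stops audio capture>"}
 :transcription/deepgram {:conn "<Deepgram websocket connection>"}}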

How would you split this into namespaces?

Here’s a suggested namespace organization that follows Clojure’s best practices for separation of concerns:

voice-fn
├── src
│   └── voice_fn
│       ├── core.clj           ;; Main API namespace
│       ├── frames.clj         ;; Frame definitions and constructors
│       ├── pipeline.clj       ;; Pipeline creation and management
│       ├── protocols.clj      ;; Any protocols we might need
│       ├── specs.clj          ;; Specs for validation
│       ├── processors
│       │   ├── audio.clj      ;; Audio capture processor
│       │   ├── transcription.clj ;; Transcription processors
│       │   └── logging.clj    ;; Logging processors
│       └── utils
│           ├── async.clj      ;; core.async utilities
│           └── websocket.clj  ;; WebSocket utilities

Here’s how the code might be organized across these namespaces:

;; src/voice_fn/frames.clj
(ns voice-fn.frames)

(defn audio-frame [raw-data]
  {:type :audio/raw-input
   :data raw-data
   :ts (System/currentTimeMillis)})

(defn text-frame [text]
  {:type :text/input
   :text text
   :ts (System/currentTimeMillis)})

;; src/voice_fn/pipeline.clj
(ns voice-fn.pipeline
  (:require [clojure.core.async :as a]))

(defn create-pipeline [processors-config]
  ;; Pipeline creation logic here
  )

(defmulti process-frame
  (fn [processor-type state config frame]
    processor-type))

;; src/voice_fn/processors/audio.clj
(ns voice-fn.processors.audio
  (:require [voice-fn.pipeline :refer [process-frame]]
            [voice-fn.frames :as frames]))

(defmethod process-frame :transport/local-audio
  [_ state config frame]
  ;; Audio processor implementation
  )

;; src/voice_fn/processors/transcription.clj
(ns voice-fn.processors.transcription
  (:require [voice-fn.pipeline :refer [process-frame]]
            [voice-fn.frames :as frames]
            [voice-fn.utils.websocket :as ws]))

(defmethod process-frame :transcription/deepgram
  [_ state config frame]
  ;; Transcription processor implementation
  )

;; src/voice_fn/core.clj
(ns voice-fn.core
  (:require [clojure.core.async :as a]
            [voice-fn.pipeline :as pipeline]
            [voice-fn.processors.audio]    ;; Load multimethod implementations
            [voice-fn.processors.transcription]
            [voice-fn.processors.logging]))

(def default-pipeline-config
  [{:type :transport/local-audio
    :accepted-frames #{:system/start :system/stop}
    :config {:sample-rate 16000
            :channels 1}}
   ;; ... other processor configs
   ])

(defn start!
  ([] (start! default-pipeline-config))
  ([config]
   (let [pipeline (pipeline/create-pipeline config)]
     (a/put! (:main-ch @pipeline) {:type :system/start})
     pipeline)))

(defn stop! [pipeline]
  (a/put! (:main-ch @pipeline) {:type :system/stop}))

Some key points about this organization:

  1. core.clj provides the main public API
  2. Each processor type gets its own namespace
  3. Frame definitions are centralized in frames.clj
  4. Pipeline logic is isolated in pipeline.clj
  5. Utilities are separated into their own namespaces
  6. Processor implementations are loaded by requiring their namespaces

Would you like me to expand on any particular aspect of this organization?

Can you also write a function for start-audio-capture!

Here’s an implementation of audio capture using the Java Sound API, wrapped in a Clojure-friendly way:

;; src/voice_fn/processors/audio/capture.clj
(ns voice-fn.processors.audio.capture
  (:require [clojure.core.async :as a])
  (:import [java.util Arrays]
           [javax.sound.sampled
            AudioSystem
            AudioFormat
            DataLine$Info
            TargetDataLine]))

(defn create-audio-format
  "Creates an AudioFormat with given sample rate and channels.
   Default to 16-bit PCM encoding."
  [sample-rate channels]
  (AudioFormat. sample-rate                  ; sample rate
                16                           ; sample size in bits
                channels                     ; channels
                true                         ; signed
                true))                       ; big endian

(defn open-microphone!
  "Opens the microphone with specified format. Returns the TargetDataLine."
  [^AudioFormat format]
  (let [info (DataLine$Info. TargetDataLine format)
        line (AudioSystem/getLine info)]
    (when-not (AudioSystem/isLineSupported info)
      (throw (ex-info "Audio line not supported"
                     {:format format})))
    (.open ^TargetDataLine line format)
    (.start ^TargetDataLine line)
    line))

(defn start-audio-capture!
  "Starts capturing audio from the microphone.
   Returns a channel that will receive byte arrays of audio data.

   Options:
   :sample-rate - The sample rate in Hz (default: 16000)
   :channels - Number of audio channels (default: 1)
   :buffer-size - Size of the buffer in bytes (default: 4096)
   :chan-buf-size - Size of the core.async channel buffer (default: 1024)"
  ([] (start-audio-capture! {}))
  ([{:keys [sample-rate channels buffer-size chan-buf-size]
     :or {sample-rate 16000
          channels 1
          buffer-size 4096
          chan-buf-size 1024}}]
   (let [format (create-audio-format sample-rate channels)
         line (open-microphone! format)
         out-ch (a/chan chan-buf-size)
         buffer (byte-array buffer-size)
         running? (atom true)]

     ;; Start capture loop in a separate thread
     (future
       (try
         (while @running?
           (let [bytes-read (.read ^TargetDataLine line
                                  buffer
                                  0
                                  buffer-size)]
             (when (pos? bytes-read)
               ;; Copy only the bytes that were read
               (let [audio-data (Arrays/copyOfRange buffer 0 bytes-read)]
                 ;; Put data on channel, but don't block if channel is full
                 (a/offer! out-ch audio-data)))))
         (catch Exception e
           (a/put! out-ch {:error e}))
         (finally
           (.stop ^TargetDataLine line)
           (.close ^TargetDataLine line)
           (a/close! out-ch))))

     ;; Return a map with the channel and a stop function
     {:audio-chan out-ch
      :stop! #(reset! running? false)})))

;; Usage example:
(comment
  ;; Start capture
  (def capture (start-audio-capture! {:sample-rate 16000
                                     :channels 1}))

  ;; Process audio data
  (a/go-loop []
    (when-let [data (a/<! (:audio-chan capture))]
      (println "Got" (count data) "bytes")
      (recur)))

  ;; Stop capture
  ((:stop! capture)))

And here’s how to integrate it with your processor:

;; src/voice_fn/processors/audio.clj
(ns voice-fn.processors.audio
  (:require [voice-fn.pipeline :refer [process-frame]]
            [voice-fn.frames :as frames]))

*** I've implemented my first demo of this and it works quite well. Here's how it is structured:

(ns voice-fn.core
  (:require
   [taoensso.telemere :as t]
   [voice-fn.pipeline :as pipeline]
   [voice-fn.secrets :refer [secret]]
   [voice-fn.transport.local.audio]))

(def pipeline
  {:pipeline/config {:audio-in/sample-rate 16000
                     :audio-in/encoding :pcm-signed
                     :audio-in/channels 1
                     :audio-in/file-path "test-voice.wav"
                     :audio-in/sample-size-bits 16 ;; 2 bytes
                     :audio-out/sample-rate 24000
                     :audio-out/bitrate 96000
                     :audio-out/sample-size-bits 16
                     :audio-out/channels 1
                     :pipeline/language :ro}
   :pipeline/processors [{:processor/type :transport/local-audio
                          :processor/accepted-frames #{:system/start :system/stop}
                          :processor/generates-frames #{:audio/raw-input}}
                         {:processor/type :transcription/deepgram
                          :processor/accepted-frames #{:system/start :system/stop :audio/raw-input}
                          :processor/generates-frames #{:text/input}
                          :processor/config {:transcription/api-key (secret [:deepgram :api-key])
                                             :transcription/interim-results? false
                                             :transcription/punctuate? false
                                             :transcription/model :nova-2}}
                         {:processor/type :log/text-input
                          :processor/accepted-frames #{:text/input}
                          :processor/config {}}]})

(defmethod pipeline/process-frame :log/text-input
  [_ _ _ frame]
  (t/log! {:level :info
           :id :log/text-input} ["Frame" (:data frame)]))

(t/set-min-level! :debug)

(comment
  (def p (pipeline/create-pipeline pipeline))

  (:pipeline/processors @p)

  (pipeline/start-pipeline! p)
  (pipeline/stop-pipeline! p)

  ,)


(ns voice-fn.transport.local.audio
  (:require
   [clojure.core.async :as a]
   [clojure.java.io :as io]
   [taoensso.telemere :as t]
   [uncomplicate.clojure-sound.core :refer [open! read! start!]]
   [uncomplicate.clojure-sound.sampled :refer [audio-format line line-info]]
   [voice-fn.frames :as frames]
   [voice-fn.pipeline :refer [close-processor! process-frame]])
  (:import
   (java.util Arrays)
   (javax.sound.sampled AudioFormat AudioSystem DataLine$Info TargetDataLine)))

(defn- calculate-chunk-size
  "Calculate bytes for ms miliseconds of audio based on format"
  [ms ^AudioFormat audio-format]
  (let [frame-size (.getFrameSize audio-format)
        frame-rate (.getFrameRate audio-format)]
    (* frame-size (int (/ (* frame-rate ms) 1000)))))

(defn start-audio-capture-file!
  "Reads from WAV file in 20ms chunks.
   Returns a channel that will receive byte arrays of audio data."
  [{:audio-in/keys [file-path]
    :or {file-path "input.wav"}}]
  (let [audio-file (io/resource file-path)
        audio-stream (AudioSystem/getAudioInputStream audio-file)
        audio-format (.getFormat audio-stream)
        chunk-size (calculate-chunk-size 20 audio-format)
        buffer (byte-array chunk-size)
        out-ch (a/chan 1024)
        running? (atom true)]

    (future
      (try
        (while (and @running?
                    (pos? (.read audio-stream buffer 0 chunk-size)))
          (let [audio-data (Arrays/copyOf buffer chunk-size)]
            (a/offer! out-ch audio-data)))
        (catch Exception e
          (a/put! out-ch {:error e}))
        (finally
          (.close audio-stream)
          (a/close! out-ch))))

    {:audio-chan out-ch
     :stop-fn #(do (reset! running? false)
                   (a/close! out-ch))}))

(defn line-supported?
  [^DataLine$Info info]
  (AudioSystem/isLineSupported info))

(defn open-microphone!
  "Opens the microphone with specified format. Returns the TargetDataLine."
  [^AudioFormat format]
  (let [info (line-info :target format)
        line (line info)]
    (when-not (line-supported? info)
      (throw (ex-info "Audio line not supported"
                      {:format format})))
    (open! line format)
    (start! line)
    line))

(defn- frame-buffer-size
  "Get read buffer size based on the sample rate for input"
  [sample-rate]
  (* 2 (/ sample-rate 100)))

(defn start-audio-capture!
  "Starts capturing audio from the microphone.
   Returns a channel that will receive byte arrays of audio data.

   Options:
   :sample-rate - The sample rate in Hz (default: 16000)
   :channels - Number of audio channels (default: 1)"
  ([] (start-audio-capture! {}))
  ([{:audio-in/keys [sample-rate sample-size-bits channels]
     :or {sample-rate 16000
          channels 1
          sample-size-bits 16}}]
   (let [buffer-size (frame-buffer-size sample-rate)
         af (audio-format sample-rate sample-size-bits channels)
         line (open-microphone! af)
         out-ch (a/chan 1024)
         buffer (byte-array buffer-size)
         running? (atom true)]

     ;; Start capture loop in a separate thread
     (future
       (try
         (while @running?
           (let [bytes-read (read! line buffer 0 buffer-size)]
             (when (pos? bytes-read)
               ;; Copy only the bytes that were read
               (let [audio-data (Arrays/copyOfRange buffer 0 bytes-read)]
                 ;; Put data on channel, but don't block if channel is full
                 (a/offer! out-ch audio-data)))))
         (catch Exception e
           (a/put! out-ch {:error e}))
         (finally
           (.stop ^TargetDataLine line)
           (.close ^TargetDataLine line)
           (a/close! out-ch))))

     ;; Return a map with the channel and a stop function
     {:audio-chan out-ch
      :stop-fn #(do (a/close! out-ch)
                    (reset! running? false))})))

(defmethod process-frame :transport/local-audio
  [processor-type pipeline _ frame]
  (case (:frame/type frame)
    :system/start
    (do
      (t/log! :debug "Starting audio capture")
      (let [{:keys [audio-chan stop-fn]} (start-audio-capture-file! (:pipeline/config @pipeline))]
        ;; Store stop-fn in state for cleanup
        (swap! pipeline assoc-in [:transport/local-audio :stop-fn] stop-fn)
        ;; Start sending audio frames
        (a/go-loop []
          (when-let [data (a/<! audio-chan)]
            (a/>! (:pipeline/main-ch @pipeline) (frames/audio-input-frame data))
            (recur)))))

    :system/stop
    (do
      (t/log! :debug "Stopping audio capture")
      (when-let [stop-fn (get-in @pipeline [:transport/local-audio :stop-fn])]
        (stop-fn)))
    (close-processor! pipeline processor-type))
  nil)


  Here's my next issue:

  Other types of transports, like telephony or WebRTC, involve opening an API endpoint such as a webhook or a websocket to receive the stream of audio for input. How can we set this up during the pipeline? What are your suggestions? Here is how pipecat does it:

  import os
import sys

from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def run_bot(websocket_client, stream_sid):
    transport = FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=FastAPIWebsocketParams(
            audio_out_enabled=True,
            add_wav_header=False,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
            serializer=TwilioFrameSerializer(stream_sid),
        ),
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
    )

    messages = [
        {
            "role": "system",
            "content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
        },
    ]

    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline(
        [
            transport.input(),  # Websocket input from client
            stt,  # Speech-To-Text
            context_aggregator.user(),
            llm,  # LLM
            tts,  # Text-To-Speech
            transport.output(),  # Websocket output to client
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Kick off the conversation.
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([LLMMessagesFrame(messages)])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await task.queue_frames([EndFrame()])

    runner = PipelineRunner(handle_sigint=False)

    await runner.run(task)


    So I can see here that they instantiate the websocket connection for the input transport beforehand.



For handling WebSocket/HTTP endpoints in your Clojure pipeline, I'd suggest using Ring/HTTP Kit or Aleph for the server component. Here's how we could structure this:

;; src/voice_fn/transport/websocket.clj
(ns voice-fn.transport.websocket
  (:require
    [org.httpkit.server :as http]
    [compojure.core :refer [GET defroutes]]
    [voice-fn.pipeline :refer [process-frame]]
    [voice-fn.frames :as frames]
    [clojure.core.async :as a]))

(defn websocket-handler [pipeline req]
  (http/with-channel req channel
    (let [client-id (random-uuid)]
      ;; Store WS connection in pipeline state
      (swap! pipeline assoc-in [:transport/websocket :connections client-id] channel)

      ;; Handle incoming messages
      (http/on-receive channel
        (fn [data]
          (a/put! (:pipeline/main-ch @pipeline)
                  (frames/audio-input-frame data))))

      ;; Handle client disconnect
      (http/on-close channel
        (fn [status]
          (swap! pipeline update-in [:transport/websocket :connections] dissoc client-id))))))

(defroutes websocket-routes
  (GET "/ws" [] #(websocket-handler pipeline %)))

(defmethod process-frame :transport/websocket
  [processor-type pipeline config frame]
  (case (:frame/type frame)
    :system/start
    (let [port (get-in config [:processor/config :port] 8080)
          server (http/run-server websocket-routes {:port port})]
      (swap! pipeline assoc-in [:transport/websocket :server] server))

    :system/stop
    (when-let [server (get-in @pipeline [:transport/websocket :server])]
      (server)) ; Stop the server

    :audio/output
    (doseq [[_ conn] (get-in @pipeline [:transport/websocket :connections])]
      (http/send! conn (:data frame)))))

;; Usage in pipeline config:
(def pipeline
  {:pipeline/config {...}
   :pipeline/processors
   [{:processor/type :transport/websocket
     :processor/accepted-frames #{:system/start :system/stop :audio/output}
     :processor/generates-frames #{:audio/raw-input}
     :processor/config {:port 8080}}
    ...]})

Key points:

  1. The WebSocket transport is started when it receives :system/start
  2. It maintains a map of active connections in the pipeline state
  3. Incoming messages are put onto the main channel as audio frames
  4. Outgoing audio can be sent to all connected clients
  5. The server is cleanly shut down on :system/stop

For telephony, you might have something similar:

;; src/voice_fn/transport/telephony.clj
(ns voice-fn.transport.telephony
  (:require
    [ring.adapter.jetty :as jetty]
    [compojure.core :refer [POST defroutes]]
    [clojure.core.async :as a]
    [voice-fn.frames :as frames]
    [voice-fn.pipeline :refer [process-frame]]))

(defn twilio-webhook-handler [pipeline req]
  ;; Handle incoming Twilio webhook
  (let [audio-data (-> req :body :audio)]
    (a/put! (:pipeline/main-ch @pipeline)
            (frames/audio-input-frame audio-data))))

(defroutes telephony-routes
  (POST "/twilio/webhook" [] #(twilio-webhook-handler pipeline %)))

(defmethod process-frame :transport/telephony
  [processor-type pipeline config frame]
  (case (:frame/type frame)
    :system/start
    (let [port (get-in config [:processor/config :port] 8081)
          server (jetty/run-jetty telephony-routes {:port port :join? false})]
      (swap! pipeline assoc-in [:transport/telephony :server] server))

    :system/stop
    (when-let [server (get-in @pipeline [:transport/telephony :server])]
      (.stop server))))

Would you like me to elaborate on any part of this design?

I would like to change to use reitit instead of compojure

Here’s how we can restructure the WebSocket transport using reitit:

;; src/voice_fn/transport/websocket.clj
(ns voice-fn.transport.websocket
  (:require
   [org.httpkit.server :as http]
   [reitit.ring :as ring]
   [reitit.ring.middleware.muuntaja :as muuntaja]
   [reitit.swagger :as swagger]
   [reitit.swagger-ui :as swagger-ui]
   [muuntaja.core :as m]
   [voice-fn.pipeline :refer [process-frame]]
   [voice-fn.frames :as frames]
   [clojure.core.async :as a]))

(defn websocket-handler [pipeline]
  (fn [req]
    (http/with-channel req channel
      (let [client-id (random-uuid)]
        ;; Store WS connection in pipeline state
        (swap! pipeline assoc-in [:transport/websocket :connections client-id] channel)

        ;; Handle incoming messages
        (http/on-receive channel
          (fn [data]
            (a/put! (:pipeline/main-ch @pipeline)
                    (frames/audio-input-frame data))))

        ;; Handle client disconnect
        (http/on-close channel
          (fn [status]
            (swap! pipeline update-in [:transport/websocket :connections] dissoc client-id)))))))

(defn app-routes [pipeline]
  ["/api"
   ["/swagger.json"
    {:get {:no-doc true
           :handler (swagger/create-swagger-handler)}}]

   ["/ws"
    {:get {:handler (websocket-handler pipeline)
           :summary "WebSocket endpoint for audio streaming"}}]])

(def app-middleware
  [muuntaja/format-middleware])

(defn create-app [pipeline]
  (ring/ring-handler
   (ring/router
    [(app-routes pipeline)]
    {:data {:muuntaja m/instance
            :middleware app-middleware}})
   (ring/routes
    (swagger-ui/create-swagger-ui-handler
     {:path "/api/swagger"})
    (ring/create-default-handler))))

(defmethod process-frame :transport/websocket
  [processor-type pipeline config frame]
  (case (:frame/type frame)
    :system/start
    (let [port (get-in config [:processor/config :port] 8080)
          app (create-app pipeline)
          server (http/run-server app {:port port})]
      (swap! pipeline assoc-in [:transport/websocket :server] server))

    :system/stop
    (when-let [server (get-in @pipeline [:transport/websocket :server])]
      (server)) ; Stop the server

    :audio/output
    (doseq [[_ conn] (get-in @pipeline [:transport/websocket :connections])]
      (http/send! conn (:data frame)))))

;; Usage in pipeline config remains the same:
(def pipeline
  {:pipeline/config {...}
   :pipeline/processors
   [{:processor/type :transport/websocket
     :processor/accepted-frames #{:system/start :system/stop :audio/output}
     :processor/generates-frames #{:audio/raw-input}
     :processor/config {:port 8080}}
    ...]})

Key improvements with reitit:

  1. Better route organization with data-driven routing
  2. Built-in support for OpenAPI/Swagger documentation
  3. Better middleware handling
  4. More extensible routing system

Would you like me to:

  1. Add more endpoints (like health checks)?
  2. Show how to add authentication middleware?
  3. Add proper error handling?
  4. Show how to structure the telephony routes in the same system?

This is fine, but don't you agree that this adds too much to the model? I think it might be better to just create the websocket connection outside the pipeline config and pass it as configuration to the transport input.

Absolutely! You make a great point. This would better follow the principle of separation of concerns and keep the pipeline focused on processing frames rather than managing servers. Here’s the refined approach:

;; src/voice_fn/server.clj
(ns voice-fn.server
  (:require
   [org.httpkit.server :as http]
   [reitit.ring :as ring]
   [voice-fn.frames :as frames]
   [voice-fn.pipeline :as pipeline]))

(defn websocket-handler [pipeline-atom]
  (fn [req]
    (http/with-channel req channel
      (let [client-id (random-uuid)]
        (swap! pipeline-atom assoc-in [:transport/websocket :connections client-id] channel)
        (http/on-receive channel
          (fn [data]
            (pipeline/send-frame! pipeline-atom (frames/audio-input-frame data))))
        (http/on-close channel
          (fn [status]
            (swap! pipeline-atom update-in [:transport/websocket :connections] dissoc client-id)))))))

;; Server setup happens outside pipeline
(defn start-server! [pipeline-atom port]
  (let [app (ring/ring-handler
             (ring/router
              [["/ws" {:get {:handler (websocket-handler pipeline-atom)}}]]))
        server (http/run-server app {:port port})]
    server))

;; The pipeline processor becomes much simpler
(defmethod pipeline/process-frame :transport/websocket
  [_ pipeline _ frame]
  (case (:frame/type frame)
    :audio/output
    (doseq [[_ conn] (get-in @pipeline [:transport/websocket :connections])]
      (http/send! conn (:data frame)))
    nil))

;; Usage:
(comment
  ;; Start server separately
  (def server (start-server! pipeline-atom 8080))

  ;; Pipeline config focuses only on processing
  (def pipeline-config
   {:pipeline/processors
    [{:processor/type :transport/websocket
      :processor/accepted-frames #{:audio/output}
      :processor/generates-frames #{:audio/raw-input}}
     ...]}))
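
The handler above uses a pipeline/send-frame! helper that has not been defined anywhere in this conversation; a minimal version (an assumption about its shape) would simply put the frame on the pipeline's main channel:

;; In voice-fn.pipeline
(defn send-frame!
  "Puts a frame onto the pipeline's main channel.
   pipeline-atom is the runtime atom returned by create-pipeline."
  [pipeline-atom frame]
  (a/put! (:pipeline/main-ch @pipeline-atom) frame))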

This is much cleaner because:

  1. Server lifecycle is managed separately from the pipeline
  2. Pipeline only handles frame processing
  3. Clearer separation between web concerns and pipeline concerns
  4. More flexible - can have multiple transport inputs feeding the same pipeline

What do you think about this approach?

Here's an interesting idea from the pipecat project. They have a single, generic websocket transport that receives a websocket connection as config, together with a serializer/deserializer. This is interesting because the websocket transport works the same for different providers; only the serialization and deserialization of frames for the pipeline differs.

That’s an excellent pattern! It separates the transport mechanism from the protocol-specific details. Let’s implement this in Clojure:

;; src/voice_fn/transport/websocket/protocols.clj
(ns voice-fn.transport.websocket.protocols)

(defprotocol FrameSerializer
  (serialize-frame [this frame])
  (deserialize-frame [this raw-data]))

;; src/voice_fn/transport/websocket/serializers.clj
(ns voice-fn.transport.websocket.serializers
  (:require [cheshire.core :as json] ;; assuming cheshire for the json/parse-string call below
            [voice-fn.transport.websocket.protocols :as p]
            [voice-fn.frames :as frames]))

;; Example Twilio serializer
(defrecord TwilioSerializer [stream-sid]
  p/FrameSerializer
  (serialize-frame [_ frame]
    ;; Convert pipeline frame to Twilio-specific format
    (case (:frame/type frame)
      :audio/output {:event "media"
                     :streamSid stream-sid
                     :media {:payload (:data frame)}}
      ;; ... other frame types
      ))

  (deserialize-frame [_ raw-data]
    ;; Convert Twilio message to pipeline frame
    (let [data (json/parse-string raw-data true)]
      (case (:event data)
        "media" (frames/audio-input-frame (:payload (:media data)))
        ;; ... other event types
        ))))

;; src/voice_fn/transport/websocket/transport.clj
(ns voice-fn.transport.websocket.transport
  (:require [org.httpkit.server :as http]
            [voice-fn.pipeline :as pipeline]
            [voice-fn.transport.websocket.protocols :as p]
            [clojure.core.async :as a]))

(defmethod pipeline/process-frame :transport/websocket
  [_ pipeline {:keys [websocket serializer]} frame]
  (case (:frame/type frame)
    :audio/output
    (let [serialized (p/serialize-frame serializer frame)]
      (http/send! websocket serialized))

    :audio/raw-input
    (when-let [data (:data frame)]
      (let [deserialized (p/deserialize-frame serializer data)]
        (a/put! (:pipeline/main-ch @pipeline) deserialized)))

    nil))

;; Usage example:
(comment
  ;; In your server setup
  (defn websocket-handler [pipeline-atom]
    (fn [req]
      (http/with-channel req channel
        (let [stream-sid (get-in req [:params :stream-sid])
              serializer (->TwilioSerializer stream-sid)]
          ;; Add websocket connection and its serializer to pipeline config
          (swap! pipeline-atom update :pipeline/processors conj
                 {:processor/type :transport/websocket
                  :processor/config {:websocket channel
                                   :serializer serializer}
                  :processor/accepted-frames #{:audio/output}
                  :processor/generates-frames #{:audio/raw-input}})))))

  ;; Different serializer for different provider
  (defrecord WebRTCSerializer []
    p/FrameSerializer
    (serialize-frame [_ frame]
      ;; WebRTC-specific serialization
      )
    (deserialize-frame [_ raw-data]
      ;; WebRTC-specific deserialization
      )))

Key benefits of this approach:

  1. Transport logic is protocol-agnostic
  2. Easy to add new providers by implementing new serializers
  3. Clean separation between transport mechanism and protocol details
  4. Testable - can test serializers independently
  5. Flexible - can swap serializers at runtime
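
On the testability point, a quick REPL round-trip with the TwilioSerializer sketch above might look like this (payload values are illustrative):

(comment
  (def twilio (->TwilioSerializer "MZ-test-stream-sid"))

  ;; Twilio media message -> pipeline frame
  (p/deserialize-frame twilio "{\"event\":\"media\",\"media\":{\"payload\":\"base64-audio\"}}")
  ;; => (frames/audio-input-frame "base64-audio")

  ;; Pipeline frame -> Twilio media message
  (p/serialize-frame twilio {:frame/type :audio/output :data "base64-audio"})
  ;; => {:event "media", :streamSid "MZ-test-stream-sid", :media {:payload "base64-audio"}}
  )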

Would you like me to:

  1. Add more example serializers?
  2. Show how to handle different types of frames?
  3. Add error handling for malformed messages?

***