Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions samples/python/live-audio-transcription/src/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Live Audio Transcription — Foundry Local SDK Example (Python)
#
# Demonstrates real-time microphone-to-text using:
# SDK (FoundryLocalManager) → Core (NativeAOT DLL) → onnxruntime-genai (StreamingProcessor)
#
# Usage:
# pip install pyaudio
# python app.py

import threading

import pyaudio
from foundry_local_sdk import Configuration, FoundryLocalManager

print("===========================================================")
print(" Foundry Local -- Live Audio Transcription Demo (Python)")
print("===========================================================")
print()

# Initialize the SDK singleton before any other SDK call.
# NOTE(review): app_name presumably scopes per-app state (cache/config
# directories) — confirm against the SDK documentation.
config = Configuration(app_name="foundry_local_samples")
FoundryLocalManager.initialize(config)
manager = FoundryLocalManager.instance

# Download and load the Nemotron ASR model.
# The download callback receives a percentage; "\r" rewrites the same
# console line so the figure updates in place instead of scrolling.
model = manager.catalog.get_model("nemotron")
model.download(
    lambda progress: print(
        f"\rDownloading model: {progress:.2f}%", end="", flush=True
    )
)
print()
print(f"Loading model {model.id}...", end="")
model.load()
print("done.")

# Create a live transcription session: 16 kHz, mono, English.
# These settings must agree with the PCM format captured from the
# microphone later in this script (RATE / CHANNELS / FORMAT).
audio_client = model.get_audio_client()
session = audio_client.create_live_transcription_session()
session.settings.sample_rate = 16000
session.settings.channels = 1
session.settings.language = "en"

session.start()
print(" Session started")

# Start reading transcription results in a background thread
def read_results():
    """Drain the session's transcription stream and echo results to stdout.

    Final results are printed on their own line; interim (partial) results
    are rendered inline in cyan. Any exception from the stream — expected
    when the session is stopped — simply ends the loop.
    """
    try:
        for update in session.get_transcription_stream():
            snippet = update.content[0].text if update.content else ""
            if update.is_final:
                print()
                print(f" [FINAL] {snippet}")
            elif snippet:
                print(f"\033[96m{snippet}\033[0m", end="", flush=True)
    except Exception:
        # Best-effort reader: the stream raising on shutdown is normal.
        pass

# Daemon thread: must not keep the process alive if the script exits early.
read_thread = threading.Thread(target=read_results, daemon=True)
read_thread.start()

# Open microphone with PyAudio.
RATE = 16000  # samples/sec — matches session.settings.sample_rate above
CHANNELS = 1  # mono — matches session.settings.channels
FORMAT = pyaudio.paInt16  # 16-bit signed PCM — assumed to be what the session expects; TODO confirm
CHUNK = RATE // 10 # 100ms chunks

pa = pyaudio.PyAudio()
stream = pa.open(
    format=FORMAT,
    channels=CHANNELS,
    rate=RATE,
    input=True,  # capture-only stream (no playback)
    frames_per_buffer=CHUNK,
)

print()
print("===========================================================")
print(" LIVE TRANSCRIPTION ACTIVE")
print(" Speak into your microphone.")
print(" Transcription appears in real-time (cyan text).")
print(" Press ENTER to stop recording.")
print("===========================================================")
print()

# Capture microphone audio in a background thread, push to session.
# This Event is the cross-thread stop signal set by the main thread.
stop_recording = threading.Event()

def capture_mic():
    """Read PCM chunks from the microphone and push to the streaming session.

    Loops until the main thread sets ``stop_recording``; any read or push
    failure (e.g. the stream being closed during shutdown) ends the loop.
    """
    while not stop_recording.is_set():
        try:
            chunk = stream.read(CHUNK, exception_on_overflow=False)
            if chunk:
                session.append(chunk)
        except Exception:
            break

capture_thread = threading.Thread(target=capture_mic, daemon=True)
capture_thread.start()

# Block the main thread until the user presses ENTER.
input()

# Shutdown order matters: stop the capture loop first so nothing keeps
# reading the PyAudio stream or appending to the session during teardown.
stop_recording.set()
capture_thread.join(timeout=2)  # bounded wait; the thread is daemonic anyway

stream.stop_stream()
stream.close()
pa.terminate()

# Stopping the session ends the transcription stream, which lets the
# reader thread's loop finish (or raise — which read_results swallows).
session.stop()
read_thread.join()

model.unload()
116 changes: 114 additions & 2 deletions sdk/python/src/detail/core_interop.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ class RequestBuffer(ctypes.Structure):
]


class StreamingRequestBuffer(ctypes.Structure):
    """ctypes Structure matching the native ``StreamingRequestBuffer`` C struct.

    Extends ``RequestBuffer`` with binary data fields for sending raw payloads
    (e.g. PCM audio bytes) alongside JSON parameters.

    NOTE(review): this is not a Python subclass of ``RequestBuffer``, so the
    first four fields must stay byte-for-byte identical to ``RequestBuffer``'s
    layout for the native side to treat the structs interchangeably.
    """

    _fields_ = [
        ("Command", ctypes.c_void_p),       # pointer to command-name bytes
        ("CommandLength", ctypes.c_int),    # byte length of Command
        ("Data", ctypes.c_void_p),          # pointer to JSON parameter bytes (may be NULL)
        ("DataLength", ctypes.c_int),       # byte length of Data
        ("BinaryData", ctypes.c_void_p),    # pointer to raw payload (e.g. PCM audio)
        ("BinaryDataLength", ctypes.c_int), # byte length of BinaryData (c_int caps payloads at 2 GiB)
    ]


class ResponseBuffer(ctypes.Structure):
"""ctypes Structure matching the native ``ResponseBuffer`` C struct."""

Expand Down Expand Up @@ -108,6 +125,32 @@ class CoreInterop:
# Returns c_int: 0 = continue, 1 = cancel.
CALLBACK_TYPE = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p)

@staticmethod
def _load_dll_win(dll_path: str):
    """Load a DLL on Windows using ``LOAD_WITH_ALTERED_SEARCH_PATH``.

    This flag tells Windows to resolve the DLL's dependencies starting from
    the DLL's own directory rather than the process's default search path.
    Prevents conflicts with stale same-named DLLs in system directories
    (e.g. an old onnxruntime.dll in system32).

    The DLL is first loaded via ``LoadLibraryExW`` (which maps it into the
    process), then wrapped in a ``ctypes.CDLL`` for Python access.

    Args:
        dll_path: Absolute path to the DLL to load.

    Returns:
        A ``ctypes.CDLL`` wrapping the (already mapped) module.
    """
    # use_last_error=True makes ctypes snapshot the thread's last-error
    # value immediately after each foreign call. Reading it via
    # ctypes.windll.kernel32.GetLastError() is unreliable: ctypes' own
    # intervening Win32 calls can clobber the real error code.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    kernel32.LoadLibraryExW.restype = ctypes.c_void_p
    kernel32.LoadLibraryExW.argtypes = [ctypes.c_wchar_p, ctypes.c_void_p, ctypes.c_int]
    _LOAD_WITH_ALTERED_SEARCH_PATH = 0x00000008

    handle = kernel32.LoadLibraryExW(dll_path, None, _LOAD_WITH_ALTERED_SEARCH_PATH)
    if not handle:
        logger.warning("LoadLibraryExW failed for %s (error %d), falling back to ctypes.CDLL",
                       dll_path, ctypes.get_last_error())

    # Whether or not the explicit load succeeded: if it did, the module is
    # already mapped and ctypes.CDLL reuses it; if not, CDLL is the
    # default-search-path fallback.
    return ctypes.CDLL(dll_path)

@staticmethod
def _initialize_native_libraries() -> 'NativeBinaryPaths':
"""Load the native Foundry Local Core library and its dependencies.
Expand Down Expand Up @@ -142,15 +185,20 @@ def _initialize_native_libraries() -> 'NativeBinaryPaths':
for native_dir in paths.all_dirs():
os.add_dll_directory(str(native_dir))

# Set the DLL search directory so that when ORT/GenAI load their
# own dependencies, they find sibling DLLs from the correct
# directory rather than stale copies in system directories.
ctypes.windll.kernel32.SetDllDirectoryW(str(paths.ort_dir))

# Explicitly pre-load ORT and GenAI so their symbols are globally
# available when Core does P/Invoke lookups at runtime.
# On Windows the PATH manipulation above is sufficient; on
# Linux/macOS we need RTLD_GLOBAL so that dlopen() within the
# Core native code can resolve ORT/GenAI symbols.
# ORT must be loaded before GenAI (GenAI depends on ORT).
if sys.platform.startswith("win"):
CoreInterop._ort_library = ctypes.CDLL(str(paths.ort))
CoreInterop._genai_library = ctypes.CDLL(str(paths.genai))
CoreInterop._ort_library = CoreInterop._load_dll_win(str(paths.ort))
CoreInterop._genai_library = CoreInterop._load_dll_win(str(paths.genai))
else:
CoreInterop._ort_library = ctypes.CDLL(str(paths.ort), mode=os.RTLD_GLOBAL)
CoreInterop._genai_library = ctypes.CDLL(str(paths.genai), mode=os.RTLD_GLOBAL)
Expand All @@ -173,6 +221,10 @@ def _initialize_native_libraries() -> 'NativeBinaryPaths':
ctypes.c_void_p] # user_data
lib.execute_command_with_callback.restype = None

lib.execute_command_with_binary.argtypes = [ctypes.POINTER(StreamingRequestBuffer),
ctypes.POINTER(ResponseBuffer)]
lib.execute_command_with_binary.restype = None

return paths

@staticmethod
Expand Down Expand Up @@ -295,6 +347,66 @@ def execute_command_with_callback(self, command_name: str, command_input: Option
response = self._execute_command(command_name, command_input, callback)
return response

def execute_command_with_binary(self, command_name: str,
                                command_input: Optional[InteropRequest],
                                binary_data: bytes) -> Response:
    """Execute a command with both JSON parameters and a raw binary payload.

    Used for operations like pushing PCM audio data alongside JSON metadata.

    Args:
        command_name: The native command name (e.g. ``"audio_stream_push"``).
        command_input: Optional request parameters (serialized as JSON).
        binary_data: Raw binary payload (e.g. PCM audio bytes).

    Returns:
        A ``Response`` with ``data`` on success or ``error`` on failure.
    """
    logger.debug("Executing command with binary: %s Input: %s BinaryLen: %d",
                 command_name, command_input.params if command_input else None, len(binary_data))

    # cmd_buf / data_buf are keep-alive references: the pointers placed in
    # the request struct stay valid only while these locals are alive.
    cmd_ptr, cmd_len, cmd_buf = CoreInterop._to_c_buffer(command_name)
    data_ptr, data_len, data_buf = CoreInterop._to_c_buffer(
        command_input.to_json() if command_input else None
    )

    # Keep binary data alive for the duration of the native call.
    # create_string_buffer copies the payload and appends a trailing NUL;
    # BinaryDataLength below uses len(binary_data), excluding that byte.
    binary_buf = ctypes.create_string_buffer(binary_data)
    binary_ptr = ctypes.cast(binary_buf, ctypes.c_void_p)

    req = StreamingRequestBuffer(
        Command=cmd_ptr, CommandLength=cmd_len,
        Data=data_ptr, DataLength=data_len,
        BinaryData=binary_ptr, BinaryDataLength=len(binary_data),
    )
    resp = ResponseBuffer()
    lib = CoreInterop._flcore_library

    lib.execute_command_with_binary(ctypes.byref(req), ctypes.byref(resp))

    req = None  # Drop the struct; the *_buf locals still pin the underlying memory.

    # resp.Data / resp.Error are native-owned: copy them into Python strings
    # before handing the buffers back via free_response.
    response_str = ctypes.string_at(resp.Data, resp.DataLength).decode("utf-8") if resp.Data else None
    error_str = ctypes.string_at(resp.Error, resp.ErrorLength).decode("utf-8") if resp.Error else None

    # NOTE(review): resp is passed by value here (no byref), unlike the
    # execute call above — confirm the native ``free_response`` signature
    # really takes the struct by value rather than a pointer.
    lib.free_response(resp)

    return Response(data=response_str, error=error_str)

# --- Audio streaming session support ---

def start_audio_stream(self, command_input: InteropRequest) -> Response:
    """Start a real-time audio streaming session via ``audio_stream_start``.

    Args:
        command_input: Session parameters forwarded to the core as JSON.

    Returns:
        The core's ``Response`` (``data`` on success, ``error`` on failure).
    """
    return self.execute_command("audio_stream_start", command_input)

def push_audio_data(self, command_input: InteropRequest, audio_data: bytes) -> Response:
    """Push a chunk of raw PCM audio data via ``audio_stream_push``.

    Args:
        command_input: Per-chunk metadata forwarded to the core as JSON.
        audio_data: Raw PCM bytes sent as the binary payload.

    Returns:
        The core's ``Response`` (``data`` on success, ``error`` on failure).
    """
    return self.execute_command_with_binary("audio_stream_push", command_input, audio_data)

def stop_audio_stream(self, command_input: InteropRequest) -> Response:
    """Stop a real-time audio streaming session via ``audio_stream_stop``.

    Args:
        command_input: Session identifier/parameters forwarded to the core.

    Returns:
        The core's ``Response`` (``data`` on success, ``error`` on failure).
    """
    return self.execute_command("audio_stream_stop", command_input)


def get_cached_model_ids(core_interop: CoreInterop) -> list[str]:
"""Get the list of models that have been downloaded and are cached."""
Expand Down
18 changes: 17 additions & 1 deletion sdk/python/src/openai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,21 @@

from .chat_client import ChatClient, ChatClientSettings
from .audio_client import AudioClient
from .live_audio_transcription_client import LiveAudioTranscriptionSession
from .live_audio_transcription_types import (
CoreErrorResponse,
LiveAudioTranscriptionOptions,
LiveAudioTranscriptionResponse,
TranscriptionContentPart,
)

__all__ = ["AudioClient", "ChatClient", "ChatClientSettings"]
# Public API of this subpackage; kept sorted alphabetically.
__all__ = [
    "AudioClient",
    "ChatClient",
    "ChatClientSettings",
    "CoreErrorResponse",
    "LiveAudioTranscriptionOptions",
    "LiveAudioTranscriptionResponse",
    "LiveAudioTranscriptionSession",
    "TranscriptionContentPart",
]
20 changes: 20 additions & 0 deletions sdk/python/src/openai/audio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from ..detail.core_interop import CoreInterop, InteropRequest
from ..exception import FoundryLocalException
from .live_audio_transcription_client import LiveAudioTranscriptionSession

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,6 +62,25 @@ def __init__(self, model_id: str, core_interop: CoreInterop):
self.settings = AudioSettings()
self._core_interop = core_interop

def create_live_transcription_session(self) -> LiveAudioTranscriptionSession:
    """Create a real-time streaming transcription session for this model.

    The caller pushes raw PCM chunks into the session and consumes
    transcription results from a synchronous generator. The session should
    be stopped when done; it also supports use as a context manager::

        with audio_client.create_live_transcription_session() as session:
            session.settings.sample_rate = 16000
            session.start()
            session.append(pcm_bytes)
            for result in session.get_transcription_stream():
                print(result.content[0].text)

    Returns:
        A new (not yet started) ``LiveAudioTranscriptionSession`` bound to
        this client's model and core interop.
    """
    session = LiveAudioTranscriptionSession(self.model_id, self._core_interop)
    return session

@staticmethod
def _validate_audio_file_path(audio_file_path: str) -> None:
"""Validate that the audio file path is a non-empty string."""
Expand Down
Loading
Loading