32 changes: 28 additions & 4 deletions docs/website/content/docs/runtime/logging.mdx
@@ -6,16 +6,18 @@ schemaType: HowTo

## Overview

-QVAC provides two complementary logging primitives:
QVAC provides three complementary logging primitives:

-- `loggingStream()`: stream real-time logs emitted by the SDK server and native addons (llamacpp, whispercpp, etc.). You decide what to do with each log line (print, persist, filter).
- `subscribeServerLogs()`: subscribe to **every** server-side log (SDK server, all loaded models, RAG) through a single stream, without tracking individual stream IDs. Returns an unsubscribe function.
- `loggingStream()`: stream real-time logs for a **single** source — the SDK server (`SDK_LOG_ID`) or one model (its model ID). You decide what to do with each log line (print, persist, filter).
- `getLogger()`: create a logger for your own application code (namespaced, configurable level, optional transports).
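
Because both streaming primitives hand each log line to your own code, filtering is ordinary application logic. The following is a minimal sketch of a level-threshold filter, not SDK code: `LogLine` is a local stand-in using the fields that appear elsewhere in this PR, the level names and their ordering are assumptions (the real `LogLevel` type lives in `@qvac/logging` and is not shown here), and `atLeast` is a hypothetical helper.

```typescript
// Local stand-in for a streamed log line (fields as used in this PR).
interface LogLine {
  id: string;
  level: string;
  namespace: string;
  message: string;
}

// ASSUMPTION: level names and severity order are illustrative only.
const SEVERITY: Record<string, number> = { debug: 0, info: 1, warn: 2, error: 3 };

// Hypothetical helper: wraps a handler so it only sees lines at or above `min`.
function atLeast(min: string, handler: (log: LogLine) => void): (log: LogLine) => void {
  const threshold = SEVERITY[min] ?? 0;
  return (log) => {
    // Unknown levels are passed through rather than silently dropped.
    const severity = SEVERITY[log.level] ?? Number.POSITIVE_INFINITY;
    if (severity >= threshold) handler(log);
  };
}

const kept: string[] = [];
const onWarnOrWorse = atLeast("warn", (log) => {
  kept.push(log.message);
});

// Fed by hand here; in an application the wrapped handler would receive streamed lines.
onWarnOrWorse({ id: "__sdk__", level: "info", namespace: "sdk:server", message: "started" });
onWarnOrWorse({ id: "__sdk__", level: "error", namespace: "sdk:server", message: "load failed" });
```

Only the second line reaches the inner handler, since `info` ranks below the `warn` threshold in the assumed ordering.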

## Functions

1. [`getLogger()`](/reference/api) — create a logger
2. [`loadModel()`](/reference/api#loadmodel) — pass logger via `logger` option
-3. [`loggingStream()`](/reference/api#loggingstream) — stream real-time logs from models or SDK server
3. [`loggingStream()`](/reference/api#loggingstream) — stream real-time logs from a single model or the SDK server
4. [`subscribeServerLogs()`](/reference/api#subscribeserverlogs) — stream all server-side logs through one subscription

For how to use each function, see [SDK — API reference](/reference/api/).

@@ -33,10 +35,12 @@ For how to use each function, see [SDK — API reference](/reference/api/).

## Features

-* **Streaming API (`loggingStream`)** — Consume real-time logs programmatically. Stream either:
* **Streaming API (`loggingStream`)** — Consume real-time logs from one source programmatically. Stream either:
- SDK server logs using `SDK_LOG_ID`, or
- per-model addon logs using the model ID returned by `loadModel()`.

* **Global subscription (`subscribeServerLogs`)** — Receive every server-side log (SDK server, all models, RAG) through a single subscription, without knowing the stream IDs ahead of time. The handler is called once per log line, and each log carries its origin in `log.id` (`SDK_LOG_ID`, a model ID, or a RAG workspace key) so you can still tell the sources apart. It is built on `loggingStream` and the reserved `SDK_ALL_LOG_ID` stream that the worker fans all logs into. The call returns an `unsubscribe()` function — invoke it to stop receiving logs.

* **Logger API (`getLogger`)** — Create loggers for your application code with custom transports. Console output enabled by default; set `enableConsole: false` to use only custom transports.

It works for all model types (LLM, Whisper, NMT, Embeddings) and provides valuable insight into model performance and behavior.
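
Since every line delivered by the global subscription carries its origin in `log.id`, a handler can split the combined stream back into per-source buckets. The sketch below illustrates that idea under stated assumptions: `LogLine` is a local stand-in for the response shape, `createSourceRouter` is a hypothetical helper rather than an SDK function, and `"model-a"` and the `"llm"` namespace are made-up values.

```typescript
// Local stand-in for the streamed log shape (fields as used in this PR).
interface LogLine {
  id: string;
  level: string;
  namespace: string;
  message: string;
}

// Value of SDK_LOG_ID as shown in packages/sdk/logging/namespaces.ts.
const SDK_LOG_ID = "__sdk__";

// Hypothetical helper: returns a handler that buckets lines by their origin id.
function createSourceRouter() {
  const bySource = new Map<string, LogLine[]>();
  const handler = (log: LogLine): void => {
    const bucket = bySource.get(log.id) ?? [];
    bucket.push(log);
    bySource.set(log.id, bucket);
  };
  return { handler, bySource };
}

const router = createSourceRouter();

// In an application `router.handler` would be passed to subscribeServerLogs();
// here it is fed by hand so the sketch runs without the SDK.
router.handler({ id: SDK_LOG_ID, level: "info", namespace: "sdk:server", message: "worker ready" });
router.handler({ id: "model-a", level: "info", namespace: "llm", message: "loading weights" });
router.handler({ id: "model-a", level: "debug", namespace: "llm", message: "context created" });
```

After these three calls `router.bySource` holds one bucket for the SDK server and one for the hypothetical model.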
@@ -69,6 +73,26 @@ The following script shows an example of streaming logs from loaded models:
</Tab>
</Tabs>

To capture logs from every source (SDK server, all models, RAG) with a single subscription, use `subscribeServerLogs`:

<Tabs>
<Tab value="js" label="JavaScript" default>
<WrapCode>

```js file=<rootDir>/packages/sdk/dist/examples/logging-global.js title="logging-global.js" lineNumbers
```
</WrapCode>
</Tab>

<Tab value="ts" label="TypeScript">
<WrapCode>

```ts file=<rootDir>/packages/sdk/examples/logging-global.ts title="logging-global.ts" lineNumbers
```
</WrapCode>
</Tab>
</Tabs>

<Callout type="success">
**Tip:** all examples throughout this documentation are self-contained and runnable. For instructions on how to run them, see [SDK quickstart](/quickstart).
</Callout>
1 change: 1 addition & 0 deletions packages/sdk/client/api/index.ts
@@ -4,6 +4,7 @@ export { completion } from "./completion-stream";
export { deleteCache } from "./delete-cache";
export { unloadModel } from "./unload-model";
export { loggingStream } from "./logging-stream";
export { subscribeServerLogs, type ServerLogHandler } from "./subscribe-logs";
export { heartbeat } from "./heartbeat";
export { transcribe, transcribeStream } from "./transcribe";
export { bciTranscribe, bciTranscribeStream } from "./bci-transcribe";
49 changes: 49 additions & 0 deletions packages/sdk/client/api/subscribe-logs.ts
@@ -0,0 +1,49 @@
import { getClientLogger, SDK_ALL_LOG_ID } from "@/logging";
import { loggingStream } from "./logging-stream";
import type { LoggingStreamResponse } from "@/schemas/logging-stream";

const logger = getClientLogger();

export type ServerLogHandler = (log: LoggingStreamResponse) => void;

/**
 * Subscribes to every server-side log through a single stream: SDK server logs,
 * per-model addon logs (llamacpp, whispercpp, …) for all loaded models, and RAG
 * logs — without having to open a {@link loggingStream} per id.
 *
 * Each delivered log keeps its origin in `log.id`: `SDK_LOG_ID` for SDK server
 * logs, the model id for model logs, or the RAG workspace key. Use it to tell the
 * sources apart.
 *
 * Internally this opens a {@link loggingStream} on the reserved `SDK_ALL_LOG_ID`
 * stream that the worker fans every log into.
 *
 * @param handler - called once per log line.
 * @returns a function that stops the subscription.
 *
 * @example
 * ```typescript
 * const unsubscribe = subscribeServerLogs((log) => {
 *   console.log(`[${log.level}] ${log.id} ${log.namespace}: ${log.message}`);
 * });
 * // later
 * unsubscribe();
 * ```
 */
export function subscribeServerLogs(handler: ServerLogHandler) {
  const streamIterator = loggingStream({ id: SDK_ALL_LOG_ID });

  void (async () => {
    try {
      for await (const log of streamIterator) {
        handler(log);
      }
    } catch (error) {
      logger.error("Server log stream error:", error);
    }
  })();

  return () => {
    void streamIterator.return(undefined);
  };
}
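
One behavior worth noting in the function above: `handler(log)` runs inside the `try` that wraps the `for await` loop, so an exception thrown by the handler would land in the `catch`, be logged, and end the loop. A caller that does not want one bad line to stop delivery could guard its own handler. The following is a minimal sketch of that idea; `guarded` is a hypothetical helper, not part of the SDK.

```typescript
// Hypothetical helper: wraps a handler so a throw is reported instead of propagating.
function guarded<T>(
  handler: (log: T) => void,
  onError: (error: unknown, log: T) => void,
): (log: T) => void {
  return (log) => {
    try {
      handler(log);
    } catch (error) {
      onError(error, log);
    }
  };
}

const seen: string[] = [];
const failures: string[] = [];

const safeHandler = guarded<{ message: string }>(
  (log) => {
    if (log.message === "bad") throw new Error("cannot process this line");
    seen.push(log.message);
  },
  (_error, log) => {
    failures.push(log.message);
  },
);

// Fed by hand; in an application `safeHandler` would be passed to subscribeServerLogs().
for (const message of ["one", "bad", "two"]) {
  safeHandler({ message });
}
```

The line after the failing one is still processed, because the throw never leaves the wrapper.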
3 changes: 3 additions & 0 deletions packages/sdk/examples/config/logging/logging.config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
  "loggerConsoleOutput": false
}
40 changes: 40 additions & 0 deletions packages/sdk/examples/logging-global.ts
@@ -0,0 +1,40 @@
// loggerConsoleOutput: false in this config disables the default console
// transport on both the client and the worker, so the only logs printed are
// the ones our handler below receives.
const configDir = import.meta.dirname ?? process.cwd();
process.env["QVAC_CONFIG_PATH"] = `${configDir}/config/logging/logging.config.json`;

const { loadModel, completion, unloadModel, subscribeServerLogs, LLAMA_3_2_1B_INST_Q4_0 } =
  await import("@qvac/sdk");

try {
  console.log("🚀 Starting global logging demo...\n");

  // One subscription captures every server-side log (SDK, models, RAG, …)
  // without having to know any stream IDs ahead of time.
  const unsubscribe = subscribeServerLogs((log) => {
    console.log(`[${log.level.toUpperCase()}] [${log.namespace}] ${log.message}`);
  });

  const modelId = await loadModel({
    modelSrc: LLAMA_3_2_1B_INST_Q4_0,
    modelConfig: { ctx_size: 2048 },
  });

  const result = completion({
    modelId,
    history: [{ role: "user", content: "Count from 1 to 5." }],
    stream: true,
  });

  console.log("📝 Response:\n");
  for await (const token of result.tokenStream) {
    process.stdout.write(token);
  }

  await unloadModel({ modelId, clearStorage: false });
  unsubscribe();
} catch (error) {
  console.error("❌ Error:", error);
  process.exit(1);
}
4 changes: 3 additions & 1 deletion packages/sdk/index.ts
@@ -32,6 +32,8 @@ export {
getModelInfo,
getLoadedModelInfo,
loggingStream,
subscribeServerLogs,
type ServerLogHandler,
ocr,
invokePlugin,
invokePluginStream,
@@ -225,7 +227,7 @@ export {
} from "./utils/errors-client";

// Logging exports
-export { getLogger, SDK_LOG_ID } from "./logging";
export { getLogger, SDK_LOG_ID, SDK_ALL_LOG_ID } from "./logging";
export type { Logger, LogTransport, LoggerOptions } from "./logging";

// Profiler exports
1 change: 1 addition & 0 deletions packages/sdk/logging/index.ts
@@ -12,6 +12,7 @@ export type { Logger, LoggerOptions, LogTransport } from "./types";
export {
RAG_NAMESPACE,
SDK_LOG_ID,
SDK_ALL_LOG_ID,
SDK_SERVER_NAMESPACE,
type AddonNamespace,
} from "./namespaces";
3 changes: 3 additions & 0 deletions packages/sdk/logging/namespaces.ts
@@ -7,5 +7,8 @@ export type AddonNamespace = CanonicalModelType | typeof RAG_NAMESPACE;
// Reserved ID for SDK server logs
export const SDK_LOG_ID = "__sdk__";

// Reserved ID for the stream that receives all server-side logs
export const SDK_ALL_LOG_ID = "__all__";

// Namespace for all SDK server logs
export const SDK_SERVER_NAMESPACE = "sdk:server";
46 changes: 34 additions & 12 deletions packages/sdk/server/bare/registry/logging-stream-registry.ts
@@ -12,11 +12,20 @@
*/

import type { LogLevel } from "@qvac/logging";
import { SDK_ALL_LOG_ID } from "@/logging/namespaces";

-const loggingStreams = new Map<
-string,
-Set<(level: LogLevel, namespace: string, message: string) => void>
->();
// `sourceId` is the id the log was emitted under (a model id, SDK_LOG_ID, a RAG
// workspace key, …). It usually equals the subscription id, but for the global
// SDK_ALL_LOG_ID stream it carries the real origin so subscribers can tell which
// model/SDK/RAG source produced each line instead of always seeing "__all__".
type StreamHandler = (
level: LogLevel,
namespace: string,
message: string,
sourceId: string,
) => void;

const loggingStreams = new Map<string, Set<StreamHandler>>();

// Buffering for logs emitted during model loading (before client subscribes)
const MAX_BUFFERED_LOGS_PER_MODEL = 100;
@@ -27,6 +36,7 @@ interface BufferedLog {
level: LogLevel;
namespace: string;
message: string;
sourceId: string;
timestamp: number;
}

@@ -59,10 +69,7 @@ export function stopLogBufferingWithTimeout(id: string) {
bufferingTimeouts.set(id, timeout);
}

-export function registerLoggingStream(
-id: string,
-streamHandler: (level: LogLevel, namespace: string, message: string) => void,
-) {
export function registerLoggingStream(id: string, streamHandler: StreamHandler) {
if (!loggingStreams.has(id)) {
loggingStreams.set(id, new Set());
}
@@ -72,7 +79,7 @@ export function registerLoggingStream(
if (buffered && buffered.length > 0) {
for (const log of buffered) {
try {
-streamHandler(log.level, log.namespace, log.message);
streamHandler(log.level, log.namespace, log.message, log.sourceId);
} catch (error) {
console.error(`Error flushing buffered log for ID ${id}:`, error); // fallback (avoid recursion)
}
@@ -86,7 +93,7 @@ export function registerLoggingStream(

export function unregisterLoggingStream(
id: string,
-streamHandler: (level: LogLevel, namespace: string, message: string) => void,
streamHandler: StreamHandler,
) {
const streams = loggingStreams.get(id);
if (streams) {
@@ -111,14 +118,29 @@ export function sendLogToStreams(
level: LogLevel,
namespace: string,
message: string,
) {
// The originating id is preserved as `sourceId` so the global stream keeps the
// real origin of each log instead of reporting the subscription id.
deliverToStream(id, level, namespace, message, id);
if (id !== SDK_ALL_LOG_ID) {
deliverToStream(SDK_ALL_LOG_ID, level, namespace, message, id);
}
}

function deliverToStream(
id: string,
level: LogLevel,
namespace: string,
message: string,
sourceId: string,
) {
const streams = loggingStreams.get(id);
const isBuffering = modelsWithBuffering.has(id);

if (streams && streams.size > 0) {
for (const streamHandler of streams) {
try {
-streamHandler(level, namespace, message);
streamHandler(level, namespace, message, sourceId);
} catch (error) {
console.error(`Error sending log to stream for ID ${id}:`, error); // fallback (avoid recursion)
}
@@ -139,7 +161,7 @@ export function sendLogToStreams(
validLogs.shift();
}

-validLogs.push({ level, namespace, message, timestamp: now });
validLogs.push({ level, namespace, message, sourceId, timestamp: now });
logBuffer.set(id, validLogs);
}
}
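
The fan-out introduced in this file can be sketched in isolation: each log is delivered to the subscribers of its own id and, unless it was already sent under the global id, once more to the global stream with the original id passed along as `sourceId`. The following is a simplified in-memory model under stated assumptions, not the registry itself: buffering and error handling are left out, levels are plain strings, and `register`, `deliver`, and `send` are illustrative names.

```typescript
type StreamHandler = (
  level: string,
  namespace: string,
  message: string,
  sourceId: string,
) => void;

// Value of SDK_ALL_LOG_ID as defined in this PR.
const ALL_ID = "__all__";

const streams = new Map<string, Set<StreamHandler>>();

// Registers a handler under an id and returns a function that removes it again.
function register(id: string, handler: StreamHandler): () => void {
  let handlers = streams.get(id);
  if (!handlers) {
    handlers = new Set();
    streams.set(id, handlers);
  }
  handlers.add(handler);
  return () => {
    streams.get(id)?.delete(handler);
  };
}

// Delivers one log to the subscribers of `id`, tagging it with its real origin.
function deliver(id: string, level: string, namespace: string, message: string, sourceId: string): void {
  for (const handler of streams.get(id) ?? []) {
    handler(level, namespace, message, sourceId);
  }
}

// Sends to the per-id stream, then mirrors to the global stream exactly once.
function send(id: string, level: string, namespace: string, message: string): void {
  deliver(id, level, namespace, message, id);
  if (id !== ALL_ID) {
    deliver(ALL_ID, level, namespace, message, id);
  }
}

const modelOnly: string[] = [];
const everything: string[] = [];
register("model-a", (_level, _ns, message) => { modelOnly.push(message); });
const stopAll = register(ALL_ID, (_level, _ns, message, sourceId) => {
  everything.push(`${sourceId}: ${message}`);
});

send("model-a", "info", "llm", "loaded");      // "model-a" and "llm" are made-up values
send("__sdk__", "info", "sdk:server", "ready");
stopAll();
send("__sdk__", "info", "sdk:server", "ignored by the global subscriber");
```

The `id !== ALL_ID` check is what keeps a log emitted directly under the global id from being delivered twice.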
5 changes: 4 additions & 1 deletion packages/sdk/server/rpc/handlers/logging-stream.ts
@@ -17,10 +17,13 @@ export async function* handleLoggingStream(
level: LogLevel,
namespace: string,
message: string,
sourceId: string,
) => {
const logResponse: LoggingStreamResponse = {
type: "loggingStream",
-id,
// For the global SDK_ALL_LOG_ID stream `sourceId` is the real origin of the
// log; for a per-id stream it equals the subscription `id`.
id: sourceId,
level: level,
namespace,
message,
16 changes: 15 additions & 1 deletion packages/sdk/server/worker-core.ts
@@ -12,8 +12,14 @@ import { closeRegistryClient } from "@/server/bare/registry/registry-client";
import {
clearAllLoggingStreams,
startLogBuffering,
stopLogBufferingWithTimeout,
} from "@/server/bare/registry/logging-stream-registry";
-import { clearAllAddonLoggers, getServerLogger, SDK_LOG_ID } from "@/logging";
import {
clearAllAddonLoggers,
getServerLogger,
SDK_LOG_ID,
SDK_ALL_LOG_ID,
} from "@/logging";
import { clearPlugins } from "@/server/plugins";
import {
acquireWorkerLock,
@@ -68,6 +74,7 @@ export function initializeWorkerCore(): { hasRPCConfig: boolean } {
}

startLogBuffering(SDK_LOG_ID);
startLogBuffering(SDK_ALL_LOG_ID);

const { hasRPCConfig } = initEnv();

@@ -109,6 +116,13 @@ export function ensureRPCSetup() {
logger.info("Bare worker started and listening for RPC requests");
logger.debug("Working directory:", process.cwd());
rpcInitialized = true;

// The worker is now reachable, so a `subscribeServerLogs` client can connect.
// Bound the global startup buffer the same way model-load buffering is bounded:
// if nothing subscribes within the grace window, stop buffering so every server
// log doesn't keep churning that buffer for the worker's whole lifetime. A
// subscriber that connects in time cancels the timeout and flushes the buffer.
stopLogBufferingWithTimeout(SDK_ALL_LOG_ID);
} catch (error) {
logger.error("Worker error:", error);
process.exit(1);
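
The startup buffering described in the comment above follows a common pattern: hold a bounded number of early logs, hand them to the first subscriber, and give up if nobody subscribes in time. The sketch below models that pattern under stated assumptions and is not the registry implementation: `StartupBuffer` is a hypothetical class, the grace-window timeout is replaced by an explicit `stop()` call so the example stays synchronous, and discarding held logs on `stop()` is an assumption.

```typescript
interface BufferedLog {
  level: string;
  namespace: string;
  message: string;
  sourceId: string;
}

// Hypothetical bounded buffer for logs emitted before any subscriber exists.
class StartupBuffer {
  private logs: BufferedLog[] = [];
  private buffering = true;
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Keeps at most `capacity` logs, dropping the oldest when full.
  push(log: BufferedLog): void {
    if (!this.buffering) return;
    if (this.logs.length >= this.capacity) this.logs.shift();
    this.logs.push(log);
  }

  // Stands in for the grace-window timeout firing with no subscriber.
  // ASSUMPTION: held logs are discarded at that point.
  stop(): void {
    this.buffering = false;
    this.logs = [];
  }

  // Called when the first subscriber connects: replay what was held, then stop buffering.
  flush(handler: (log: BufferedLog) => void): void {
    const pending = this.logs;
    this.logs = [];
    this.buffering = false;
    for (const log of pending) handler(log);
  }
}

const buffer = new StartupBuffer(2);
for (const message of ["a", "b", "c"]) {
  buffer.push({ level: "info", namespace: "sdk:server", message, sourceId: "__sdk__" });
}

const replayed: string[] = [];
buffer.flush((log) => { replayed.push(log.message); });
```

With a capacity of two, the oldest of the three early logs is dropped and the remaining two are replayed in order.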