Skip to content

Commit 7288664

Browse files
committed
QVAC-20580 feat[api]: add subscribeServerLogs to capture all server logs
Signed-off-by: Arun Mani J <j.arunmani@proton.me>
1 parent f83d69c commit 7288664

12 files changed

Lines changed: 300 additions & 19 deletions

File tree

docs/website/content/docs/runtime/logging.mdx

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,18 @@ schemaType: HowTo
66

77
## Overview
88

9-
QVAC provides two complementary logging primitives:
9+
QVAC provides three complementary logging primitives:
1010

11-
- `loggingStream()`: stream real-time logs emitted by the SDK server and native addons (llamacpp, whispercpp, etc.). You decide what to do with each log line (print, persist, filter).
11+
- `subscribeServerLogs()`: subscribe to **every** server-side log (SDK server, all loaded models, RAG) through a single stream, without tracking individual stream IDs. Returns an unsubscribe function.
12+
- `loggingStream()`: stream real-time logs for a **single** source — the SDK server (`SDK_LOG_ID`) or one model (its model ID). You decide what to do with each log line (print, persist, filter).
1213
- `getLogger()`: create a logger for your own application code (namespaced, configurable level, optional transports).
1314

1415
## Functions
1516

1617
1. [`getLogger()`](/reference/api) — create a logger
1718
2. [`loadModel()`](/reference/api#loadmodel) — pass logger via `logger` option
18-
3. [`loggingStream()`](/reference/api#loggingstream) — stream real-time logs from models or SDK server
19+
3. [`loggingStream()`](/reference/api#loggingstream) — stream real-time logs from a single model or the SDK server
20+
4. [`subscribeServerLogs()`](/reference/api#subscribeserverlogs) — stream all server-side logs through one subscription
1921

2022
For how to use each function, see [SDK — API reference](/reference/api/).
2123

@@ -33,10 +35,12 @@ For how to use each function, see [SDK — API reference](/reference/api/).
3335

3436
## Features
3537

36-
* **Streaming API (`loggingStream`)** — Consume real-time logs programmatically. Stream either:
38+
* **Streaming API (`loggingStream`)** — Consume real-time logs from one source programmatically. Stream either:
3739
- SDK server logs using `SDK_LOG_ID`, or
3840
- per-model addon logs using the model ID returned by `loadModel()`.
3941

42+
* **Global subscription (`subscribeServerLogs`)** — Receive every server-side log (SDK server, all models, RAG) through a single subscription, without knowing the stream IDs ahead of time. The handler is called once per log line, and each log carries its origin in `log.id` (`SDK_LOG_ID`, a model ID, or a RAG workspace key) so you can still tell the sources apart. It is built on `loggingStream` and the reserved `SDK_ALL_LOG_ID` stream that the worker fans all logs into. The call returns an `unsubscribe()` function — invoke it to stop receiving logs.
43+
4044
* **Logger API (`getLogger`)** — Create loggers for your application code with custom transports. Console output enabled by default; set `enableConsole: false` to use only custom transports.
4145

4246
It works for all model types (LLM, Whisper, NMT, Embeddings) and provides valuable insight into model performance and behavior.
@@ -69,6 +73,26 @@ The following script shows an example of streaming logs from loaded models:
6973
</Tab>
7074
</Tabs>
7175

76+
To capture logs from every source (SDK server, all models, RAG) with a single subscription, use `subscribeServerLogs`:
77+
78+
<Tabs>
79+
<Tab value="js" label="JavaScript" default>
80+
<WrapCode>
81+
82+
```js file=<rootDir>/packages/sdk/dist/examples/logging-global.js title="logging-global.js" lineNumbers
83+
```
84+
</WrapCode>
85+
</Tab>
86+
87+
<Tab value="ts" label="TypeScript">
88+
<WrapCode>
89+
90+
```ts file=<rootDir>/packages/sdk/examples/logging-global.ts title="logging-global.ts" lineNumbers
91+
```
92+
</WrapCode>
93+
</Tab>
94+
</Tabs>
95+
7296
<Callout type="success">
7397
**Tip:** all examples throughout this documentation are self-contained and runnable. For instructions on how to run them, see [SDK quickstart](/quickstart).
7498
</Callout>

packages/sdk/client/api/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export { completion } from "./completion-stream";
44
export { deleteCache } from "./delete-cache";
55
export { unloadModel } from "./unload-model";
66
export { loggingStream } from "./logging-stream";
7+
export { subscribeServerLogs, type ServerLogHandler } from "./subscribe-logs";
78
export { heartbeat } from "./heartbeat";
89
export { transcribe, transcribeStream } from "./transcribe";
910
export { bciTranscribe, bciTranscribeStream } from "./bci-transcribe";
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { getClientLogger, SDK_ALL_LOG_ID } from "@/logging";
2+
import { loggingStream } from "./logging-stream";
3+
import type { LoggingStreamResponse } from "@/schemas/logging-stream";
4+
5+
const logger = getClientLogger();
6+
7+
export type ServerLogHandler = (log: LoggingStreamResponse) => void;
8+
9+
/**
10+
* Subscribes to every server-side log through a single stream: SDK server logs,
11+
* per-model addon logs (llamacpp, whispercpp, …) for all loaded models, and RAG
12+
* logs — without having to open a {@link loggingStream} per id.
13+
*
14+
* Each delivered log keeps its origin in `log.id`: `SDK_LOG_ID` for SDK server
15+
* logs, the model id for model logs, or the RAG workspace key. Use it to tell the
16+
* sources apart.
17+
*
18+
* Internally this opens a {@link loggingStream} on the reserved `SDK_ALL_LOG_ID`
19+
* stream that the worker fans every log into.
20+
*
21+
* @param handler - called once per log line.
22+
* @returns a function that stops the subscription.
23+
*
24+
* @example
25+
* ```typescript
26+
* const unsubscribe = subscribeServerLogs((log) => {
27+
* console.log(`[${log.level}] ${log.id} ${log.namespace}: ${log.message}`);
28+
* });
29+
* // later
30+
* unsubscribe();
31+
* ```
32+
*/
33+
export function subscribeServerLogs(handler: ServerLogHandler) {
34+
const streamIterator = loggingStream({ id: SDK_ALL_LOG_ID });
35+
36+
void (async () => {
37+
try {
38+
for await (const log of streamIterator) {
39+
handler(log);
40+
}
41+
} catch (error) {
42+
logger.error("Server log stream error:", error);
43+
}
44+
})();
45+
46+
return () => {
47+
void streamIterator.return(undefined);
48+
};
49+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"loggerConsoleOutput": false
3+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// loggerConsoleOutput: false in this config disables the default console
2+
// transport on both the client and the worker, so the only logs printed are
3+
// the ones our handler below receives.
4+
const configDir = import.meta.dirname ?? process.cwd();
5+
process.env["QVAC_CONFIG_PATH"] = `${configDir}/config/logging/logging.config.json`;
6+
7+
const { loadModel, completion, unloadModel, subscribeServerLogs, LLAMA_3_2_1B_INST_Q4_0 } =
8+
await import("@qvac/sdk");
9+
10+
try {
11+
console.log("🚀 Starting global logging demo...\n");
12+
13+
// One subscription captures every server-side log (SDK, models, RAG, …)
14+
// without having to know any stream IDs ahead of time.
15+
const unsubscribe = subscribeServerLogs((log) => {
16+
console.log(`[${log.level.toUpperCase()}] [${log.namespace}] ${log.message}`);
17+
});
18+
19+
const modelId = await loadModel({
20+
modelSrc: LLAMA_3_2_1B_INST_Q4_0,
21+
modelConfig: { ctx_size: 2048 },
22+
});
23+
24+
const result = completion({
25+
modelId,
26+
history: [{ role: "user", content: "Count from 1 to 5." }],
27+
stream: true,
28+
});
29+
30+
console.log("📝 Response:\n");
31+
for await (const token of result.tokenStream) {
32+
process.stdout.write(token);
33+
}
34+
35+
await unloadModel({ modelId, clearStorage: false });
36+
unsubscribe();
37+
} catch (error) {
38+
console.error("❌ Error:", error);
39+
process.exit(1);
40+
}

packages/sdk/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ export {
3232
getModelInfo,
3333
getLoadedModelInfo,
3434
loggingStream,
35+
subscribeServerLogs,
36+
type ServerLogHandler,
3537
ocr,
3638
invokePlugin,
3739
invokePluginStream,
@@ -225,7 +227,7 @@ export {
225227
} from "./utils/errors-client";
226228

227229
// Logging exports
228-
export { getLogger, SDK_LOG_ID } from "./logging";
230+
export { getLogger, SDK_LOG_ID, SDK_ALL_LOG_ID } from "./logging";
229231
export type { Logger, LogTransport, LoggerOptions } from "./logging";
230232

231233
// Profiler exports

packages/sdk/logging/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export type { Logger, LoggerOptions, LogTransport } from "./types";
1212
export {
1313
RAG_NAMESPACE,
1414
SDK_LOG_ID,
15+
SDK_ALL_LOG_ID,
1516
SDK_SERVER_NAMESPACE,
1617
type AddonNamespace,
1718
} from "./namespaces";

packages/sdk/logging/namespaces.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,8 @@ export type AddonNamespace = CanonicalModelType | typeof RAG_NAMESPACE;
77
// Reserved ID for SDK server logs
88
export const SDK_LOG_ID = "__sdk__";
99

10+
// Reserved ID for the stream that receives all server-side logs
11+
export const SDK_ALL_LOG_ID = "__all__";
12+
1013
// Namespace for all SDK server logs
1114
export const SDK_SERVER_NAMESPACE = "sdk:server";

packages/sdk/server/bare/registry/logging-stream-registry.ts

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,20 @@
1212
*/
1313

1414
import type { LogLevel } from "@qvac/logging";
15+
import { SDK_ALL_LOG_ID } from "@/logging/namespaces";
1516

16-
const loggingStreams = new Map<
17-
string,
18-
Set<(level: LogLevel, namespace: string, message: string) => void>
19-
>();
17+
// `sourceId` is the id the log was emitted under (a model id, SDK_LOG_ID, a RAG
18+
// workspace key, …). It usually equals the subscription id, but for the global
19+
// SDK_ALL_LOG_ID stream it carries the real origin so subscribers can tell which
20+
// model/SDK/RAG source produced each line instead of always seeing "__all__".
21+
type StreamHandler = (
22+
level: LogLevel,
23+
namespace: string,
24+
message: string,
25+
sourceId: string,
26+
) => void;
27+
28+
const loggingStreams = new Map<string, Set<StreamHandler>>();
2029

2130
// Buffering for logs emitted during model loading (before client subscribes)
2231
const MAX_BUFFERED_LOGS_PER_MODEL = 100;
@@ -27,6 +36,7 @@ interface BufferedLog {
2736
level: LogLevel;
2837
namespace: string;
2938
message: string;
39+
sourceId: string;
3040
timestamp: number;
3141
}
3242

@@ -59,10 +69,7 @@ export function stopLogBufferingWithTimeout(id: string) {
5969
bufferingTimeouts.set(id, timeout);
6070
}
6171

62-
export function registerLoggingStream(
63-
id: string,
64-
streamHandler: (level: LogLevel, namespace: string, message: string) => void,
65-
) {
72+
export function registerLoggingStream(id: string, streamHandler: StreamHandler) {
6673
if (!loggingStreams.has(id)) {
6774
loggingStreams.set(id, new Set());
6875
}
@@ -72,7 +79,7 @@ export function registerLoggingStream(
7279
if (buffered && buffered.length > 0) {
7380
for (const log of buffered) {
7481
try {
75-
streamHandler(log.level, log.namespace, log.message);
82+
streamHandler(log.level, log.namespace, log.message, log.sourceId);
7683
} catch (error) {
7784
console.error(`Error flushing buffered log for ID ${id}:`, error); // fallback (avoid recursion)
7885
}
@@ -86,7 +93,7 @@ export function registerLoggingStream(
8693

8794
export function unregisterLoggingStream(
8895
id: string,
89-
streamHandler: (level: LogLevel, namespace: string, message: string) => void,
96+
streamHandler: StreamHandler,
9097
) {
9198
const streams = loggingStreams.get(id);
9299
if (streams) {
@@ -111,14 +118,29 @@ export function sendLogToStreams(
111118
level: LogLevel,
112119
namespace: string,
113120
message: string,
121+
) {
122+
// The originating id is preserved as `sourceId` so the global stream keeps the
123+
// real origin of each log instead of reporting the subscription id.
124+
deliverToStream(id, level, namespace, message, id);
125+
if (id !== SDK_ALL_LOG_ID) {
126+
deliverToStream(SDK_ALL_LOG_ID, level, namespace, message, id);
127+
}
128+
}
129+
130+
function deliverToStream(
131+
id: string,
132+
level: LogLevel,
133+
namespace: string,
134+
message: string,
135+
sourceId: string,
114136
) {
115137
const streams = loggingStreams.get(id);
116138
const isBuffering = modelsWithBuffering.has(id);
117139

118140
if (streams && streams.size > 0) {
119141
for (const streamHandler of streams) {
120142
try {
121-
streamHandler(level, namespace, message);
143+
streamHandler(level, namespace, message, sourceId);
122144
} catch (error) {
123145
console.error(`Error sending log to stream for ID ${id}:`, error); // fallback (avoid recursion)
124146
}
@@ -139,7 +161,7 @@ export function sendLogToStreams(
139161
validLogs.shift();
140162
}
141163

142-
validLogs.push({ level, namespace, message, timestamp: now });
164+
validLogs.push({ level, namespace, message, sourceId, timestamp: now });
143165
logBuffer.set(id, validLogs);
144166
}
145167
}

packages/sdk/server/rpc/handlers/logging-stream.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ export async function* handleLoggingStream(
1717
level: LogLevel,
1818
namespace: string,
1919
message: string,
20+
sourceId: string,
2021
) => {
2122
const logResponse: LoggingStreamResponse = {
2223
type: "loggingStream",
23-
id,
24+
// For the global SDK_ALL_LOG_ID stream `sourceId` is the real origin of the
25+
// log; for a per-id stream it equals the subscription `id`.
26+
id: sourceId,
2427
level: level,
2528
namespace,
2629
message,

0 commit comments

Comments
 (0)