Skip to content

Commit 44f7a7d

Browse files
committed
QVAC-20580 feat[api]: add subscribeServerLogs to capture all server logs
Signed-off-by: Arun Mani J <j.arunmani@proton.me>
1 parent 863070d commit 44f7a7d

10 files changed

Lines changed: 172 additions & 2 deletions

File tree

packages/sdk/client/api/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export { completion } from "./completion-stream";
44
export { deleteCache } from "./delete-cache";
55
export { unloadModel } from "./unload-model";
66
export { loggingStream } from "./logging-stream";
7+
export { subscribeServerLogs, type ServerLogHandler } from "./subscribe-logs";
78
export { heartbeat } from "./heartbeat";
89
export { transcribe, transcribeStream } from "./transcribe";
910
export { bciTranscribe, bciTranscribeStream } from "./bci-transcribe";
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { getClientLogger, SDK_ALL_LOG_ID } from "@/logging";
2+
import { loggingStream } from "./logging-stream";
3+
import type { LoggingStreamResponse } from "@/schemas/logging-stream";
4+
5+
const logger = getClientLogger();
6+
7+
export type ServerLogHandler = (log: LoggingStreamResponse) => void;
8+
9+
/**
10+
* Subscribes to all server-side SDK logs (SDK, models, RAG, …) through a single
11+
* stream and returns a function that stops the subscription.
12+
*
13+
* @example
14+
* ```typescript
15+
* const unsubscribe = subscribeServerLogs((log) => {
16+
* console.log(`[${log.level}] ${log.namespace}: ${log.message}`);
17+
* });
18+
* // later
19+
* unsubscribe();
20+
* ```
21+
*/
22+
export function subscribeServerLogs(handler: ServerLogHandler) {
23+
const streamIterator = loggingStream({ id: SDK_ALL_LOG_ID });
24+
25+
void (async () => {
26+
try {
27+
for await (const log of streamIterator) {
28+
handler(log);
29+
}
30+
} catch (error) {
31+
logger.error("Server log stream error:", error);
32+
}
33+
})();
34+
35+
return () => {
36+
void streamIterator.return(undefined);
37+
};
38+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"loggerConsoleOutput": false
3+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// loggerConsoleOutput: false in this config disables the default console
2+
// transport on both the client and the worker, so the only logs printed are
3+
// the ones our handler below receives.
4+
const configDir = import.meta.dirname ?? process.cwd();
5+
process.env["QVAC_CONFIG_PATH"] = `${configDir}/config/logging/logging.config.json`;
6+
7+
const { loadModel, completion, unloadModel, subscribeServerLogs, LLAMA_3_2_1B_INST_Q4_0 } =
8+
await import("@qvac/sdk");
9+
10+
try {
11+
console.log("🚀 Starting global logging demo...\n");
12+
13+
// One subscription captures every server-side log (SDK, models, RAG, …)
14+
// without having to know any stream IDs ahead of time.
15+
const unsubscribe = subscribeServerLogs((log) => {
16+
console.log(`[${log.level.toUpperCase()}] [${log.namespace}] ${log.message}`);
17+
});
18+
19+
const modelId = await loadModel({
20+
modelSrc: LLAMA_3_2_1B_INST_Q4_0,
21+
modelConfig: { ctx_size: 2048 },
22+
});
23+
24+
const result = completion({
25+
modelId,
26+
history: [{ role: "user", content: "Count from 1 to 5." }],
27+
stream: true,
28+
});
29+
30+
console.log("📝 Response:\n");
31+
for await (const token of result.tokenStream) {
32+
process.stdout.write(token);
33+
}
34+
35+
await unloadModel({ modelId, clearStorage: false });
36+
unsubscribe();
37+
} catch (error) {
38+
console.error("❌ Error:", error);
39+
process.exit(1);
40+
}

packages/sdk/index.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ export {
3232
getModelInfo,
3333
getLoadedModelInfo,
3434
loggingStream,
35+
subscribeServerLogs,
36+
type ServerLogHandler,
3537
ocr,
3638
invokePlugin,
3739
invokePluginStream,
@@ -224,7 +226,7 @@ export {
224226
} from "./utils/errors-client";
225227

226228
// Logging exports
227-
export { getLogger, SDK_LOG_ID } from "./logging";
229+
export { getLogger, SDK_LOG_ID, SDK_ALL_LOG_ID } from "./logging";
228230
export type { Logger, LogTransport, LoggerOptions } from "./logging";
229231

230232
// Profiler exports

packages/sdk/logging/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export type { Logger, LoggerOptions, LogTransport } from "./types";
1212
export {
1313
RAG_NAMESPACE,
1414
SDK_LOG_ID,
15+
SDK_ALL_LOG_ID,
1516
SDK_SERVER_NAMESPACE,
1617
type AddonNamespace,
1718
} from "./namespaces";

packages/sdk/logging/namespaces.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,8 @@ export type AddonNamespace = CanonicalModelType | typeof RAG_NAMESPACE;
77
// Reserved ID for SDK server logs
88
export const SDK_LOG_ID = "__sdk__";
99

10+
// Reserved ID for the stream that receives all server-side logs
11+
export const SDK_ALL_LOG_ID = "__all__";
12+
1013
// Namespace for all SDK server logs
1114
export const SDK_SERVER_NAMESPACE = "sdk:server";

packages/sdk/server/bare/registry/logging-stream-registry.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313

1414
import type { LogLevel } from "@qvac/logging";
15+
import { SDK_ALL_LOG_ID } from "@/logging/namespaces";
1516

1617
const loggingStreams = new Map<
1718
string,
@@ -111,6 +112,18 @@ export function sendLogToStreams(
111112
level: LogLevel,
112113
namespace: string,
113114
message: string,
115+
) {
116+
deliverToStream(id, level, namespace, message);
117+
if (id !== SDK_ALL_LOG_ID) {
118+
deliverToStream(SDK_ALL_LOG_ID, level, namespace, message);
119+
}
120+
}
121+
122+
function deliverToStream(
123+
id: string,
124+
level: LogLevel,
125+
namespace: string,
126+
message: string,
114127
) {
115128
const streams = loggingStreams.get(id);
116129
const isBuffering = modelsWithBuffering.has(id);

packages/sdk/server/worker-core.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ import {
1313
clearAllLoggingStreams,
1414
startLogBuffering,
1515
} from "@/server/bare/registry/logging-stream-registry";
16-
import { clearAllAddonLoggers, getServerLogger, SDK_LOG_ID } from "@/logging";
16+
import {
17+
clearAllAddonLoggers,
18+
getServerLogger,
19+
SDK_LOG_ID,
20+
SDK_ALL_LOG_ID,
21+
} from "@/logging";
1722
import { clearPlugins } from "@/server/plugins";
1823
import {
1924
acquireWorkerLock,
@@ -68,6 +73,7 @@ export function initializeWorkerCore(): { hasRPCConfig: boolean } {
6873
}
6974

7075
startLogBuffering(SDK_LOG_ID);
76+
startLogBuffering(SDK_ALL_LOG_ID);
7177

7278
const { hasRPCConfig } = initEnv();
7379

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import test from "brittle";
2+
import { SDK_ALL_LOG_ID, SDK_LOG_ID } from "@/logging";
3+
import {
4+
registerLoggingStream,
5+
unregisterLoggingStream,
6+
sendLogToStreams,
7+
startLogBuffering,
8+
clearAllLoggingStreams,
9+
} from "@/server/bare/registry/logging-stream-registry";
10+
11+
test("global stream receives logs from every source id", (t) => {
12+
clearAllLoggingStreams();
13+
14+
const received: string[] = [];
15+
const handler = (_level: string, _ns: string, message: string) =>
16+
received.push(message);
17+
registerLoggingStream(SDK_ALL_LOG_ID, handler);
18+
19+
sendLogToStreams(SDK_LOG_ID, "info", "sdk:server", "from sdk");
20+
sendLogToStreams("model-123", "debug", "llamacpp-completion", "from model");
21+
22+
t.alike(received, ["from sdk", "from model"], "captures logs across ids");
23+
24+
unregisterLoggingStream(SDK_ALL_LOG_ID, handler);
25+
sendLogToStreams(SDK_LOG_ID, "info", "sdk:server", "after unsubscribe");
26+
t.alike(received, ["from sdk", "from model"], "stops after unsubscribe");
27+
28+
clearAllLoggingStreams();
29+
});
30+
31+
test("per-id stream still receives only its own logs", (t) => {
32+
clearAllLoggingStreams();
33+
34+
const global: string[] = [];
35+
const model: string[] = [];
36+
const globalHandler = (_l: string, _n: string, m: string) => global.push(m);
37+
const modelHandler = (_l: string, _n: string, m: string) => model.push(m);
38+
39+
registerLoggingStream(SDK_ALL_LOG_ID, globalHandler);
40+
registerLoggingStream("model-123", modelHandler);
41+
42+
sendLogToStreams("model-123", "info", "llamacpp-completion", "a");
43+
sendLogToStreams("model-456", "info", "llamacpp-completion", "b");
44+
45+
t.alike(model, ["a"], "model stream only sees its own id");
46+
t.alike(global, ["a", "b"], "global stream sees both");
47+
48+
clearAllLoggingStreams();
49+
});
50+
51+
test("global stream flushes startup logs buffered before subscribe", (t) => {
52+
clearAllLoggingStreams();
53+
startLogBuffering(SDK_ALL_LOG_ID);
54+
55+
sendLogToStreams(SDK_LOG_ID, "info", "sdk:server", "early");
56+
57+
const received: string[] = [];
58+
registerLoggingStream(SDK_ALL_LOG_ID, (_l, _n, m) => received.push(m));
59+
60+
t.alike(received, ["early"], "buffered log delivered on subscribe");
61+
62+
clearAllLoggingStreams();
63+
});

0 commit comments

Comments
 (0)