Skip to content

Commit 12c0435

Browse files
Segfaulting code
1 parent ad8f72c commit 12c0435

File tree

4 files changed

+126
-4
lines changed

4 files changed

+126
-4
lines changed

.gitignore

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
.env
2+
.env.*
3+
.env.production
24
runt
35
echo-agent
46

@@ -18,6 +20,5 @@ __pycache__/
1820
*.pyc
1921
*.pyo
2022

21-
.env
22-
.env.*
23-
.env.production
23+
# Javascript
24+
node_modules/

packages/python-runtime-agent/deno.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
},
1616
"imports": {
1717
"@runt/lib": "jsr:@runt/lib@^0.6.2",
18-
"@runt/schema": "jsr:@runt/schema@^0.6.2"
18+
"@runt/schema": "jsr:@runt/schema@^0.6.2",
19+
"@runtimed/jmp": "npm:@runtimed/jmp@^3.0.0",
20+
"@std/path": "jsr:@std/path@^1.1.1"
1921
},
2022
"tasks": {
2123
"test": "deno test --allow-all",

packages/python-runtime-agent/src/python-worker.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { createLogger } from "@runt/lib";
22
import * as path from "@std/path";
3+
import * as fs from "jsr:@std/fs";
4+
import jmp from "@runtimed/jmp";
35

46
export class PythonWorker {
57
private kernel: Deno.ChildProcess | null = null;
@@ -68,4 +70,111 @@ export class PythonWorker {
6870
getConnPath(): string | null {
6971
return this.connPath;
7072
}
73+
74+
/**
75+
* Returns a JupyterKernelConnection instance after the kernel is started.
76+
*/
77+
async getConnection(): Promise<JupyterKernelConnection> {
78+
if (!this.connPath) throw new Error("Kernel not started");
79+
return await JupyterKernelConnection.connect(this.connPath);
80+
}
81+
}
82+
83+
/**
84+
* Connects to a Jupyter kernel using conn.json and @runtimed/jmp.
85+
*/
86+
export class JupyterKernelConnection {
87+
private shellSocket: any;
88+
private iopubSocket: any;
89+
private controlSocket: any;
90+
private stdinSocket: any;
91+
private hbSocket: any;
92+
private config: any;
93+
private session: string;
94+
private key: string;
95+
private scheme: string;
96+
private username: string;
97+
98+
private constructor(config: any, sockets: Record<string, any>) {
99+
this.config = config;
100+
this.shellSocket = sockets.shell;
101+
this.iopubSocket = sockets.iopub;
102+
this.controlSocket = sockets.control;
103+
this.stdinSocket = sockets.stdin;
104+
this.hbSocket = sockets.hb;
105+
this.session = crypto.randomUUID();
106+
this.key = config.key;
107+
this.scheme = config.signature_scheme.replace("hmac-", "");
108+
this.username = "runt";
109+
}
110+
111+
static async connect(connPath: string): Promise<JupyterKernelConnection> {
112+
const configRaw = await Deno.readTextFile(connPath);
113+
const config = JSON.parse(configRaw);
114+
const mkAddr = (port: number) => `${config.transport}://${config.ip}:${port}`;
115+
const scheme = config.signature_scheme.replace("hmac-", "");
116+
const key = config.key;
117+
const shell = new jmp.Socket("dealer", scheme, key);
118+
const iopub = new jmp.Socket("sub", scheme, key);
119+
const control = new jmp.Socket("dealer", scheme, key);
120+
const stdin = new jmp.Socket("dealer", scheme, key);
121+
const hb = new jmp.Socket("req", scheme, key);
122+
shell.connect(mkAddr(config.shell_port));
123+
iopub.connect(mkAddr(config.iopub_port));
124+
iopub.subscribe("");
125+
control.connect(mkAddr(config.control_port));
126+
stdin.connect(mkAddr(config.stdin_port));
127+
hb.connect(mkAddr(config.hb_port));
128+
return new JupyterKernelConnection(config, { shell, iopub, control, stdin, hb });
129+
}
130+
131+
/**
132+
* Execute Python code and return the result as a Promise.
133+
*/
134+
async execute(code: string): Promise<{ result: string; outputs: unknown[] }> {
135+
const msg_id = crypto.randomUUID();
136+
const header = {
137+
msg_id,
138+
username: this.username,
139+
session: this.session,
140+
msg_type: "execute_request",
141+
version: "5.3",
142+
};
143+
const content = {
144+
code,
145+
silent: false,
146+
store_history: true,
147+
user_expressions: {},
148+
allow_stdin: false,
149+
stop_on_error: true,
150+
};
151+
const msg = new jmp.Message();
152+
msg.header = header;
153+
msg.parent_header = {};
154+
msg.metadata = {};
155+
msg.content = content;
156+
msg.idents = [];
157+
// Send execute_request
158+
this.shellSocket.send(msg);
159+
// Listen for iopub messages for this msg_id
160+
const outputs: unknown[] = [];
161+
let result: string | undefined;
162+
for await (const msg of this.iopubSocket) {
163+
if (msg.parent_header?.msg_id !== msg_id) continue;
164+
if (msg.header.msg_type === "execute_result" || msg.header.msg_type === "display_data") {
165+
outputs.push(msg.content);
166+
if (msg.content.data && msg.content.data["text/plain"]) {
167+
result = msg.content.data["text/plain"];
168+
}
169+
} else if (msg.header.msg_type === "stream") {
170+
outputs.push(msg.content);
171+
} else if (msg.header.msg_type === "error") {
172+
outputs.push(msg.content);
173+
result = msg.content.ename + ": " + msg.content.evalue;
174+
} else if (msg.header.msg_type === "status" && msg.content.execution_state === "idle") {
175+
break;
176+
}
177+
}
178+
return { result: result ?? "", outputs };
179+
}
71180
}

packages/python-runtime-agent/test/python-worker-integration.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ Deno.test({
8484
assertEquals(worker.getKernelPid(), null, "Kernel PID should be null after shutdown");
8585
});
8686

87+
await t.step("Can execute Python code and get result via JupyterKernelConnection", async () => {
88+
worker = new PythonWorker(pythonPath);
89+
await worker.start();
90+
const conn = await worker.getConnection();
91+
const { result, outputs } = await conn.execute("1+2");
92+
assertEquals(result.trim(), "3");
93+
assert(outputs.length > 0);
94+
await worker.shutdown();
95+
});
96+
8797
// Teardown
8898
await t.step("[teardown] remove venv tempdir", async () => {
8999
if (venvDir) {

0 commit comments

Comments
 (0)