-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathenv.py
More file actions
63 lines (43 loc) · 1.77 KB
/
env.py
File metadata and controls
63 lines (43 loc) · 1.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .rpc import AsyncRpcClient
from .bridge import AsyncBridge
class AsyncSandboxEnv:
def __init__(self, rpc: AsyncRpcClient):
self._rpc = rpc
async def get(self, key: str) -> str:
"""Get the value of an environment variable."""
params = {"key": key}
result = await self._rpc.call("envGet", params)
return result
async def set(self, key: str, value: str) -> None:
"""Set the value of an environment variable."""
params = {"key": key, "value": value}
await self._rpc.call("envSet", params)
async def to_object(self) -> dict[str, str]:
"""Get all environment variables."""
params = {}
result = await self._rpc.call("envToObject", params)
return result
async def delete(self, key: str) -> None:
"""Delete an environment variable."""
params = {"key": key}
await self._rpc.call("envDelete", params)
class SandboxEnv:
def __init__(self, rpc: AsyncRpcClient, bridge: AsyncBridge):
self._rpc = rpc
self._bridge = bridge
self._async = AsyncSandboxEnv(rpc)
def get(self, key: str) -> str:
"""Get the value of an environment variable."""
return self._bridge.run(self._async.get(key))
def set(self, key: str, value: str) -> None:
"""Set the value of an environment variable."""
self._bridge.run(self._async.set(key, value))
def to_object(self) -> dict[str, str]:
"""Get all environment variables."""
return self._bridge.run(self._async.to_object())
def delete(self, key: str) -> None:
"""Delete an environment variable."""
self._bridge.run(self._async.delete(key))