forked from microsoft/agent-governance-toolkit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: langchain_safe_mode.py
More file actions
376 lines (297 loc) · 13.6 KB
/
langchain_safe_mode.py
File metadata and controls
376 lines (297 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
🛡️ Agent OS Safe Mode Demo for LangChain
==========================================
This script demonstrates Agent OS blocking dangerous operations
from a LangChain agent in real-time.
What happens:
1. A LangChain agent with tools tries dangerous operations
2. Agent OS intercepts tool calls
3. The kernel BLOCKS dangerous actions
4. Your data stays safe ✓
Run:
pip install agent-os-kernel langchain
python langchain_safe_mode.py
For PyPI package: pip install langchain-agent-os
"""
import os
import sys
from datetime import datetime
from typing import Any, Callable
# ============================================================================
# ANSI Colors for Terminal Output
# ============================================================================
class Colors:
    """ANSI terminal escape codes used to colorize the demo output."""
    # Bright foreground colors.
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    MAGENTA = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    # Text attributes.
    BOLD = '\033[1m'
    RESET = '\033[0m'  # clears all colors/attributes
def print_banner():
    """Render the demo's opening banner box in bold cyan."""
    # NOTE(review): box interior spacing reconstructed for alignment —
    # the original's exact padding was lost in transit; verify rendering.
    banner = f"""
{Colors.CYAN}{Colors.BOLD}
╔═══════════════════════════════════════════════════════════╗
║                                                           ║
║   🛡️  AGENT OS - Safe Mode Demo for LangChain             ║
║                                                           ║
║   Kernel-level safety for autonomous AI agents            ║
║                                                           ║
╚═══════════════════════════════════════════════════════════╝
{Colors.RESET}"""
    print(banner)
def print_section(title: str):
    """Print *title* between two 60-character horizontal rules in bold blue."""
    rule = f"{Colors.BLUE}{Colors.BOLD}{'─' * 60}{Colors.RESET}"
    print(f"\n{rule}")
    print(f"{Colors.BLUE}{Colors.BOLD} {title}{Colors.RESET}")
    print(f"{rule}\n")
def print_blocked(action: str, reason: str):
    """Render the red ACCESS DENIED box for a blocked tool invocation.

    *action* is the tool name, *reason* the policy explanation; both are
    left-padded to 48 columns to keep the box edges aligned.
    """
    # NOTE(review): box interior spacing reconstructed for alignment —
    # the original's exact padding was lost in transit; verify rendering.
    box = f"""
{Colors.RED}{Colors.BOLD}
╔═══════════════════════════════════════════════════════════╗
║  🚫 ACCESS DENIED - POLICY VIOLATION                      ║
╠═══════════════════════════════════════════════════════════╣
║                                                           ║
║  Tool:    {action:<48} ║
║  Reason:  {reason:<48} ║
║  Status:  BLOCKED BY KERNEL                               ║
║                                                           ║
╚═══════════════════════════════════════════════════════════╝
{Colors.RESET}"""
    print(box)
def print_allowed(action: str):
    """Log a single permitted tool call on one green-tagged line."""
    print(Colors.GREEN + " ✅ ALLOWED:" + Colors.RESET + " " + action)
def print_log(level: str, message: str):
    """Emit a timestamped log line, colored by *level*.

    Timestamp is HH:MM:SS with millisecond precision (the trailing three
    microsecond digits are sliced off). Unknown levels fall back to white.
    """
    ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    palette = {
        "INFO": Colors.CYAN,
        "WARN": Colors.YELLOW,
        "ERROR": Colors.RED,
        "BLOCK": Colors.RED + Colors.BOLD,
    }
    color = palette.get(level, Colors.WHITE)
    print(f" {Colors.WHITE}[{ts}]{Colors.RESET} {color}[{level}]{Colors.RESET} {message}")
# ============================================================================
# Agent OS Kernel
# ============================================================================
class SafetyPolicy:
    """Policy engine for LangChain tools.

    Three class-level tables define the policy; ``check_tool`` evaluates a
    single tool invocation against them, in this fixed order:

    1. deny-listed tool names (``BLOCKED_TOOLS``),
    2. dangerous substrings in the tool input (``BLOCKED_PATTERNS``),
    3. the explicit allow-list (``ALLOWED_TOOLS``),
    4. a permissive default (allow, but flagged as default policy).
    """

    # Tool names (lower-case) that are always denied, mapped to the reason
    # reported back to the caller.
    BLOCKED_TOOLS = {
        "delete_file": "File deletion not permitted",
        "execute_sql": "Raw SQL execution requires approval",
        "shell": "Shell access is restricted",
        "run_command": "System commands are restricted",
        "write_file": "File write requires explicit path approval",
    }

    # Case-insensitive substrings that deny ANY tool's input, whatever the
    # tool is. Tuple: this table is read-only.
    BLOCKED_PATTERNS = (
        ("DROP", "Destructive SQL operation"),
        ("DELETE FROM", "Bulk deletion requires approval"),
        ("rm -rf", "Recursive delete blocked"),
        ("sudo", "Privilege escalation blocked"),
        ("chmod", "Permission changes blocked"),
        ("eval(", "Code execution blocked"),
        ("exec(", "Code execution blocked"),
    )

    # Tools considered safe. frozenset: used only for O(1) membership tests
    # (was a list, giving O(n) scans per check).
    ALLOWED_TOOLS = frozenset({
        "search",
        "calculator",
        "read_file",
        "web_search",
        "get_weather",
    })

    @classmethod
    def check_tool(cls, tool_name: str, tool_input: str) -> tuple[bool, str]:
        """Return ``(allowed, reason)`` for a proposed tool invocation.

        Matching is case-insensitive for both the tool name and the input
        patterns. Tools that are neither deny- nor allow-listed are allowed
        under the default policy (the kernel still audits them).
        """
        name = tool_name.lower()

        # 1. Deny-listed tools are rejected outright.
        if name in cls.BLOCKED_TOOLS:
            return False, cls.BLOCKED_TOOLS[name]

        # 2. Scan the input for dangerous substrings. Lower-case the input
        #    once, outside the loop (was recomputed per pattern).
        lowered_input = tool_input.lower()
        for pattern, reason in cls.BLOCKED_PATTERNS:
            if pattern.lower() in lowered_input:
                return False, reason

        # 3. Explicitly safe tools.
        if name in cls.ALLOWED_TOOLS:
            return True, "Safe tool"

        # 4. Default: allow, but mark it so the audit log shows it was not
        #    an explicit allow.
        return True, "Allowed (default policy)"
class AgentOSKernel:
    """Agent OS kernel for LangChain.

    Wraps tool callables so every invocation is policy-checked by
    ``SafetyPolicy`` and recorded in an in-memory audit log.
    """

    def __init__(self, agent_id: str = "langchain-agent"):
        self.agent_id = agent_id
        self.audit_log = []       # one dict entry per tool invocation
        self.blocked_count = 0
        self.allowed_count = 0
        print_log("INFO", f"Kernel initialized for agent: {agent_id}")
        print_log("INFO", "LangChain tool governance enabled")

    def wrap_tool(self, tool_func: Callable, tool_name: str) -> Callable:
        """Return a governed wrapper that routes calls through ``execute_tool``.

        The wrapper inherits the original callable's name/docstring so it
        still looks like the underlying tool to the agent framework.
        """
        def governed_tool(input_str: str) -> str:
            return self.execute_tool(tool_name, input_str, tool_func)

        governed_tool.__name__ = getattr(tool_func, '__name__', tool_name)
        governed_tool.__doc__ = getattr(tool_func, '__doc__', "")
        return governed_tool

    def execute_tool(self, tool_name: str, tool_input: str, tool_func: Callable) -> str:
        """Policy-check, audit, and (if allowed) run one tool invocation.

        Raises ``PermissionError`` when the policy denies the call; the
        denial is still appended to the audit log first.
        """
        print_log("INFO", f"Tool invocation: {tool_name}({tool_input[:50]}...)")

        allowed, reason = SafetyPolicy.check_tool(tool_name, tool_input)

        # Every decision — allow or deny — is audited.
        entry = {
            "timestamp": datetime.now().isoformat(),
            "agent_id": self.agent_id,
            "tool": tool_name,
            "input": tool_input,
            "allowed": allowed,
            "reason": reason,
        }
        self.audit_log.append(entry)

        if not allowed:
            self.blocked_count += 1
            print_log("BLOCK", f"DENIED: {reason}")
            print_blocked(tool_name, reason)
            raise PermissionError(f"🛡️ Agent OS: {reason}")

        self.allowed_count += 1
        print_allowed(f"{tool_name}({tool_input[:30]}...)")
        # Only now does the real tool run.
        return tool_func(tool_input)

    def get_stats(self) -> dict:
        """Return a summary of kernel activity for reporting."""
        return {
            "agent_id": self.agent_id,
            "total_requests": len(self.audit_log),
            "allowed": self.allowed_count,
            "blocked": self.blocked_count,
        }
# ============================================================================
# Simulated LangChain Tools (for demo without actual LangChain)
# ============================================================================
class SimulatedTools:
    """Stand-ins for LangChain tools so the demo runs without LangChain.

    Each tool just echoes its input in a canned response; the demo only
    cares whether the kernel lets the call through.
    """

    @staticmethod
    def search(query: str) -> str:
        # Pretend to hit a search backend.
        return "Search results for: " + query

    @staticmethod
    def calculator(expression: str) -> str:
        # The answer is always 42 — correctness isn't the point here.
        return f"Calculated: {expression} = 42"

    @staticmethod
    def read_file(path: str) -> str:
        return f"Contents of {path}: [file data]"

    @staticmethod
    def delete_file(path: str) -> str:
        # Never reached in the demo: the kernel blocks this tool name.
        return f"Deleted: {path}"

    @staticmethod
    def execute_sql(query: str) -> str:
        # Never reached in the demo: raw SQL is deny-listed.
        return f"SQL result: {query}"

    @staticmethod
    def shell(command: str) -> str:
        # Never reached in the demo: shell access is deny-listed.
        return f"Shell output: {command}"
class SimulatedLangChainAgent:
    """Simulates a LangChain ReAct agent.

    Keyword-matching on the query stands in for LLM tool selection; every
    tool call goes through the kernel-governed wrappers.
    """

    def __init__(self, tools: dict, kernel: AgentOSKernel):
        self.tools = tools
        self.kernel = kernel
        # Replace each raw tool with its kernel-governed wrapper.
        self.governed_tools = {}
        for name, func in tools.items():
            self.governed_tools[name] = kernel.wrap_tool(func, name)

    def _attempt(self, tool: str, arg: str, results: list) -> None:
        """Invoke a governed tool, recording its result or the kernel's denial."""
        try:
            results.append(self.governed_tools[tool](arg))
        except PermissionError as exc:
            results.append(f"Tool blocked: {exc}")

    def run(self, query: str) -> str:
        """Simulate one agent turn: pick tools from query keywords and run them."""
        print(f"\n{Colors.YELLOW} 🤖 Agent processing: {query}{Colors.RESET}")
        q = query.lower()
        results: list = []

        if "delete" in q or "remove" in q:
            # Dangerous: the kernel is expected to block delete_file.
            self._attempt("delete_file", "/important/data.txt", results)
        if "sql" in q or "database" in q:
            # Dangerous: raw SQL with a destructive statement.
            self._attempt("execute_sql", "DROP TABLE users;", results)
        if "search" in q or "find" in q:
            # Safe: no try/except needed, search is allow-listed.
            results.append(self.governed_tools["search"](query))
        if "calculate" in q or "math" in q:
            # Safe: calculator is allow-listed.
            results.append(self.governed_tools["calculator"]("2 + 2"))
        if "shell" in q or "command" in q:
            # Dangerous: shell access with a recursive delete.
            self._attempt("shell", "rm -rf /", results)

        return "\n".join(results) if results else "No tools needed"
# ============================================================================
# Main Demo
# ============================================================================
def run_demo():
    """Run five scripted scenarios showing the kernel allowing/blocking tools."""
    print_banner()

    print_section("INITIALIZING AGENT OS KERNEL")
    kernel = AgentOSKernel(agent_id="langchain-demo")

    print_section("REGISTERING LANGCHAIN TOOLS")
    tools = {
        "search": SimulatedTools.search,
        "calculator": SimulatedTools.calculator,
        "read_file": SimulatedTools.read_file,
        "delete_file": SimulatedTools.delete_file,
        "execute_sql": SimulatedTools.execute_sql,
        "shell": SimulatedTools.shell,
    }
    for tool_name in tools:
        print_log("INFO", f"Registered tool: {tool_name}")

    agent = SimulatedLangChainAgent(tools, kernel)

    # (section title, query) pairs, executed in order.
    scenarios = [
        ("DEMO 1: SAFE SEARCH (Allowed)", "Search for Python tutorials"),
        ("DEMO 2: FILE DELETION (Blocked)", "Delete the old log files"),
        ("DEMO 3: DESTRUCTIVE SQL (Blocked)", "Run this SQL: DROP TABLE users"),
        ("DEMO 4: SHELL ACCESS (Blocked)", "Execute shell command to clean disk"),
        ("DEMO 5: SAFE CALCULATION (Allowed)", "Calculate the sum of 2 + 2"),
    ]
    for title, query in scenarios:
        print_section(title)
        agent.run(query)

    print_section("KERNEL STATISTICS")
    stats = kernel.get_stats()
    print(f"""
{Colors.CYAN} 📊 Agent OS Kernel Report
 ─────────────────────────────────────────
 Agent ID:        {stats['agent_id']}
 Total Requests:  {stats['total_requests']}
 ✅ Allowed:      {stats['allowed']}
 🚫 Blocked:      {stats['blocked']}
 ─────────────────────────────────────────{Colors.RESET}
""")

    print(f"""
{Colors.GREEN}{Colors.BOLD}
╔═══════════════════════════════════════════════════════════╗
║                                                           ║
║   ✅ DEMO COMPLETE - LangChain agent safely governed!     ║
║                                                           ║
║   Install the package: pip install langchain-agent-os     ║
║                                                           ║
╚═══════════════════════════════════════════════════════════╝
{Colors.RESET}""")
    print(f"\n{Colors.CYAN} 🔗 GitHub: https://github.com/microsoft/agent-governance-toolkit{Colors.RESET}\n")


if __name__ == "__main__":
    run_demo()