forked from microsoft/agent-governance-toolkit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcheck_dependency_confusion.py
More file actions
366 lines (323 loc) · 13.4 KB
/
check_dependency_confusion.py
File metadata and controls
366 lines (323 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Pre-commit hook: detect unregistered PyPI package names in pip install commands.
Scans staged files for `pip install <name>` where <name> is not a known
registered package. Prevents dependency confusion attacks.
Usage:
# Install as pre-commit hook
cp scripts/check_dependency_confusion.py .git/hooks/pre-commit
chmod +x .git/hooks/pre-commit
# Or run manually
python scripts/check_dependency_confusion.py [files...]
"""
import argparse
import glob
import json
import re
import subprocess
import sys
# Known registered PyPI package names for this project.
# Membership is compared case-insensitively by the check_* functions below.
REGISTERED_PACKAGES = {
    # Core packages (on PyPI)
    "agent-os-kernel",
    "agentmesh-platform",
    "agent-hypervisor",
    "agentmesh-runtime",
    "agent-sre",
    "agent-governance-toolkit",
    "agentmesh-lightning",
    "agentmesh-marketplace",
    # Common dependencies
    "pydantic", "pyyaml", "cryptography", "pynacl", "httpx", "aiohttp",
    "fastapi", "uvicorn", "structlog", "click", "rich", "numpy", "scipy",
    "pytest", "pytest-asyncio", "pytest-cov", "ruff", "mypy", "build",
    "openai", "anthropic", "langchain", "langchain-core", "crewai",
    "redis", "sqlalchemy", "asyncpg", "chromadb", "pinecone-client",
    "sentence-transformers", "prometheus-client", "opentelemetry-api",
    "opentelemetry-sdk", "fhir.resources", "hl7apy", "zenpy", "freshdesk",
    "google-adk", "safety", "jupyter", "vitest", "tsup", "typescript",
    # Dashboard / visualization (used in examples)
    "streamlit", "plotly", "pandas", "networkx", "matplotlib", "pyvis",
    # Async / caching (used in examples)
    "aioredis", "aiofiles", "aiosqlite",
    # Document processing / NLP (used in examples)
    "pypdf", "python-docx", "pdfplumber", "beautifulsoup4", "lxml",
    "spacy", "nltk", "tiktoken", "scikit-learn",
    # Dev tools
    "black", "flake8", "types-PyYAML",
    # Infrastructure / runtime (used in examples)
    "docker", "huggingface-hub", "python-dotenv", "python-dateutil",
    "python-multipart", "python-json-logger", "langchain-openai",
    # Slack / messaging
    "slack-sdk", "slack-bolt",
    # Telemetry
    "opentelemetry-instrumentation-fastapi",
    # Internal cross-package references (local-only, NOT on PyPI)
    # These are flagged as HIGH RISK if found in requirements.txt with version pins
    # instead of path references. See dependency confusion attack vector.
    "agent-primitives", "emk",
    # With extras (base name is what matters)
}
# Local-only packages that should NEVER appear with version pins in
# requirements.txt (they must use path references like -e ../primitives).
# NOTE: these names also appear in REGISTERED_PACKAGES above, so checkers
# must test this set BEFORE the registered set to report them.
LOCAL_ONLY_PACKAGES = {"agent-primitives", "emk"}
# Known npm packages for this project (checked against package.json sections).
REGISTERED_NPM_PACKAGES = {
    "@microsoft/agent-os-kernel", "@microsoft/agentmesh-mcp-proxy",
    "@microsoft/agentmesh-api", "@microsoft/agent-os-cursor",
    "@microsoft/agentmesh-mastra",
    # Common deps
    "typescript", "tsup", "vitest", "express", "zod", "@mastra/core",
    "@modelcontextprotocol/sdk", "ws", "commander", "chalk",
    "@anthropic-ai/sdk", "@types/node", "@types/ws", "@types/express",
    # Common npm dev dependencies
    "eslint", "@typescript-eslint/parser", "@typescript-eslint/eslint-plugin",
    "ts-jest", "@types/jest", "jest", "rimraf", "prettier",
    "axios", "@types/vscode", "@vscode/vsce", "webpack", "webpack-cli",
    "ts-node", "nodemon", "concurrently", "dotenv",
    "esbuild", "@esbuild/linux-x64", "@esbuild/darwin-arm64",
}
# Known Cargo crate names (checked against Cargo.toml dependency tables).
REGISTERED_CARGO_PACKAGES = {
    "serde", "serde_json", "serde_yaml", "sha2", "ed25519-dalek",
    "rand", "thiserror", "tempfile", "agentmesh",
}
# Patterns that are always safe (not package names): pip option flags,
# relative-path tokens, and the words of the install command itself.
SAFE_PATTERNS = {
    "-e", "--editable", "-r", "--requirement", "--upgrade", "--no-cache-dir",
    "--quiet", "--require-hashes", "--hash", ".", "..", "../..",
    "pip", "install", "%pip",
}
# Matches `pip install <args>` (including the `%pip` notebook magic) and
# captures the argument string, stopping at end of line, a trailing `\`
# continuation, or a shell separator/comment (&&, |, ;, #).
PIP_INSTALL_RE = re.compile(
    r'(?:%?pip)\s+install\s+(.+?)(?:\s*\\?\s*$|(?=\s*&&|\s*\||\s*;|\s*#))',
    re.MULTILINE,
)
def extract_package_names(install_args: str) -> list[str]:
    """Extract distribution names from a `pip install` argument string.

    Option flags, local paths, URL/VCS references, and code-like fragments
    are ignored; extras and version specifiers are stripped
    (``pkg[extra]>=1.0`` -> ``pkg``).

    Args:
        install_args: The argument portion of a pip install command.

    Returns:
        The candidate package names found, in order of appearance.
    """
    packages: list[str] = []
    for token in install_args.split():
        # Skip option flags and tokens explicitly marked safe
        if token.startswith("-") or token in SAFE_PATTERNS:
            continue
        # Skip local paths and direct URL / VCS references
        if token.startswith((".", "/", "\\", "http", "git+")):
            continue
        # Strip extras: package[extra] -> package
        base = re.sub(r'\[.*\]', '', token)
        # Strip version specifiers BEFORE the looks-like-code test below.
        # Previously the '=' in pinned requirements such as `pkg==1.0` made
        # them look like code, so pinned packages were silently never checked.
        base = re.split(r'[><=!~]', base)[0]
        # Skip tokens that still look like code, not package names
        if any(c in base for c in ('(', ')', '=', '"', "'", ":")):
            continue
        # Strip markdown/quote artifacts
        base = base.strip('`"\'(){}%')
        if base and base not in SAFE_PATTERNS:
            packages.append(base)
    return packages
def check_file(filepath: str) -> list[str]:
    """Check a file for potentially unregistered pip install targets.

    Scans the file's text for `pip install` commands (PIP_INSTALL_RE) and
    reports any extracted package name not present in REGISTERED_PACKAGES.

    Args:
        filepath: Path of the file to scan.

    Returns:
        Human-readable finding strings; empty if the file is clean or unreadable.
    """
    findings: list[str] = []
    try:
        with open(filepath, encoding="utf-8", errors="ignore") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):
        return findings
    # Build the lowercase allow-list once per call — it was previously
    # rebuilt inside the innermost loop for every candidate package.
    registered_lower = {p.lower() for p in REGISTERED_PACKAGES}
    for match in PIP_INSTALL_RE.finditer(content):
        # 1-based line number of the match, derived from preceding newlines.
        line_num = content[:match.start()].count("\n") + 1
        for pkg in extract_package_names(match.group(1)):
            if pkg.lower() not in registered_lower:
                findings.append(
                    f" {filepath}:{line_num}: "
                    f"'{pkg}' may not be registered on PyPI"
                )
    return findings
def check_requirements_file(filepath: str) -> list[str]:
    """Check a requirements*.txt file for unregistered package names.

    Local-only packages (LOCAL_ONLY_PACKAGES) must be referenced by path
    (e.g. ``-e ../primitives``); a bare name or version-pinned line is the
    classic dependency-confusion vector, so it is reported as HIGH RISK.
    (They are checked before REGISTERED_PACKAGES because they also appear
    in that set and would otherwise pass silently.)

    Args:
        filepath: Path of the requirements file to scan.

    Returns:
        Human-readable finding strings; empty if clean or unreadable.
    """
    findings: list[str] = []
    try:
        with open(filepath, encoding="utf-8", errors="ignore") as f:
            lines = f.readlines()
    except (OSError, UnicodeDecodeError):
        return findings
    registered_lower = {p.lower() for p in REGISTERED_PACKAGES}
    local_only_lower = {p.lower() for p in LOCAL_ONLY_PACKAGES}
    for line_num, raw in enumerate(lines, 1):
        line = raw.strip()
        # Skip blanks, comments, and pip options (-e, -r, --hash, ...)
        if not line or line.startswith("#") or line.startswith("-"):
            continue
        # Path and URL references are safe by construction
        if line.startswith((".", "/", "\\", "http", "git+")):
            continue
        # Strip extras and version specifiers
        base = re.sub(r'\[.*\]', '', line)
        base = re.split(r'[><=!~;@\s]', base)[0].strip()
        if not base:
            continue
        if base.lower() in local_only_lower:
            # A name/pin reference to a local-only package is exactly what
            # a dependency-confusion attack exploits.
            findings.append(
                f" {filepath}:{line_num}: "
                f"'{base}' is local-only and must use a path reference "
                f"[HIGH RISK: dependency confusion]"
            )
        elif base.lower() not in registered_lower:
            findings.append(
                f" {filepath}:{line_num}: "
                f"'{base}' may not be registered on PyPI"
            )
    return findings
def check_notebook(filepath: str) -> list[str]:
    """Check a Jupyter notebook for pip install of unregistered packages.

    Args:
        filepath: Path of the .ipynb file to scan.

    Returns:
        Human-readable finding strings; empty if clean or unparseable.
    """
    findings: list[str] = []
    try:
        with open(filepath, encoding="utf-8", errors="ignore") as f:
            nb = json.load(f)
    except (OSError, json.JSONDecodeError, UnicodeDecodeError):
        return findings
    registered_lower = {p.lower() for p in REGISTERED_PACKAGES}
    for cell in nb.get("cells", []):
        source = cell.get("source", [])
        # nbformat allows cell source to be a single string as well as a
        # list of lines; iterating a string would yield characters and
        # silently miss every install command, so normalize first.
        if isinstance(source, str):
            source = source.splitlines()
        for line in source:
            if "pip install" in line and not line.strip().startswith("#"):
                for pkg in extract_package_names(line):
                    if pkg.lower() not in registered_lower:
                        findings.append(
                            f" {filepath}: "
                            f"'{pkg}' may not be registered on PyPI"
                        )
    return findings
def check_pyproject_toml(filepath: str) -> list[str]:
    """Check a pyproject.toml for unregistered package dependencies.

    Handles both PEP 621 layouts:
      * ``dependencies = ["pkg>=1.0", ...]`` arrays (single- or multi-line)
        — note the main dependency list is an *array* under ``[project]``,
        not a ``[project.dependencies]`` table;
      * ``[project.optional-dependencies]`` tables, where each extra maps to
        an array of requirement strings (the requirement strings are
        checked, not the extra's key name).

    Local-only packages are tagged HIGH RISK; they are checked before the
    registered set because they also appear in REGISTERED_PACKAGES.

    Args:
        filepath: Path of the pyproject.toml to scan.

    Returns:
        Human-readable finding strings; empty if clean or unreadable.
    """
    findings: list[str] = []
    try:
        with open(filepath, encoding="utf-8", errors="ignore") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):
        return findings
    registered_lower = {p.lower() for p in REGISTERED_PACKAGES}
    local_only_lower = {p.lower() for p in LOCAL_ONLY_PACKAGES}
    # Keys that legitimately appear near dependency sections but are not
    # distribution names.
    not_packages = {"python", "requires-python"}
    # Distribution name at the start of a quoted requirement string,
    # e.g. "pydantic>=2.0" -> pydantic, "pkg[extra]==1.0" -> pkg.
    req_name_re = re.compile(r'["\']([A-Za-z0-9][A-Za-z0-9._-]*)')
    array_start_re = re.compile(r'^[A-Za-z0-9_-]+\s*=\s*\[')
    in_table = False   # inside a [project.*dependencies*] table
    in_array = False   # inside a multi-line `dependencies = [ ... ]` array
    for line_num, line in enumerate(content.splitlines(), 1):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if stripped.startswith("["):
            # Any table header resets context; only the dependency tables
            # re-enable scanning.
            in_table = stripped.startswith(
                ("[project.dependencies]", "[project.optional-dependencies")
            )
            in_array = False
            continue
        scan = None
        if in_array:
            scan = stripped
            if "]" in stripped:
                in_array = False
        elif re.match(r'^dependencies\s*=\s*\[', stripped) or (
            in_table and array_start_re.match(stripped)
        ):
            # Start of a dependency array; entries may share this line.
            scan = stripped.split("[", 1)[1]
            if "]" not in scan:
                in_array = True
        elif in_table:
            scan = stripped
        if not scan:
            continue
        for pkg in req_name_re.findall(scan):
            low = pkg.lower()
            if low in local_only_lower:
                # Must be tested BEFORE registered_lower: local-only names
                # are a subset of REGISTERED_PACKAGES, so the old
                # registered-first ordering made this branch unreachable.
                findings.append(
                    f" {filepath}:{line_num}: '{pkg}' may not be registered on PyPI"
                    f" [HIGH RISK: local-only package]"
                )
                continue
            if low in registered_lower or low in not_packages:
                continue
            findings.append(
                f" {filepath}:{line_num}: '{pkg}' may not be registered on PyPI"
            )
    return findings
def check_package_json(filepath: str) -> list[str]:
    """Check a package.json for unregistered npm package dependencies.

    Scans the dependencies, devDependencies, and peerDependencies sections
    and reports any name missing from REGISTERED_NPM_PACKAGES.

    Args:
        filepath: Path of the package.json to scan.

    Returns:
        Human-readable finding strings; empty if clean or unparseable.
    """
    findings: list[str] = []
    try:
        with open(filepath, encoding="utf-8", errors="ignore") as f:
            manifest = json.load(f)
    except (OSError, json.JSONDecodeError, UnicodeDecodeError):
        return findings
    # Comparisons are case-insensitive against the allow-list.
    known = {name.lower() for name in REGISTERED_NPM_PACKAGES}
    for section in ("dependencies", "devDependencies", "peerDependencies"):
        for pkg in manifest.get(section, {}):
            if pkg.lower() in known:
                continue
            findings.append(
                f" {filepath}: npm '{pkg}' ({section}) may not be registered"
            )
    return findings
def check_cargo_toml(filepath: str) -> list[str]:
    """Check a Cargo.toml for unregistered crate dependencies.

    Scans [dependencies], [dev-dependencies], and [build-dependencies]
    tables and reports any crate name missing from REGISTERED_CARGO_PACKAGES.

    Args:
        filepath: Path of the Cargo.toml to scan.

    Returns:
        Human-readable finding strings; empty if clean or unreadable.
    """
    findings: list[str] = []
    try:
        with open(filepath, encoding="utf-8", errors="ignore") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):
        return findings
    known = {name.lower() for name in REGISTERED_CARGO_PACKAGES}
    dep_headers = ("[dependencies]", "[dev-dependencies]",
                   "[build-dependencies]")
    # Crate name on the left-hand side of a `name = ...` assignment.
    crate_re = re.compile(r'^([a-zA-Z0-9_-]+)\s*=')
    scanning = False
    for line_num, raw in enumerate(content.splitlines(), 1):
        text = raw.strip()
        if text in dep_headers:
            scanning = True
            continue
        if text.startswith("[") and scanning:
            # Any other table header ends the dependency section.
            scanning = False
            continue
        if not scanning or not text or text.startswith("#"):
            continue
        match = crate_re.match(text)
        if match and match.group(1).lower() not in known:
            findings.append(
                f" {filepath}:{line_num}: crate '{match.group(1)}' "
                f"may not be registered on crates.io"
            )
    return findings
def main() -> int:
    """Run the dependency-confusion scan and report findings.

    Without file arguments, scans the git-staged files (pre-commit mode).
    With --strict, additionally sweeps the tree for notebooks and
    dependency manifests.

    Returns:
        0 when no findings; 1 when potential confusion was detected.
    """
    parser = argparse.ArgumentParser(
        description="Detect unregistered PyPI package names in pip install commands.",
    )
    parser.add_argument(
        "--strict", action="store_true",
        help="Also scan notebooks and requirements*.txt files; exit 1 on any violation",
    )
    parser.add_argument("files", nargs="*", help="Files to check")
    args = parser.parse_args()
    if args.files:
        targets = args.files
    else:
        # Pre-commit mode: scan the staged (added/copied/modified) files.
        staged = subprocess.run(
            ["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
            capture_output=True, text=True,
        )
        extensions = (".md", ".py", ".ts", ".txt", ".yaml",
                      ".yml", ".ipynb", ".svg")
        targets = [
            name for name in staged.stdout.strip().split("\n")
            if name.endswith(extensions)
        ]
    findings: list[str] = []
    for target in targets:
        findings.extend(check_file(target))
    if args.strict:
        # Sweep the whole tree for notebooks and dependency manifests.
        sweeps = (
            ("**/*.ipynb", check_notebook),
            ("**/requirements*.txt", check_requirements_file),
            ("**/pyproject.toml", check_pyproject_toml),
            ("**/package.json", check_package_json),
            ("**/Cargo.toml", check_cargo_toml),
        )
        for pattern, checker in sweeps:
            for path in glob.glob(pattern, recursive=True):
                if "node_modules" in path:
                    continue
                # Checkpoint copies of notebooks are transient duplicates.
                if pattern.endswith(".ipynb") and ".ipynb_checkpoints" in path:
                    continue
                findings.extend(checker(path))
    if not findings:
        return 0
    print("⚠️ Potential dependency confusion detected:")
    print()
    for finding in findings:
        print(finding)
    print()
    print("If the package IS registered on PyPI, add it to REGISTERED_PACKAGES")
    print("in scripts/check_dependency_confusion.py")
    return 1
if __name__ == "__main__":
    # Propagate the scan result as the process exit code so that, when
    # installed as a git pre-commit hook, a non-zero status blocks the commit.
    sys.exit(main())