-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy path: skills.py
More file actions
335 lines (266 loc) · 10.9 KB
/
skills.py
File metadata and controls
335 lines (266 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
#!/usr/bin/env python3
"""Manage skills: sync shared assets, generate manifest, validate."""
import argparse
import json
import re
import shutil
import sys
from datetime import datetime, timezone
from pathlib import Path
# Assets stored once at the repo root and mirrored into every skill
# directory by sync_assets().
SHARED_ASSETS = [
    "assets/databricks.svg",
    "assets/databricks.png",
]

# Manifest metadata for each skill, keyed by skill directory name.
# generate_manifest() fails loudly when a skill directory has no entry here.
# Entries may optionally carry a "min_cli_version" key, which is copied into
# the manifest when present.
SKILL_METADATA = {
    "databricks-core": {
        "description": "Core Databricks skill for CLI, auth, and data exploration",
        "experimental": False,
    },
    "databricks-apps": {
        "description": "Databricks Apps development and deployment",
        "experimental": False,
    },
    "databricks-jobs": {
        "description": "Databricks Jobs orchestration and scheduling",
        "experimental": False,
    },
    "databricks-lakebase": {
        "description": "Databricks Lakebase Postgres: projects, scaling, connectivity, synced tables, and Data API",
        "experimental": False,
    },
    "databricks-dabs": {
        "description": "Declarative Automation Bundles (DABs) for deploying and managing Databricks resources",
        "experimental": False,
    },
    "databricks-model-serving": {
        "description": "Databricks Model Serving endpoint management",
        "experimental": False,
    },
    "databricks-pipelines": {
        "description": "Databricks Pipelines (DLT) for ETL and streaming",
        "experimental": False,
    },
    "databricks-proto-first": {
        "description": "Proto-first schema design for Databricks apps",
        "experimental": False,
    },
    "databricks-serverless-migration": {
        "description": "Migrate Databricks workloads from classic compute to serverless compute, including compatibility checks and concrete fixes",
        "experimental": False,
    },
}
def iter_skill_dirs(repo_root: Path):
    """Yield skill directories under ``repo_root/skills``, sorted by name.

    A directory counts as a skill only when it contains a SKILL.md file.
    Hidden directories and the "scripts" directory are skipped.
    """
    for entry in sorted((repo_root / "skills").iterdir()):
        if not entry.is_dir():
            continue
        if entry.name.startswith(".") or entry.name == "scripts":
            continue
        if (entry / "SKILL.md").exists():
            yield entry
def extract_version_from_skill(skill_path: Path) -> str:
    """Extract the skill version from SKILL.md YAML frontmatter.

    Args:
        skill_path: Skill directory expected to contain a SKILL.md file.

    Returns:
        The value of the frontmatter ``version:`` key (quotes stripped),
        or "0.0.0" when no version key is present.

    Raises:
        ValueError: If SKILL.md is missing, has no frontmatter, or the
            frontmatter is unclosed.
    """
    skill_md = skill_path / "SKILL.md"
    if not skill_md.exists():
        raise ValueError(f"SKILL.md not found in {skill_path}")
    content = skill_md.read_text()
    if not content.startswith("---"):
        raise ValueError(f"SKILL.md in {skill_path} missing frontmatter")
    end_idx = content.find("---", 3)
    if end_idx == -1:
        raise ValueError(f"SKILL.md in {skill_path} has unclosed frontmatter")
    frontmatter = content[3:end_idx]
    # Anchor at line start: an unanchored search for "version:" also matches
    # the tail of keys like "min_cli_version:" ("_" is a word character, so
    # \b would not help) and could return the wrong value.
    version_match = re.search(
        r'^\s*version:\s*["\']?([^"\'\n]+)["\']?',
        frontmatter,
        re.MULTILINE,
    )
    if version_match:
        return version_match.group(1).strip()
    return "0.0.0"
def get_skill_updated_at(skill_path: Path) -> str:
    """Return the newest file mtime under *skill_path* as a UTC timestamp.

    The result uses the "%Y-%m-%dT%H:%M:%SZ" format. When the directory
    contains no files, the current UTC time is returned instead.
    """
    newest = max(
        (entry.stat().st_mtime for entry in skill_path.rglob("*") if entry.is_file()),
        default=0.0,
    )
    if newest == 0.0:
        return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return datetime.fromtimestamp(newest, timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )
# ---------------------------------------------------------------------------
# Sync
# ---------------------------------------------------------------------------
def sync_assets(repo_root: Path) -> int:
    """Copy shared assets from repo root into each skill directory.

    Only writes when content differs. Uses shutil.copy2 to preserve mtime
    from the source so that skill updated_at timestamps stay stable.

    Args:
        repo_root: Repository root that holds the shared assets.

    Returns:
        Count of files written.

    Raises:
        ValueError: If any shared asset is missing at the repo root.
    """
    # Validate and read each source asset once, instead of re-reading it
    # from disk for every (skill, asset) pair. This also avoids shadowing
    # the loop variables between the validation and copy loops.
    source_bytes: dict[str, bytes] = {}
    for asset_rel in SHARED_ASSETS:
        source = repo_root / asset_rel
        if not source.exists():
            raise ValueError(f"Missing shared asset '{asset_rel}' at repo root.")
        source_bytes[asset_rel] = source.read_bytes()
    synced = 0
    for skill_dir in iter_skill_dirs(repo_root):
        for asset_rel in SHARED_ASSETS:
            dest = skill_dir / asset_rel
            dest.parent.mkdir(parents=True, exist_ok=True)
            if dest.exists() and dest.read_bytes() == source_bytes[asset_rel]:
                continue
            # copy2 preserves the source mtime so updated_at stays stable.
            shutil.copy2(repo_root / asset_rel, dest)
            synced += 1
    return synced
def check_assets_synced(repo_root: Path) -> list[str]:
    """Validate that all shared assets are present and up-to-date.

    Checks the repo-root copy of each shared asset, then compares every
    skill's copy against it byte-for-byte.

    Returns:
        A list of error messages; an empty list means everything is synced.
    """
    problems: list[str] = []
    for asset_rel in SHARED_ASSETS:
        source = repo_root / asset_rel
        if not source.exists():
            problems.append(f"Missing shared asset '{asset_rel}' at repo root.")
            continue
        expected = source.read_bytes()
        for skill_dir in iter_skill_dirs(repo_root):
            copy = skill_dir / asset_rel
            if not copy.exists():
                problems.append(f"Missing '{asset_rel}' in skill '{skill_dir.name}'.")
            elif copy.read_bytes() != expected:
                problems.append(f"Stale '{asset_rel}' in skill '{skill_dir.name}'.")
    return problems
# ---------------------------------------------------------------------------
# Manifest generation
# ---------------------------------------------------------------------------
def generate_manifest(repo_root: Path) -> dict:
    """Generate the manifest dict by scanning every skill directory.

    Carries forward each skill's existing "base_revision" from the current
    manifest.json when present.

    Raises:
        ValueError: If a skill lacks a SKILL_METADATA entry or its
            agents/openai.yaml marketplace metadata file.
    """
    manifest_path = repo_root / "manifest.json"
    previous_skills: dict = {}
    if manifest_path.exists():
        previous_skills = json.loads(manifest_path.read_text()).get("skills", {})
    entries: dict = {}
    for skill_dir in iter_skill_dirs(repo_root):
        name = skill_dir.name
        if name not in SKILL_METADATA:
            raise ValueError(
                f"Missing SKILL_METADATA entry for skill '{name}'. "
                "Add it to SKILL_METADATA dict."
            )
        if not (skill_dir / "agents" / "openai.yaml").exists():
            raise ValueError(
                f"Missing agents/openai.yaml in skill '{name}'. "
                "Each skill must include Codex marketplace metadata."
            )
        meta = SKILL_METADATA[name]
        entry = {
            "version": extract_version_from_skill(skill_dir),
            "description": meta.get("description", ""),
            "experimental": meta.get("experimental", False),
            "updated_at": get_skill_updated_at(skill_dir),
            "files": sorted(
                str(path.relative_to(skill_dir))
                for path in skill_dir.rglob("*")
                if path.is_file()
            ),
        }
        if meta.get("min_cli_version"):
            entry["min_cli_version"] = meta["min_cli_version"]
        if "base_revision" in previous_skills.get(name, {}):
            entry["base_revision"] = previous_skills[name]["base_revision"]
        entries[name] = entry
    return {
        "version": "2",
        "updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "skills": entries,
    }
# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------
def normalize_manifest(manifest: dict) -> dict:
    """Return a copy of *manifest* with volatile fields removed.

    Strips the top-level "updated_at" plus each skill's "updated_at" and
    "base_revision", so two manifests can be compared structurally. The
    input dict is not mutated.
    """
    stable = {key: value for key, value in manifest.items() if key != "updated_at"}
    stable["skills"] = {
        name: {
            field: value
            for field, value in entry.items()
            if field not in ("updated_at", "base_revision")
        }
        for name, entry in manifest.get("skills", {}).items()
    }
    return stable
def validate_manifest(repo_root: Path) -> bool:
    """Check that manifest.json matches a freshly generated manifest.

    Volatile fields (timestamps, base_revision) are excluded from the
    comparison via normalize_manifest. Returns True when up to date;
    otherwise prints a diff-style report to stderr and returns False.
    """
    manifest_path = repo_root / "manifest.json"
    if not manifest_path.exists():
        print("ERROR: manifest.json does not exist", file=sys.stderr)
        return False
    actual = normalize_manifest(json.loads(manifest_path.read_text()))
    expected = normalize_manifest(generate_manifest(repo_root))
    if actual == expected:
        return True
    print("ERROR: manifest.json is out of date", file=sys.stderr)
    print("\nExpected:", file=sys.stderr)
    print(json.dumps(expected, indent=2), file=sys.stderr)
    print("\nActual:", file=sys.stderr)
    print(json.dumps(actual, indent=2), file=sys.stderr)
    return False
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: dispatch to sync / generate / validate modes."""
    parser = argparse.ArgumentParser(
        description="Manage skills: sync shared assets, generate manifest, validate."
    )
    parser.add_argument(
        "mode",
        nargs="?",
        default="generate",
        choices=["sync", "generate", "validate"],
        help=(
            "sync: copy shared assets into each skill directory. "
            "generate: sync + create manifest.json (default). "
            "validate: check assets and manifest are up to date."
        ),
    )
    mode = parser.parse_args().mode
    # Script lives in scripts/, so the repo root is two levels up.
    repo_root = Path(__file__).parent.parent
    if mode == "sync":
        print(f"Synced {sync_assets(repo_root)} asset(s)")
    elif mode == "generate":
        print(f"Synced {sync_assets(repo_root)} asset(s)")
        manifest = generate_manifest(repo_root)
        manifest_path = repo_root / "manifest.json"
        manifest_path.write_text(json.dumps(manifest, indent=2) + "\n")
        print(f"Generated {manifest_path}")
        skill_names = manifest["skills"].keys()
        print(f"Found {len(skill_names)} skill(s): {', '.join(skill_names)}")
    else:  # validate
        asset_errors = check_assets_synced(repo_root)
        if asset_errors:
            print("ERROR: Shared assets are out of sync:", file=sys.stderr)
            for err in asset_errors:
                print(f" - {err}", file=sys.stderr)
        manifest_ok = validate_manifest(repo_root)
        if asset_errors or not manifest_ok:
            print(
                "\nRun `python3 scripts/skills.py generate` to fix.",
                file=sys.stderr,
            )
            sys.exit(1)
        print("Everything is up to date.")


if __name__ == "__main__":
    main()