-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathdb.py
More file actions
140 lines (117 loc) · 5.6 KB
/
Copy pathdb.py
File metadata and controls
140 lines (117 loc) · 5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
db.py - SQLite schema setup and helper functions
"""
import sqlite3
from config import DB_PATH
# ──────────────────────────────────────────────────
# Schema
# ──────────────────────────────────────────────────
# DDL for the single `repos` table and its three secondary indexes.
# init_db() runs this text through executescript(). Every statement uses
# IF NOT EXISTS, so executing it against an already-initialised database
# does not recreate or alter existing objects.
# `full_name` is UNIQUE and is the key the helper functions below match on.
# `cloned` is written as 1 by mark_cloned() and -1 by mark_clone_error();
# its default of 0 is the not-yet-attempted state.
SCHEMA = """
CREATE TABLE IF NOT EXISTS repos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
-- Identity
name TEXT NOT NULL,
full_name TEXT UNIQUE NOT NULL,
description TEXT,
url TEXT,
clone_url TEXT,
-- Timestamps (ISO-8601)
created_at TEXT,
pushed_at TEXT,
fetched_at TEXT,
-- Stats
size_kb INTEGER,
stars INTEGER DEFAULT 0,
forks INTEGER DEFAULT 0,
language TEXT,
topics TEXT, -- JSON array ["oscp","pentest",...]
-- Payload (Base64-encoded)
file_structure_b64 TEXT, -- hierarchical JSON tree
file_names_b64 TEXT, -- flat JSON array of all relative paths
readme_b64 TEXT, -- raw README content
-- Categorization
category TEXT, -- Generic|OSCP+|Writeups|Sensitive|Tools|None
category_confidence REAL,
category_reasoning TEXT,
-- Processing flags
cloned INTEGER DEFAULT 0,
metadata_extracted INTEGER DEFAULT 0,
categorized INTEGER DEFAULT 0,
clone_path TEXT,
clone_error TEXT
);
CREATE INDEX IF NOT EXISTS idx_category ON repos(category);
CREATE INDEX IF NOT EXISTS idx_stars ON repos(stars);
CREATE INDEX IF NOT EXISTS idx_size_kb ON repos(size_kb);
"""
def get_conn(db_path: str = DB_PATH) -> sqlite3.Connection:
    """Open a SQLite connection to *db_path* and configure it.

    The returned connection yields ``sqlite3.Row`` objects (so columns can
    be read by name) and has the WAL journal mode and foreign-key
    enforcement pragmas issued on it. The caller owns the connection and is
    responsible for closing it.
    """
    connection = sqlite3.connect(db_path)
    connection.row_factory = sqlite3.Row
    # Pragmas are issued in this order on every new connection.
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
def init_db(db_path: str = DB_PATH) -> None:
    """Create the ``repos`` table and its indexes if they do not exist.

    Args:
        db_path: Path of the SQLite database file to initialise.

    The schema script is executed on a fresh connection obtained from
    get_conn(); a confirmation line is printed once it has succeeded.
    """
    conn = get_conn(db_path)
    try:
        # `with conn` only commits or rolls back the transaction; it does
        # NOT close the connection, so close it explicitly afterwards.
        with conn:
            conn.executescript(SCHEMA)
    finally:
        conn.close()
    print(f"[db] Database initialised -> {db_path}")
# ──────────────────────────────────────────────────
# Upsert helpers
# ──────────────────────────────────────────────────
def upsert_repo(conn: sqlite3.Connection, data: dict) -> None:
    """Insert or update a repo row (keyed on full_name).

    Args:
        conn: Open connection to a database that contains the ``repos`` table.
        data: Mapping of column name to value. Every key becomes a column of
            the INSERT. If a row with the same ``full_name`` already exists,
            every supplied column except ``full_name`` is overwritten with
            the new value.

    The statement is executed on *conn* but not committed here.
    """
    columns = list(data)
    # Column names cannot be bound as "?" parameters, so they are quoted as
    # SQL identifiers (embedded double quotes doubled) before interpolation.
    quoted = ['"' + col.replace('"', '""') + '"' for col in columns]
    placeholders = ", ".join("?" * len(columns))
    assignments = ", ".join(
        f"{name}=excluded.{name}"
        for col, name in zip(columns, quoted)
        if col != "full_name"
    )
    # With nothing but the conflict key supplied there is nothing to update;
    # an empty "DO UPDATE SET" would be a syntax error, so fall back to a no-op.
    if assignments:
        conflict_action = f"DO UPDATE SET {assignments}"
    else:
        conflict_action = "DO NOTHING"
    sql = (
        f"INSERT INTO repos ({', '.join(quoted)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT(full_name) {conflict_action}"
    )
    conn.execute(sql, [data[col] for col in columns])
def mark_cloned(conn: sqlite3.Connection, full_name: str, clone_path: str) -> None:
    """Flag the repo *full_name* as cloned and store where it was cloned to.

    Sets ``cloned`` to 1 and ``clone_path`` to *clone_path* on the matching
    row. Nothing is committed here.
    """
    statement = "UPDATE repos SET cloned=1, clone_path=? WHERE full_name=?"
    bindings = (clone_path, full_name)
    conn.execute(statement, bindings)
def mark_clone_error(conn: sqlite3.Connection, full_name: str, error: str) -> None:
    """Record a failed clone attempt for the repo *full_name*.

    Sets ``cloned`` to -1 and stores *error* in ``clone_error`` on the
    matching row. Nothing is committed here.
    """
    statement = "UPDATE repos SET cloned=-1, clone_error=? WHERE full_name=?"
    bindings = (error, full_name)
    conn.execute(statement, bindings)
def update_metadata(conn: sqlite3.Connection, full_name: str, payload: dict) -> None:
    """Write extracted metadata columns for one repo and flag it as extracted.

    Args:
        conn: Open connection to a database that contains the ``repos`` table.
        full_name: Value of the ``full_name`` column identifying the row.
        payload: Mapping of column name to new value. A ``full_name`` key,
            if present, is ignored; ``metadata_extracted`` is always set to 1.

    The caller's *payload* dict is left unmodified. Nothing is committed here.
    """
    # Work on a copy so the flag injected below never leaks into the
    # caller's dict.
    fields = {col: value for col, value in payload.items() if col != "full_name"}
    fields["metadata_extracted"] = 1
    # Column names cannot be bound as parameters; quote them as identifiers.
    assignments = ", ".join(
        '"' + col.replace('"', '""') + '"=?' for col in fields
    )
    sql = f"UPDATE repos SET {assignments} WHERE full_name=?"
    conn.execute(sql, [*fields.values(), full_name])
def update_category(conn: sqlite3.Connection, full_name: str, category: str,
                    confidence: float, reasoning: str) -> None:
    """Store the categorization result for the repo *full_name*.

    Writes *category*, *confidence* and *reasoning* to the corresponding
    ``category*`` columns and sets ``categorized`` to 1 on the matching row.
    Nothing is committed here.
    """
    statement = (
        "UPDATE repos\n"
        "SET category=?, category_confidence=?, category_reasoning=?, categorized=1\n"
        "WHERE full_name=?"
    )
    bindings = (category, confidence, reasoning, full_name)
    conn.execute(statement, bindings)
# ──────────────────────────────────────────────────
# Query helpers
# ──────────────────────────────────────────────────
def get_repos(conn: sqlite3.Connection, where: str = "1=1", params: tuple = ()) -> list:
    """Return every row of ``repos`` that satisfies the *where* clause.

    Args:
        conn: Open connection to a database that contains the ``repos`` table.
        where: SQL boolean expression placed after ``WHERE``. It is
            interpolated into the statement verbatim, so it must come from
            trusted code; put values in *params* rather than formatting them
            into this string.
        params: Values bound to the ``?`` placeholders that appear in
            *where*. Defaults to no parameters.

    Returns:
        The list produced by ``fetchall()`` (empty when nothing matches).
    """
    query = f"SELECT * FROM repos WHERE {where}"
    return conn.execute(query, params).fetchall()
def get_stats(conn: sqlite3.Connection) -> dict:
    """Return aggregate counters over the ``repos`` table.

    Args:
        conn: Open connection whose ``row_factory`` yields mapping-like rows
            (e.g. ``sqlite3.Row``), since the result row is converted with
            ``dict(row)``.

    Returns:
        A dict with integer values for the keys ``total``, ``cloned``,
        ``extracted``, ``categorized``, ``generic``, ``oscp_plus``,
        ``writeups``, ``sensitive``, ``tools`` and ``none_cat``.
    """
    # SUM() yields NULL for an empty table or when every input is NULL
    # (e.g. no repo has a category yet); COALESCE turns that into 0 so each
    # counter is always an integer.
    row = conn.execute("""
        SELECT
            COUNT(*) AS total,
            COALESCE(SUM(cloned=1), 0) AS cloned,
            COALESCE(SUM(metadata_extracted=1), 0) AS extracted,
            COALESCE(SUM(categorized=1), 0) AS categorized,
            COALESCE(SUM(category='Generic'), 0) AS generic,
            COALESCE(SUM(category='OSCP+'), 0) AS oscp_plus,
            COALESCE(SUM(category='Writeups'), 0) AS writeups,
            COALESCE(SUM(category='Sensitive'), 0) AS sensitive,
            COALESCE(SUM(category='Tools'), 0) AS tools,
            COALESCE(SUM(category='None'), 0) AS none_cat
        FROM repos
    """).fetchone()
    return dict(row)