Skip to content

Commit b28a485

Browse files
rsky-relay: optimize plc_directory
1 parent: 0785964 · commit: b28a485

3 files changed

Lines changed: 110 additions & 33 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,8 @@ Rocket.toml
1414
**/data/
1515
db/
1616
*.db
17+
*.db-shm
18+
*.db-wal
1719
*.db-journal
1820
rsky-relay.log.*
1921

rsky-relay/crawler.py

Lines changed: 92 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -14,30 +14,66 @@
1414
def create_database():
1515
"""Create SQLite database and table if they don't exist."""
1616
conn = sqlite3.connect(DB_FILE)
17+
conn.execute("""PRAGMA auto_vacuum = INCREMENTAL""")
18+
conn.execute("""PRAGMA journal_mode = WAL""")
1719
cursor = conn.cursor()
1820

21+
# Set PRAGMAs
22+
conn.execute("""PRAGMA cache_size = -64000""")
23+
conn.execute("""PRAGMA journal_size_limit = 6144000""")
24+
conn.execute("""PRAGMA mmap_size = 268435456""")
25+
conn.execute("""PRAGMA secure_delete = OFF""")
26+
conn.execute("""PRAGMA synchronous = NORMAL""")
27+
conn.execute("""PRAGMA temp_store = MEMORY""")
28+
1929
# Create table for PLC operations
2030
cursor.execute("""
2131
CREATE TABLE IF NOT EXISTS plc_operations (
22-
did TEXT,
23-
created_at TEXT,
24-
nullified BOOLEAN,
25-
cid TEXT,
26-
operation TEXT
32+
cid TEXT NOT NULL PRIMARY KEY ON CONFLICT REPLACE,
33+
did TEXT NOT NULL,
34+
created_at TEXT NOT NULL,
35+
nullified BOOLEAN NOT NULL,
36+
operation BLOB NOT NULL,
37+
pds_endpoint TEXT GENERATED ALWAYS AS (
38+
json_extract(operation, '$.services.atproto_pds.endpoint')
39+
) STORED,
40+
atproto_key TEXT GENERATED ALWAYS AS (
41+
json_extract(operation, '$.verificationMethods.atproto')
42+
) STORED,
43+
labeler_endpoint TEXT GENERATED ALWAYS AS (
44+
json_extract(operation, '$.services.atproto_labeler.endpoint')
45+
) STORED,
46+
atproto_label_key TEXT GENERATED ALWAYS AS (
47+
json_extract(operation, '$.verificationMethods.atproto_label')
48+
) STORED
2749
)
2850
""")
51+
52+
# Drop all views
53+
cursor.execute("""DROP VIEW IF EXISTS plc_labelers""")
54+
cursor.execute("""DROP VIEW IF EXISTS plc_pdses""")
55+
cursor.execute("""DROP VIEW IF EXISTS plc_keys""")
56+
cursor.execute("""DROP VIEW IF EXISTS plc_latest""")
57+
58+
# Create indexes
2959
cursor.execute("""
30-
CREATE INDEX IF NOT EXISTS did_index ON plc_operations (
31-
did
32-
)
60+
CREATE INDEX IF NOT EXISTS idx_plc_operations_did_created_at
61+
ON plc_operations (did, created_at DESC)
3362
""")
3463
cursor.execute("""
35-
CREATE INDEX IF NOT EXISTS created_at_index ON plc_operations (
36-
created_at ASC
37-
)
64+
CREATE INDEX IF NOT EXISTS idx_plc_operations_pds_endpoint
65+
ON plc_operations (pds_endpoint, created_at)
66+
WHERE pds_endpoint IS NOT NULL
3867
""")
3968
cursor.execute("""
40-
CREATE VIEW IF NOT EXISTS plc_latest AS
69+
CREATE INDEX IF NOT EXISTS idx_plc_operations_labeler_endpoint
70+
ON plc_operations (labeler_endpoint, created_at)
71+
WHERE labeler_endpoint IS NOT NULL
72+
""")
73+
74+
# Create views
75+
cursor.execute("""
76+
CREATE VIEW plc_latest AS
4177
SELECT *
4278
FROM plc_operations
4379
WHERE created_at = (
@@ -47,16 +83,45 @@ def create_database():
4783
)
4884
""")
4985
cursor.execute("""
50-
CREATE VIEW IF NOT EXISTS plc_keys AS
86+
CREATE VIEW plc_keys AS
87+
SELECT
88+
did,
89+
created_at,
90+
pds_endpoint,
91+
atproto_key AS pds_key,
92+
labeler_endpoint,
93+
atproto_label_key AS labeler_key
94+
FROM plc_latest
95+
""")
96+
cursor.execute("""
97+
CREATE VIEW plc_pdses AS
98+
SELECT
99+
MIN(created_at) AS first,
100+
MAX(created_at) AS last,
101+
count() AS accounts,
102+
pds_endpoint
103+
FROM plc_latest
104+
WHERE pds_endpoint IS NOT NULL
105+
GROUP BY pds_endpoint
106+
ORDER BY last
107+
""")
108+
cursor.execute("""
109+
CREATE VIEW plc_labelers AS
51110
SELECT
52-
did,
53-
created_at,
54-
json_extract(operation, '$.services.atproto_pds.endpoint') AS endpoint,
55-
json_extract(operation, '$.verificationMethods.atproto') AS key
111+
did,
112+
created_at,
113+
labeler_endpoint
56114
FROM plc_latest
115+
WHERE labeler_endpoint IS NOT NULL
116+
ORDER BY created_at
57117
""")
58118

59119
conn.commit()
120+
121+
# Vacuum & optimize
122+
cursor.execute("""PRAGMA incremental_vacuum""")
123+
cursor.execute("""PRAGMA optimize = 0x10002""")
124+
60125
return conn
61126

62127

@@ -83,15 +148,15 @@ def insert_operations(conn, operations):
83148
for op in operations:
84149
cursor.execute(
85150
"""
86-
INSERT INTO plc_operations (did, cid, nullified, created_at, operation)
151+
INSERT INTO plc_operations (cid, did, created_at, nullified, operation)
87152
VALUES (?, ?, ?, ?, ?)
88153
""",
89154
(
90-
op.get("did"),
91155
op.get("cid"),
92-
op.get("nullified"),
156+
op.get("did"),
93157
op.get("createdAt"),
94-
json.dumps(op.get("operation")),
158+
op.get("nullified"),
159+
json.dumps(op.get("operation"), separators=(",", ":")).encode("utf-8"),
95160
),
96161
)
97162

@@ -131,7 +196,7 @@ def main():
131196
after = latest_timestamp
132197

133198
total_processed = get_count(conn) or 0
134-
request_count = 0
199+
request_count = total_processed // 999
135200

136201
try:
137202
print("Starting PLC Directory API crawl...")
@@ -145,7 +210,8 @@ def main():
145210
break
146211

147212
insert_operations(conn, operations)
148-
total_processed += len(operations)
213+
prev_processed = total_processed
214+
total_processed = get_count(conn) or 0
149215

150216
# Get the last timestamp for the next request
151217
last_op = operations[-1]
@@ -156,6 +222,9 @@ def main():
156222
f"Request #{request_count}: Fetched {len(operations)}, "
157223
f"Total {total_processed}, Last timestamp: {after}"
158224
)
225+
ignored = len(operations) - (total_processed - prev_processed)
226+
if ignored != 1:
227+
print(f"IGNORED: {ignored}")
159228

160229
# Check if we got fewer records than requested (end of data)
161230
if len(operations) < COUNT_PER_REQUEST:

rsky-relay/src/validator/resolver.rs

Lines changed: 16 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,15 @@ impl Resolver {
7676
"plc_directory.db",
7777
flag | OpenFlags::SQLITE_OPEN_NO_MUTEX,
7878
)?;
79+
if *DO_PLC_EXPORT {
80+
match conn.execute("PRAGMA secure_delete = OFF", []) {
81+
Ok(_) | Err(rusqlite::Error::ExecuteReturnedResults) => {}
82+
Err(err) => Err(err)?,
83+
};
84+
conn.execute("PRAGMA synchronous = NORMAL", [])?;
85+
conn.execute("PRAGMA incremental_vacuum", [])?;
86+
conn.execute("PRAGMA optimize = 0x10002", [])?;
87+
}
7988
let now = Instant::now();
8089
let last = now.checked_sub(EXPORT_INTERVAL).unwrap_or(now);
8190
let after = conn.query_one(
@@ -118,12 +127,10 @@ impl Resolver {
118127
}
119128

120129
pub fn query_db(&mut self, did: &str) -> Result<bool, ResolverError> {
121-
let mut stmt = self.conn.prepare_cached(
122-
"SELECT * FROM plc_keys WHERE did = ?1 ORDER BY created_at DESC LIMIT 1",
123-
)?;
130+
let mut stmt = self.conn.prepare_cached("SELECT * FROM plc_keys WHERE did = ?1")?;
124131
match stmt.query_one([did], |row| {
125-
let endpoint = row.get_ref("endpoint")?.as_str_or_null()?;
126-
let key = row.get_ref("key")?.as_str_or_null()?;
132+
let endpoint = row.get_ref("pds_endpoint")?.as_str_or_null()?;
133+
let key = row.get_ref("pds_key")?.as_str_or_null()?;
127134
Ok(parse_key_endpoint(endpoint, key))
128135
}) {
129136
Ok(Some((pds, key))) => {
@@ -201,17 +208,16 @@ impl Resolver {
201208
let mut dids = Vec::new();
202209
let mut count = 0;
203210
let tx = self.conn.transaction()?;
204-
let mut stmt = tx.prepare_cached("INSERT INTO plc_operations (did, cid, nullified, created_at, operation) VALUES (?1, ?2, ?3, ?4, ?5)")?;
211+
let mut stmt = tx.prepare_cached("INSERT INTO plc_operations (cid, did, created_at, nullified, operation) VALUES (?1, ?2, ?3, ?4, ?5)")?;
205212
for line in bytes.reader().lines() {
206213
count += 1;
207214
if let Some(doc) = parse_plc_doc(&line.unwrap_or_default()) {
208-
// TODO: remove duplicates
209215
stmt.execute((
210-
&doc.did,
211216
&doc.cid,
212-
&doc.nullified,
217+
&doc.did,
213218
&doc.created_at,
214-
doc.operation.get(),
219+
&doc.nullified,
220+
doc.operation.get().as_bytes(),
215221
))?;
216222
self.after = Some(doc.created_at);
217223
if self.inflight.remove(&doc.did) {

0 commit comments

Comments
 (0)