Skip to content

Commit 09e3701

Browse files
authored
feat(cloud): upload compressed sqlite bundles (#78)
1 parent 2e33607 commit 09e3701

5 files changed

Lines changed: 113 additions & 72 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
- Mirror the non-DM local SQLite archive into the Worker-backed R2 object store
1919
during `discrawl cloud publish`, alongside the D1 row ingest used for live
2020
queries.
21+
- Compress the sanitized SQLite mirror as a gzip chunk bundle with an explicit
22+
privacy/count manifest before uploading to R2.
2123

2224
### Fixes
2325

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ require (
4444
github.com/muesli/cancelreader v0.2.2 // indirect
4545
github.com/muesli/termenv v0.16.0 // indirect
4646
github.com/ncruces/go-strftime v1.0.0 // indirect
47-
github.com/openclaw/crawlkit v0.10.0
47+
github.com/openclaw/crawlkit v0.11.0
4848
github.com/pmezard/go-difflib v1.0.0 // indirect
4949
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
5050
github.com/rivo/uniseg v0.4.7 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc
7171
github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk=
7272
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
7373
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
74-
github.com/openclaw/crawlkit v0.10.0 h1:FKe5Aus+lgik76KswimTvnlwMdfiuaZV1MGign6z73k=
75-
github.com/openclaw/crawlkit v0.10.0/go.mod h1:2XkSx3N8yzyjc+Jyf1Zl9VEQVlElh/dzBMW1cRMQQGw=
74+
github.com/openclaw/crawlkit v0.11.0 h1:aKK8XVunwT4WYUYp7IxcgCXdg8L2WeYIX8da5HGuNyg=
75+
github.com/openclaw/crawlkit v0.11.0/go.mod h1:2XkSx3N8yzyjc+Jyf1Zl9VEQVlElh/dzBMW1cRMQQGw=
7676
github.com/pelletier/go-toml/v2 v2.3.1 h1:MYEvvGnQjeNkRF1qUuGolNtNExTDwct51yp7olPtrEc=
7777
github.com/pelletier/go-toml/v2 v2.3.1/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
7878
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=

internal/cli/cli_test.go

Lines changed: 86 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cli
22

33
import (
44
"bytes"
5+
"compress/gzip"
56
"context"
67
"database/sql"
78
"encoding/json"
@@ -1305,57 +1306,98 @@ func TestCloudPublishSendsNonDMRows(t *testing.T) {
13051306
tokenEnv := "DISCRAWL_TEST_PUBLISH_TOKEN"
13061307
t.Setenv(tokenEnv, "publish-token")
13071308
seenTables := map[string]crawlremote.IngestRequest{}
1308-
var sawSQLiteUpload bool
1309+
var sawSQLitePart bool
1310+
var sawSQLiteManifest bool
1311+
var sqliteBundlePart []byte
13091312
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
13101313
assert.Equal(t, "Bearer publish-token", req.Header.Get("Authorization"))
13111314
if req.Method == http.MethodPut && req.URL.EscapedPath() == "/v1/apps/discrawl/archives/discrawl%2Fopenclaw/sqlite" {
1312-
sawSQLiteUpload = true
1315+
uploadKind := req.Header.Get("X-Crawl-Sqlite-Upload")
13131316
payload, err := io.ReadAll(req.Body)
13141317
if err != nil {
13151318
t.Errorf("read sqlite upload: %v", err)
13161319
http.Error(w, err.Error(), http.StatusBadRequest)
13171320
return
13181321
}
1319-
if !bytes.HasPrefix(payload, []byte("SQLite format 3")) {
1320-
t.Errorf("sqlite upload should contain a sqlite image")
1321-
http.Error(w, "not sqlite", http.StatusBadRequest)
1322-
return
1323-
}
1324-
uploadPath := filepath.Join(dir, "uploaded-cloud.db")
1325-
if err := os.WriteFile(uploadPath, payload, 0o600); err != nil {
1326-
t.Errorf("write sqlite upload: %v", err)
1327-
http.Error(w, err.Error(), http.StatusInternalServerError)
1328-
return
1329-
}
1330-
uploadDB, err := sql.Open("sqlite", uploadPath)
1331-
if err != nil {
1332-
t.Errorf("open sqlite upload: %v", err)
1333-
http.Error(w, err.Error(), http.StatusInternalServerError)
1334-
return
1335-
}
1336-
defer func() { _ = uploadDB.Close() }()
1337-
var dmMessages int
1338-
if err := uploadDB.QueryRowContext(ctx, "select count(*) from messages where guild_id = ?", store.DirectMessageGuildID).Scan(&dmMessages); err != nil {
1339-
t.Errorf("count dm messages: %v", err)
1340-
http.Error(w, err.Error(), http.StatusInternalServerError)
1341-
return
1342-
}
1343-
assert.Zero(t, dmMessages)
1344-
var tableCount int
1345-
if err := uploadDB.QueryRowContext(ctx, "select count(*) from sqlite_master where type = 'table' and name in ('guilds', 'channels', 'members', 'messages')").Scan(&tableCount); err != nil {
1346-
t.Errorf("count cloud tables: %v", err)
1347-
http.Error(w, err.Error(), http.StatusInternalServerError)
1348-
return
1349-
}
1350-
assert.Equal(t, 4, tableCount)
1351-
assert.NotEmpty(t, req.Header.Get("X-Crawl-Content-Sha256"))
13521322
w.Header().Set("Content-Type", "application/json")
1353-
_ = json.NewEncoder(w).Encode(crawlremote.SQLiteUploadResult{
1354-
App: "discrawl",
1355-
Archive: "discrawl/openclaw",
1356-
Complete: true,
1357-
Object: &crawlremote.SQLiteObject{Key: "v1/discrawl/discrawl%2Fopenclaw/sqlite/current.db", Size: int64(len(payload))},
1358-
})
1323+
switch uploadKind {
1324+
case "bundle-part":
1325+
sawSQLitePart = true
1326+
assert.Equal(t, "application/gzip", req.Header.Get("Content-Type"))
1327+
assert.NotEmpty(t, req.Header.Get("X-Crawl-Content-Sha256"))
1328+
assert.True(t, bytes.HasPrefix(payload, []byte{0x1f, 0x8b}), "sqlite bundle part should be gzip")
1329+
sqliteBundlePart = append(sqliteBundlePart[:0], payload...)
1330+
_ = json.NewEncoder(w).Encode(crawlremote.SQLiteUploadResult{
1331+
App: "discrawl",
1332+
Archive: "discrawl/openclaw",
1333+
Complete: false,
1334+
Object: &crawlremote.SQLiteObject{Key: "v1/discrawl/discrawl%2Fopenclaw/sqlite/chunks/current.db.gz.part-0000", Size: int64(len(payload))},
1335+
})
1336+
case "bundle-manifest":
1337+
sawSQLiteManifest = true
1338+
var manifest crawlremote.SQLiteBundleManifest
1339+
if err := json.Unmarshal(payload, &manifest); err != nil {
1340+
t.Errorf("decode sqlite bundle manifest: %v", err)
1341+
http.Error(w, err.Error(), http.StatusBadRequest)
1342+
return
1343+
}
1344+
assert.Equal(t, crawlremote.SQLiteGzipChunkedBundleFormat, manifest.Format)
1345+
assert.Equal(t, crawlremote.SQLiteGzipCompression, manifest.Compression.Algorithm)
1346+
assert.Equal(t, int64(1), manifest.Counts["messages"])
1347+
assert.Equal(t, false, manifest.Privacy["includes_private_messages"])
1348+
assert.Equal(t, false, manifest.Privacy["includes_raw_json"])
1349+
assert.Equal(t, "@me", manifest.Privacy["excludes_guild_id"])
1350+
reader, err := gzip.NewReader(bytes.NewReader(sqliteBundlePart))
1351+
if err != nil {
1352+
t.Errorf("open sqlite bundle gzip: %v", err)
1353+
http.Error(w, err.Error(), http.StatusInternalServerError)
1354+
return
1355+
}
1356+
decompressed, err := io.ReadAll(reader)
1357+
if closeErr := reader.Close(); err == nil {
1358+
err = closeErr
1359+
}
1360+
if err != nil {
1361+
t.Errorf("read sqlite bundle gzip: %v", err)
1362+
http.Error(w, err.Error(), http.StatusInternalServerError)
1363+
return
1364+
}
1365+
uploadPath := filepath.Join(dir, "uploaded-cloud.db")
1366+
if err := os.WriteFile(uploadPath, decompressed, 0o600); err != nil {
1367+
t.Errorf("write sqlite upload: %v", err)
1368+
http.Error(w, err.Error(), http.StatusInternalServerError)
1369+
return
1370+
}
1371+
uploadDB, err := sql.Open("sqlite", uploadPath)
1372+
if err != nil {
1373+
t.Errorf("open sqlite upload: %v", err)
1374+
http.Error(w, err.Error(), http.StatusInternalServerError)
1375+
return
1376+
}
1377+
defer func() { _ = uploadDB.Close() }()
1378+
var dmMessages int
1379+
if err := uploadDB.QueryRowContext(ctx, "select count(*) from messages where guild_id = ?", store.DirectMessageGuildID).Scan(&dmMessages); err != nil {
1380+
t.Errorf("count dm messages: %v", err)
1381+
http.Error(w, err.Error(), http.StatusInternalServerError)
1382+
return
1383+
}
1384+
assert.Zero(t, dmMessages)
1385+
var tableCount int
1386+
if err := uploadDB.QueryRowContext(ctx, "select count(*) from sqlite_master where type = 'table' and name in ('guilds', 'channels', 'members', 'messages')").Scan(&tableCount); err != nil {
1387+
t.Errorf("count cloud tables: %v", err)
1388+
http.Error(w, err.Error(), http.StatusInternalServerError)
1389+
return
1390+
}
1391+
assert.Equal(t, 4, tableCount)
1392+
_ = json.NewEncoder(w).Encode(crawlremote.SQLiteBundleUploadResult{
1393+
App: "discrawl",
1394+
Archive: "discrawl/openclaw",
1395+
Complete: true,
1396+
Bundle: &crawlremote.SQLiteBundle{Key: "v1/discrawl/discrawl%2Fopenclaw/sqlite/current.manifest.json", Manifest: &manifest},
1397+
})
1398+
default:
1399+
http.Error(w, "missing sqlite bundle upload kind", http.StatusBadRequest)
1400+
}
13591401
return
13601402
}
13611403
assert.Equal(t, http.MethodPost, req.Method)
@@ -1395,13 +1437,14 @@ func TestCloudPublishSendsNonDMRows(t *testing.T) {
13951437
require.Len(t, seenTables["channels"].Rows, 1)
13961438
require.Len(t, seenTables["messages"].Rows, 1)
13971439
require.True(t, seenTables["messages"].Final)
1398-
require.True(t, sawSQLiteUpload)
1440+
require.True(t, sawSQLitePart)
1441+
require.True(t, sawSQLiteManifest)
13991442

14001443
var payload map[string]any
14011444
require.NoError(t, json.Unmarshal(out.Bytes(), &payload))
14021445
require.InDelta(t, float64(1), payload["guilds"], 0)
14031446
require.InDelta(t, float64(1), payload["messages"], 0)
1404-
require.NotNil(t, payload["sqlite_object"])
1447+
require.NotNil(t, payload["sqlite_bundle"])
14051448
}
14061449

14071450
func TestCloudSQLiteExportHelpers(t *testing.T) {

internal/cli/cloud_commands.go

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,12 @@ func (r *runtime) runCloudPublish(args []string) error {
116116
if err != nil {
117117
return err
118118
}
119-
sqliteObject, err := uploadSQLiteArchive(r.ctx, client, "discrawl", archiveID, r.store.DB(), r.cfg.DBPath, manifest)
119+
sqliteBundle, err := uploadSQLiteArchive(r.ctx, client, "discrawl", archiveID, r.store.DB(), r.cfg.DBPath, manifest, map[string]int64{
120+
"guilds": guildCount,
121+
"channels": channelCount,
122+
"members": memberCount,
123+
"messages": messageCount,
124+
})
120125
if err != nil {
121126
return err
122127
}
@@ -127,7 +132,7 @@ func (r *runtime) runCloudPublish(args []string) error {
127132
"channels": channelCount,
128133
"members": memberCount,
129134
"messages": messageCount,
130-
"sqlite_object": sqliteObject,
135+
"sqlite_bundle": sqliteBundle,
131136
})
132137
})
133138
}
@@ -199,41 +204,32 @@ func cursorFor(start int) string {
199204
return strconv.Itoa(start)
200205
}
201206

202-
func uploadSQLiteArchive(ctx context.Context, client *crawlremote.Client, app, archive string, db *sql.DB, dbPath string, manifest crawlremote.IngestManifest) (*crawlremote.SQLiteObject, error) {
207+
func uploadSQLiteArchive(ctx context.Context, client *crawlremote.Client, app, archive string, db *sql.DB, dbPath string, manifest crawlremote.IngestManifest, counts map[string]int64) (*crawlremote.SQLiteBundle, error) {
203208
snapshotPath, cleanup, err := sqliteSnapshotPath(ctx, db)
204209
if err != nil {
205210
return nil, err
206211
}
207212
defer cleanup()
208-
file, err := os.Open(snapshotPath)
209-
if err != nil {
210-
return nil, fmt.Errorf("open sqlite snapshot: %w", err)
211-
}
212-
defer func() { _ = file.Close() }()
213-
info, err := file.Stat()
214-
if err != nil {
215-
return nil, fmt.Errorf("stat sqlite snapshot: %w", err)
216-
}
217-
sum, err := cloudFileSHA256(snapshotPath)
213+
bundle, err := crawlremote.BuildGzipSQLiteBundle(ctx, crawlremote.SQLiteBundleBuildOptions{
214+
App: app,
215+
Archive: archive,
216+
SourcePath: snapshotPath,
217+
Counts: counts,
218+
Privacy: map[string]any{
219+
"excludes_guild_id": "@me",
220+
"includes_private_messages": false,
221+
"includes_raw_json": false,
222+
},
223+
})
218224
if err != nil {
219225
return nil, err
220226
}
221-
if _, err := file.Seek(0, io.SeekStart); err != nil {
222-
return nil, fmt.Errorf("rewind sqlite snapshot: %w", err)
223-
}
224-
result, err := client.UploadSQLite(ctx, app, archive, crawlremote.SQLiteUploadRequest{
225-
Body: file,
226-
Size: info.Size(),
227-
ContentSHA256: sum,
228-
SchemaName: manifest.SchemaName,
229-
SchemaVersion: manifest.SchemaVersion,
230-
SchemaHash: manifest.SchemaHash,
231-
SourceSyncAt: manifest.SourceSyncAt,
232-
})
227+
defer bundle.Cleanup()
228+
result, err := client.UploadSQLiteBundleFiles(ctx, app, archive, bundle.Manifest, bundle.Parts)
233229
if err != nil {
234230
return nil, err
235231
}
236-
return result.Object, nil
232+
return result.Bundle, nil
237233
}
238234

239235
func sqliteSnapshotPath(ctx context.Context, db *sql.DB) (string, func(), error) {

0 commit comments

Comments
 (0)