Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
- Mirror the non-DM local SQLite archive into the Worker-backed R2 object store
during `discrawl cloud publish`, alongside the D1 row ingest used for live
queries.
- Compress the sanitized SQLite mirror as a gzip chunk bundle with an explicit
privacy/count manifest before uploading to R2.

### Fixes

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/muesli/cancelreader v0.2.2 // indirect
github.com/muesli/termenv v0.16.0 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/openclaw/crawlkit v0.10.0
github.com/openclaw/crawlkit v0.11.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rivo/uniseg v0.4.7 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc
github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/openclaw/crawlkit v0.10.0 h1:FKe5Aus+lgik76KswimTvnlwMdfiuaZV1MGign6z73k=
github.com/openclaw/crawlkit v0.10.0/go.mod h1:2XkSx3N8yzyjc+Jyf1Zl9VEQVlElh/dzBMW1cRMQQGw=
github.com/openclaw/crawlkit v0.11.0 h1:aKK8XVunwT4WYUYp7IxcgCXdg8L2WeYIX8da5HGuNyg=
github.com/openclaw/crawlkit v0.11.0/go.mod h1:2XkSx3N8yzyjc+Jyf1Zl9VEQVlElh/dzBMW1cRMQQGw=
github.com/pelletier/go-toml/v2 v2.3.1 h1:MYEvvGnQjeNkRF1qUuGolNtNExTDwct51yp7olPtrEc=
github.com/pelletier/go-toml/v2 v2.3.1/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
129 changes: 86 additions & 43 deletions internal/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cli

import (
"bytes"
"compress/gzip"
"context"
"database/sql"
"encoding/json"
Expand Down Expand Up @@ -1305,57 +1306,98 @@ func TestCloudPublishSendsNonDMRows(t *testing.T) {
tokenEnv := "DISCRAWL_TEST_PUBLISH_TOKEN"
t.Setenv(tokenEnv, "publish-token")
seenTables := map[string]crawlremote.IngestRequest{}
var sawSQLiteUpload bool
var sawSQLitePart bool
var sawSQLiteManifest bool
var sqliteBundlePart []byte
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "Bearer publish-token", req.Header.Get("Authorization"))
if req.Method == http.MethodPut && req.URL.EscapedPath() == "/v1/apps/discrawl/archives/discrawl%2Fopenclaw/sqlite" {
sawSQLiteUpload = true
uploadKind := req.Header.Get("X-Crawl-Sqlite-Upload")
payload, err := io.ReadAll(req.Body)
if err != nil {
t.Errorf("read sqlite upload: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if !bytes.HasPrefix(payload, []byte("SQLite format 3")) {
t.Errorf("sqlite upload should contain a sqlite image")
http.Error(w, "not sqlite", http.StatusBadRequest)
return
}
uploadPath := filepath.Join(dir, "uploaded-cloud.db")
if err := os.WriteFile(uploadPath, payload, 0o600); err != nil {
t.Errorf("write sqlite upload: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
uploadDB, err := sql.Open("sqlite", uploadPath)
if err != nil {
t.Errorf("open sqlite upload: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer func() { _ = uploadDB.Close() }()
var dmMessages int
if err := uploadDB.QueryRowContext(ctx, "select count(*) from messages where guild_id = ?", store.DirectMessageGuildID).Scan(&dmMessages); err != nil {
t.Errorf("count dm messages: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
assert.Zero(t, dmMessages)
var tableCount int
if err := uploadDB.QueryRowContext(ctx, "select count(*) from sqlite_master where type = 'table' and name in ('guilds', 'channels', 'members', 'messages')").Scan(&tableCount); err != nil {
t.Errorf("count cloud tables: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
assert.Equal(t, 4, tableCount)
assert.NotEmpty(t, req.Header.Get("X-Crawl-Content-Sha256"))
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(crawlremote.SQLiteUploadResult{
App: "discrawl",
Archive: "discrawl/openclaw",
Complete: true,
Object: &crawlremote.SQLiteObject{Key: "v1/discrawl/discrawl%2Fopenclaw/sqlite/current.db", Size: int64(len(payload))},
})
switch uploadKind {
case "bundle-part":
sawSQLitePart = true
assert.Equal(t, "application/gzip", req.Header.Get("Content-Type"))
assert.NotEmpty(t, req.Header.Get("X-Crawl-Content-Sha256"))
assert.True(t, bytes.HasPrefix(payload, []byte{0x1f, 0x8b}), "sqlite bundle part should be gzip")
sqliteBundlePart = append(sqliteBundlePart[:0], payload...)
_ = json.NewEncoder(w).Encode(crawlremote.SQLiteUploadResult{
App: "discrawl",
Archive: "discrawl/openclaw",
Complete: false,
Object: &crawlremote.SQLiteObject{Key: "v1/discrawl/discrawl%2Fopenclaw/sqlite/chunks/current.db.gz.part-0000", Size: int64(len(payload))},
})
case "bundle-manifest":
sawSQLiteManifest = true
var manifest crawlremote.SQLiteBundleManifest
if err := json.Unmarshal(payload, &manifest); err != nil {
t.Errorf("decode sqlite bundle manifest: %v", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
assert.Equal(t, crawlremote.SQLiteGzipChunkedBundleFormat, manifest.Format)
assert.Equal(t, crawlremote.SQLiteGzipCompression, manifest.Compression.Algorithm)
assert.Equal(t, int64(1), manifest.Counts["messages"])
assert.Equal(t, false, manifest.Privacy["includes_private_messages"])
assert.Equal(t, false, manifest.Privacy["includes_raw_json"])
assert.Equal(t, "@me", manifest.Privacy["excludes_guild_id"])
reader, err := gzip.NewReader(bytes.NewReader(sqliteBundlePart))
if err != nil {
t.Errorf("open sqlite bundle gzip: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
decompressed, err := io.ReadAll(reader)
if closeErr := reader.Close(); err == nil {
err = closeErr
}
if err != nil {
t.Errorf("read sqlite bundle gzip: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
uploadPath := filepath.Join(dir, "uploaded-cloud.db")
if err := os.WriteFile(uploadPath, decompressed, 0o600); err != nil {
t.Errorf("write sqlite upload: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
uploadDB, err := sql.Open("sqlite", uploadPath)
if err != nil {
t.Errorf("open sqlite upload: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer func() { _ = uploadDB.Close() }()
var dmMessages int
if err := uploadDB.QueryRowContext(ctx, "select count(*) from messages where guild_id = ?", store.DirectMessageGuildID).Scan(&dmMessages); err != nil {
t.Errorf("count dm messages: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
assert.Zero(t, dmMessages)
var tableCount int
if err := uploadDB.QueryRowContext(ctx, "select count(*) from sqlite_master where type = 'table' and name in ('guilds', 'channels', 'members', 'messages')").Scan(&tableCount); err != nil {
t.Errorf("count cloud tables: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
assert.Equal(t, 4, tableCount)
_ = json.NewEncoder(w).Encode(crawlremote.SQLiteBundleUploadResult{
App: "discrawl",
Archive: "discrawl/openclaw",
Complete: true,
Bundle: &crawlremote.SQLiteBundle{Key: "v1/discrawl/discrawl%2Fopenclaw/sqlite/current.manifest.json", Manifest: &manifest},
})
default:
http.Error(w, "missing sqlite bundle upload kind", http.StatusBadRequest)
}
return
}
assert.Equal(t, http.MethodPost, req.Method)
Expand Down Expand Up @@ -1395,13 +1437,14 @@ func TestCloudPublishSendsNonDMRows(t *testing.T) {
require.Len(t, seenTables["channels"].Rows, 1)
require.Len(t, seenTables["messages"].Rows, 1)
require.True(t, seenTables["messages"].Final)
require.True(t, sawSQLiteUpload)
require.True(t, sawSQLitePart)
require.True(t, sawSQLiteManifest)

var payload map[string]any
require.NoError(t, json.Unmarshal(out.Bytes(), &payload))
require.InDelta(t, float64(1), payload["guilds"], 0)
require.InDelta(t, float64(1), payload["messages"], 0)
require.NotNil(t, payload["sqlite_object"])
require.NotNil(t, payload["sqlite_bundle"])
}

func TestCloudSQLiteExportHelpers(t *testing.T) {
Expand Down
48 changes: 22 additions & 26 deletions internal/cli/cloud_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ func (r *runtime) runCloudPublish(args []string) error {
if err != nil {
return err
}
sqliteObject, err := uploadSQLiteArchive(r.ctx, client, "discrawl", archiveID, r.store.DB(), r.cfg.DBPath, manifest)
sqliteBundle, err := uploadSQLiteArchive(r.ctx, client, "discrawl", archiveID, r.store.DB(), r.cfg.DBPath, manifest, map[string]int64{
"guilds": guildCount,
"channels": channelCount,
"members": memberCount,
"messages": messageCount,
})
if err != nil {
return err
}
Expand All @@ -127,7 +132,7 @@ func (r *runtime) runCloudPublish(args []string) error {
"channels": channelCount,
"members": memberCount,
"messages": messageCount,
"sqlite_object": sqliteObject,
"sqlite_bundle": sqliteBundle,
})
})
}
Expand Down Expand Up @@ -199,41 +204,32 @@ func cursorFor(start int) string {
return strconv.Itoa(start)
}

func uploadSQLiteArchive(ctx context.Context, client *crawlremote.Client, app, archive string, db *sql.DB, dbPath string, manifest crawlremote.IngestManifest) (*crawlremote.SQLiteObject, error) {
func uploadSQLiteArchive(ctx context.Context, client *crawlremote.Client, app, archive string, db *sql.DB, dbPath string, manifest crawlremote.IngestManifest, counts map[string]int64) (*crawlremote.SQLiteBundle, error) {
snapshotPath, cleanup, err := sqliteSnapshotPath(ctx, db)
if err != nil {
return nil, err
}
defer cleanup()
file, err := os.Open(snapshotPath)
if err != nil {
return nil, fmt.Errorf("open sqlite snapshot: %w", err)
}
defer func() { _ = file.Close() }()
info, err := file.Stat()
if err != nil {
return nil, fmt.Errorf("stat sqlite snapshot: %w", err)
}
sum, err := cloudFileSHA256(snapshotPath)
bundle, err := crawlremote.BuildGzipSQLiteBundle(ctx, crawlremote.SQLiteBundleBuildOptions{
App: app,
Archive: archive,
SourcePath: snapshotPath,
Counts: counts,
Privacy: map[string]any{
"excludes_guild_id": "@me",
"includes_private_messages": false,
"includes_raw_json": false,
},
})
if err != nil {
return nil, err
}
if _, err := file.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("rewind sqlite snapshot: %w", err)
}
result, err := client.UploadSQLite(ctx, app, archive, crawlremote.SQLiteUploadRequest{
Body: file,
Size: info.Size(),
ContentSHA256: sum,
SchemaName: manifest.SchemaName,
SchemaVersion: manifest.SchemaVersion,
SchemaHash: manifest.SchemaHash,
SourceSyncAt: manifest.SourceSyncAt,
})
defer bundle.Cleanup()
result, err := client.UploadSQLiteBundleFiles(ctx, app, archive, bundle.Manifest, bundle.Parts)
if err != nil {
return nil, err
}
return result.Object, nil
return result.Bundle, nil
}

func sqliteSnapshotPath(ctx context.Context, db *sql.DB) (string, func(), error) {
Expand Down