Commit b9ad336

Merge pull request #36 from amikos-tech/codex/issue-23-compaction-api
Implement explicit compaction API (issue #23)
Parents: 76b78b9 + 17eb081

8 files changed

Lines changed: 1098 additions & 44 deletions

GO_API_SURFACE.md

Lines changed: 42 additions & 0 deletions
@@ -29,6 +29,8 @@ Implemented server lifecycle APIs:
 - `(*Server).Stop() error`
 - `(*Server).Close() error`
 - `(*Server).Backup(options ...BackupOption) (*BackupManifest, error)`
+- `(*Server).CompactCollection(request CompactCollectionRequest) (*CompactionResult, error)` (supports `TenantID` + `DatabaseName` scope together)
+- `(*Server).CompactAll(request CompactAllRequest) (*CompactionResult, error)` (supports `TenantID` + `DatabaseName` scope together; `result.Collections[i].Error` reports per-collection failures)

 Example:

@@ -59,6 +61,33 @@ if err != nil {
 fmt.Println("backup manifest:", manifest.ManifestPath)
 ```

+Server compaction example:
+
+```go
+result, err := srv.CompactCollection(chroma.CompactCollectionRequest{
+	Name: "docs",
+	TenantID: "team_a",
+	DatabaseName: "prod_db",
+})
+if err != nil {
+	panic(err)
+}
+fmt.Println("compacted collections:", result.CollectionCount)
+```
+
+Compaction semantics:
+
+- `CompactCollection` and `CompactAll` run explicit compaction via Chroma's local compaction manager.
+- You can pass both `TenantID` and `DatabaseName` in the same request.
+- For `CompactCollection`, collection name lookup is performed inside that tenant+database scope.
+- When omitted, tenant/database scope defaults to `default_tenant` and `default_database`.
+- For each collection, compaction runs backfill then log purge (WAL cleanup).
+- This is not a full HNSW rebuild from scratch and does not change collection configuration/schema.
+- In server mode, the server is unavailable while compaction runs (stop -> compact in embedded mode -> restart).
+- `CompactAll` continues across collections and reports per-collection failures in `result.Collections[i].Error`.
+- `result.CollectionCount` is attempted collections, not only successful collections.
+- `pending_ops_before`/`pending_ops_after` are advisory; if unavailable they are omitted and surfaced via `pending_ops_before_error`/`pending_ops_after_error`.
+
 Backup constraints (applies to server and embedded backup):

 - `DestinationPath` must not exist or must be an empty directory.
@@ -106,6 +135,8 @@ fmt.Println("snapshot dir:", manifest.SnapshotPath)
 - `(*Embedded).MaxBatchSize() (uint32, error)`
 - `(*Embedded).Healthcheck() (*EmbeddedHealthCheckResponse, error)`
 - `(*Embedded).IndexingStatus(request EmbeddedIndexingStatusRequest) (*EmbeddedIndexingStatusResponse, error)`
+- `(*Embedded).CompactCollection(request CompactCollectionRequest) (*CompactionResult, error)`
+- `(*Embedded).CompactAll(request CompactAllRequest) (*CompactionResult, error)` (`result.Collections[i].Error` reports per-collection failures)
 - `(*Embedded).Reset() error`

 ```go
@@ -128,6 +159,17 @@ if err != nil {
 }
 ```

+```go
+compacted, err := embedded.CompactAll(chroma.CompactAllRequest{
+	TenantID: "team_a",
+	DatabaseName: "my_db",
+})
+if err != nil {
+	panic(err)
+}
+fmt.Println(compacted.CollectionCount, compacted.DurationMS)
+```
+
 ### 3.3 Tenant APIs

 - `(*Embedded).CreateTenant(request EmbeddedCreateTenantRequest) error`
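
Note on the `GO_API_SURFACE.md` changes above: the documented `CompactAll` contract (continue across collections, record failures in `result.Collections[i].Error`, count attempts in `result.CollectionCount`) suggests that callers should inspect each entry instead of relying on the returned `err` alone. The following is a minimal sketch of that check, not code from this commit. The struct definitions are hypothetical stand-ins so that the example compiles on its own: only the field names `Collections`, `Error`, `CollectionCount`, and `DurationMS` appear in this diff, while the `Name` field and every field type (including `Error` as a string) are assumptions.

```go
package main

import "fmt"

// CollectionResult and CompactionResult are hypothetical stand-ins for the
// library's result types. The field types and the Name field are assumptions;
// only the names Collections, Error, CollectionCount, and DurationMS come
// from the documentation in this diff.
type CollectionResult struct {
	Name  string // assumed identifier for the collection
	Error string // assumed: empty when the collection compacted cleanly
}

type CompactionResult struct {
	CollectionCount int   // attempted collections, including failed ones
	DurationMS      int64 // assumed integer milliseconds
	Collections     []CollectionResult
}

// failedCollections returns the entries whose Error field is set.
func failedCollections(r *CompactionResult) []CollectionResult {
	var failed []CollectionResult
	for _, c := range r.Collections {
		if c.Error != "" {
			failed = append(failed, c)
		}
	}
	return failed
}

func main() {
	// Hand-built result that stands in for the return value of CompactAll.
	result := &CompactionResult{
		CollectionCount: 3,
		DurationMS:      42,
		Collections: []CollectionResult{
			{Name: "docs"},
			{Name: "images", Error: "example failure"},
			{Name: "notes"},
		},
	}

	failed := failedCollections(result)
	// CollectionCount counts attempts, so successes must be derived.
	succeeded := result.CollectionCount - len(failed)
	fmt.Printf("attempted=%d succeeded=%d failed=%d\n",
		result.CollectionCount, succeeded, len(failed))
	for _, c := range failed {
		fmt.Printf("collection %q failed: %s\n", c.Name, c.Error)
	}
}
```

Deriving the success count from `CollectionCount` minus the failed entries follows the documented statement that `CollectionCount` covers attempted collections, not only successful ones.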

README.md

Lines changed: 37 additions & 0 deletions
@@ -328,6 +328,8 @@ For a detailed, example-heavy reference of the currently implemented Go APIs, se
 | `(*Server) Stop() error` | Gracefully stop the server. |
 | `(*Server) Close() error` | Stop and free resources. |
 | `(*Server) Backup(options ...BackupOption) (*BackupManifest, error)` | Snapshot persisted data with optional restart. |
+| `(*Server) CompactCollection(request CompactCollectionRequest) (*CompactionResult, error)` | Run explicit compaction for one collection (server restarts after operation). Scope can include both `TenantID` and `DatabaseName` together. |
+| `(*Server) CompactAll(request CompactAllRequest) (*CompactionResult, error)` | Run explicit compaction for all collections (server restarts after operation). Scope can include both `TenantID` and `DatabaseName` together. Per-collection failures are reported in `result.Collections[i].Error`. |
 | `NewEmbedded(opts ...EmbeddedOption) (*Embedded, error)` | Start in-process embedded mode. |
 | `StartEmbedded(config StartEmbeddedConfig) (*Embedded, error)` | Start embedded mode from YAML config. |
 | `(*Embedded) Heartbeat() (uint64, error)` | Read in-process heartbeat nanoseconds. |
@@ -355,10 +357,45 @@ For a detailed, example-heavy reference of the currently implemented Go APIs, se
 | `(*Embedded) Add(request EmbeddedAddRequest) error` | Add records without HTTP. |
 | `(*Embedded) Query(request EmbeddedQueryRequest) (*EmbeddedQueryResponse, error)` | Query records without HTTP (supports `where` and `where_document`). |
 | `(*Embedded) IndexingStatus(request EmbeddedIndexingStatusRequest) (*EmbeddedIndexingStatusResponse, error)` | Get collection indexing status (may be unimplemented in local backend). |
+| `(*Embedded) CompactCollection(request CompactCollectionRequest) (*CompactionResult, error)` | Run explicit compaction for one collection. Scope can include both `TenantID` and `DatabaseName` together. |
+| `(*Embedded) CompactAll(request CompactAllRequest) (*CompactionResult, error)` | Run explicit compaction for all collections. Scope can include both `TenantID` and `DatabaseName` together. Per-collection failures are reported in `result.Collections[i].Error`. |
 | `(*Embedded) Reset() error` | Reset local state when enabled. |
 | `(*Embedded) Backup(options ...BackupOption) (*BackupManifest, error)` | Snapshot persisted data with optional reopen. |
 | `(*Embedded) Close() error` | Free embedded resources. |

+### Compaction Semantics
+
+`CompactCollection` and `CompactAll` run Chroma explicit compaction through the local compaction manager.
+
+Per compacted collection, the operation performs:
+- backfill (apply pending log operations into collection/index state)
+- log purge (remove compacted WAL log records)
+
+This compaction is not a full index rebuild. In particular, it does not rebuild HNSW from scratch or change collection configuration/schema.
+
+Operational notes:
+- You can scope compaction with both `TenantID` and `DatabaseName` in the same request.
+- For `CompactCollection`, collection name resolution happens inside that tenant+database scope.
+- If omitted, scope defaults to `default_tenant` and `default_database`.
+- In server mode, the server is unavailable during compaction because it is stopped, compacted via embedded mode, then restarted.
+- `CompactAll` continues across collections and records per-collection failures in `result.Collections[i].Error`.
+- `result.CollectionCount` is the number of attempted collections (including entries with `Error`).
+- `pending_ops_before`/`pending_ops_after` are advisory metrics; when unavailable they are omitted and `pending_ops_before_error`/`pending_ops_after_error` explain why.
+
+Example with explicit tenant+database scope:
+
+```go
+result, err := server.CompactCollection(chroma.CompactCollectionRequest{
+	Name: "docs",
+	TenantID: "team_a",
+	DatabaseName: "prod_db",
+})
+if err != nil {
+	panic(err)
+}
+fmt.Println(result.CollectionCount)
+```
+
 ### Backup API

 Backup writes a consistent snapshot for either managed server mode or embedded mode:
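
Note on the README changes above: the advisory `pending_ops_before`/`pending_ops_after` metrics are described as omitted when unavailable, with `pending_ops_before_error`/`pending_ops_after_error` explaining why. One way a consumer might model "value or explanation" is with optional fields, sketched below. This is an illustration only. The snake_case names are taken from the documentation and are assumed here to be JSON keys; the `collectionStats` struct, its Go field names, the `uint64` type, and the sample payload are all hypothetical and do not describe the library's actual types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// collectionStats is a hypothetical shape for the advisory metrics. Pointer
// fields distinguish "omitted" (nil) from a reported value of zero.
type collectionStats struct {
	PendingOpsBefore      *uint64 `json:"pending_ops_before,omitempty"`
	PendingOpsBeforeError string  `json:"pending_ops_before_error,omitempty"`
	PendingOpsAfter       *uint64 `json:"pending_ops_after,omitempty"`
	PendingOpsAfterError  string  `json:"pending_ops_after_error,omitempty"`
}

// parseStats decodes a JSON payload into collectionStats.
func parseStats(payload string) (collectionStats, error) {
	var s collectionStats
	err := json.Unmarshal([]byte(payload), &s)
	return s, err
}

// describe renders one metric, falling back to the error text when the
// value was omitted.
func describe(name string, value *uint64, errMsg string) string {
	if value != nil {
		return fmt.Sprintf("%s=%d", name, *value)
	}
	if errMsg != "" {
		return fmt.Sprintf("%s unavailable: %s", name, errMsg)
	}
	return name + " unavailable"
}

func main() {
	// Made-up payload: the "before" metric is present, the "after" metric is
	// omitted and replaced by an explanation.
	payload := `{"pending_ops_before": 12, "pending_ops_after_error": "example reason"}`

	stats, err := parseStats(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(describe("pending_ops_before", stats.PendingOpsBefore, stats.PendingOpsBeforeError))
	fmt.Println(describe("pending_ops_after", stats.PendingOpsAfter, stats.PendingOpsAfterError))
}
```

Because the metrics are documented as advisory, the sketch treats a missing value as something to report, not as a failure of the compaction itself.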

chroma.go

Lines changed: 44 additions & 40 deletions
@@ -19,46 +19,48 @@ var (
 	ffiMu sync.Mutex

 	// FFI functions
-	chromaServerStart              func(*byte) uintptr
-	chromaServerStartFromString    func(*byte) uintptr
-	chromaServerPort               func(uintptr) int32
-	chromaServerAddress            func(uintptr) *byte
-	chromaServerPersistPath        func(uintptr) *byte
-	chromaServerStop               func(uintptr) int32
-	chromaServerFree               func(uintptr)
-	chromaEmbeddedStart            func(*byte) uintptr
-	chromaEmbeddedStartFromString  func(*byte) uintptr
-	chromaEmbeddedPersistPath      func(uintptr) *byte
-	chromaEmbeddedFree             func(uintptr)
-	chromaEmbeddedHeartbeat        func(uintptr, *uint64) int32
-	chromaEmbeddedGetMaxBatchSize  func(uintptr, *uint32) int32
-	chromaEmbeddedCreateTenant     func(uintptr, *byte) int32
-	chromaEmbeddedGetTenant        func(uintptr, *byte) *byte
-	chromaEmbeddedUpdateTenant     func(uintptr, *byte) int32
-	chromaEmbeddedReset            func(uintptr) int32
-	chromaEmbeddedCreateDatabase   func(uintptr, *byte) int32
-	chromaEmbeddedListDatabases    func(uintptr, *byte) *byte
-	chromaEmbeddedGetDatabase      func(uintptr, *byte) *byte
-	chromaEmbeddedDeleteDatabase   func(uintptr, *byte) int32
-	chromaEmbeddedListCollections  func(uintptr, *byte) *byte
-	chromaEmbeddedGetCollection    func(uintptr, *byte) *byte
-	chromaEmbeddedCountCollections func(uintptr, *byte, *uint32) int32
-	chromaEmbeddedUpdateCollection func(uintptr, *byte) int32
-	chromaEmbeddedDeleteCollection func(uintptr, *byte) int32
-	chromaEmbeddedForkCollection   func(uintptr, *byte) *byte
-	chromaEmbeddedCount            func(uintptr, *byte, *uint32) int32
-	chromaEmbeddedGet              func(uintptr, *byte) *byte
-	chromaEmbeddedUpdate           func(uintptr, *byte) int32
-	chromaEmbeddedUpsert           func(uintptr, *byte) int32
-	chromaEmbeddedDeleteRecords    func(uintptr, *byte) int32
-	chromaEmbeddedCreateCollection func(uintptr, *byte) *byte
-	chromaEmbeddedAdd              func(uintptr, *byte) int32
-	chromaEmbeddedQuery            func(uintptr, *byte) *byte
-	chromaEmbeddedIndexingStatus   func(uintptr, *byte) *byte
-	chromaEmbeddedHealthcheck      func(uintptr) *byte
-	chromaStringFree               func(*byte)
-	chromaGetLastError             func() *byte
-	chromaVersion                  func() *byte
+	chromaServerStart               func(*byte) uintptr
+	chromaServerStartFromString     func(*byte) uintptr
+	chromaServerPort                func(uintptr) int32
+	chromaServerAddress             func(uintptr) *byte
+	chromaServerPersistPath         func(uintptr) *byte
+	chromaServerStop                func(uintptr) int32
+	chromaServerFree                func(uintptr)
+	chromaEmbeddedStart             func(*byte) uintptr
+	chromaEmbeddedStartFromString   func(*byte) uintptr
+	chromaEmbeddedPersistPath       func(uintptr) *byte
+	chromaEmbeddedFree              func(uintptr)
+	chromaEmbeddedHeartbeat         func(uintptr, *uint64) int32
+	chromaEmbeddedGetMaxBatchSize   func(uintptr, *uint32) int32
+	chromaEmbeddedCreateTenant      func(uintptr, *byte) int32
+	chromaEmbeddedGetTenant         func(uintptr, *byte) *byte
+	chromaEmbeddedUpdateTenant      func(uintptr, *byte) int32
+	chromaEmbeddedReset             func(uintptr) int32
+	chromaEmbeddedCreateDatabase    func(uintptr, *byte) int32
+	chromaEmbeddedListDatabases     func(uintptr, *byte) *byte
+	chromaEmbeddedGetDatabase       func(uintptr, *byte) *byte
+	chromaEmbeddedDeleteDatabase    func(uintptr, *byte) int32
+	chromaEmbeddedListCollections   func(uintptr, *byte) *byte
+	chromaEmbeddedGetCollection     func(uintptr, *byte) *byte
+	chromaEmbeddedCountCollections  func(uintptr, *byte, *uint32) int32
+	chromaEmbeddedUpdateCollection  func(uintptr, *byte) int32
+	chromaEmbeddedDeleteCollection  func(uintptr, *byte) int32
+	chromaEmbeddedForkCollection    func(uintptr, *byte) *byte
+	chromaEmbeddedCount             func(uintptr, *byte, *uint32) int32
+	chromaEmbeddedGet               func(uintptr, *byte) *byte
+	chromaEmbeddedUpdate            func(uintptr, *byte) int32
+	chromaEmbeddedUpsert            func(uintptr, *byte) int32
+	chromaEmbeddedDeleteRecords     func(uintptr, *byte) int32
+	chromaEmbeddedCreateCollection  func(uintptr, *byte) *byte
+	chromaEmbeddedAdd               func(uintptr, *byte) int32
+	chromaEmbeddedQuery             func(uintptr, *byte) *byte
+	chromaEmbeddedIndexingStatus    func(uintptr, *byte) *byte
+	chromaEmbeddedHealthcheck       func(uintptr) *byte
+	chromaEmbeddedCompactCollection func(uintptr, *byte) *byte
+	chromaEmbeddedCompactAll        func(uintptr, *byte) *byte
+	chromaStringFree                func(*byte)
+	chromaGetLastError              func() *byte
+	chromaVersion                   func() *byte
 )

 const maxCStringLen = 1 << 20
@@ -118,6 +120,8 @@ func registerFunctions() error {
 		{&chromaEmbeddedQuery, "chroma_embedded_query"},
 		{&chromaEmbeddedIndexingStatus, "chroma_embedded_indexing_status"},
 		{&chromaEmbeddedHealthcheck, "chroma_embedded_healthcheck"},
+		{&chromaEmbeddedCompactCollection, "chroma_embedded_compact_collection"},
+		{&chromaEmbeddedCompactAll, "chroma_embedded_compact_all"},
 		{&chromaStringFree, "chroma_string_free"},
 		{&chromaGetLastError, "chroma_get_last_error"},
 		{&chromaVersion, "chroma_version"},
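
Note on the `chroma.go` changes above: both new bindings share the `func(uintptr, *byte) *byte` shape already used by calls such as `chromaEmbeddedQuery`, which suggests a handle plus a NUL-terminated request string going in and a NUL-terminated response string coming back. The sketch below illustrates that calling pattern in isolation. It is an assumption about how such a binding might be used, not the wrapper code from this commit (which is not shown here): `cString`, `goString`, `callJSON`, `fakeCompactAll`, the JSON encoding of the request, and the JSON keys are all hypothetical. Only `maxCStringLen = 1 << 20` and the function signature come from the diff.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"unsafe"
)

// Same bound as the constant shown in chroma.go.
const maxCStringLen = 1 << 20

// cString returns s as a NUL-terminated byte slice.
func cString(s string) []byte {
	return append([]byte(s), 0)
}

// goString copies the bytes at p up to the first NUL into a Go string and
// refuses to read more than maxCStringLen bytes.
func goString(p *byte) (string, error) {
	if p == nil {
		return "", errors.New("nil result pointer")
	}
	for n := 0; n < maxCStringLen; n++ {
		if *(*byte)(unsafe.Add(unsafe.Pointer(p), n)) == 0 {
			return string(unsafe.Slice(p, n)), nil
		}
	}
	return "", errors.New("result exceeds maxCStringLen")
}

// keepAlive holds the fake response so the returned pointer stays valid.
var keepAlive []byte

// fakeCompactAll stands in for the native function. It ignores its inputs
// and returns a fixed payload whose key name is invented for this example.
func fakeCompactAll(handle uintptr, request *byte) *byte {
	keepAlive = cString(`{"collection_count":2}`)
	return &keepAlive[0]
}

// callJSON marshals request, passes it to fn as a NUL-terminated string, and
// decodes the returned string into out.
func callJSON(fn func(uintptr, *byte) *byte, handle uintptr, request, out interface{}) error {
	payload, err := json.Marshal(request)
	if err != nil {
		return err
	}
	buf := cString(string(payload))
	text, err := goString(fn(handle, &buf[0]))
	if err != nil {
		return err
	}
	return json.Unmarshal([]byte(text), out)
}

func main() {
	var out struct {
		CollectionCount int `json:"collection_count"`
	}
	// The request keys are assumptions, chosen to mirror TenantID/DatabaseName.
	request := map[string]string{"tenant_id": "team_a", "database_name": "my_db"}
	if err := callJSON(fakeCompactAll, 1, request, &out); err != nil {
		panic(err)
	}
	fmt.Println("collections:", out.CollectionCount)
}
```

A real binding would presumably also release the returned pointer (the registration table includes `chroma_string_free`) and consult `chroma_get_last_error` when the result is nil. Both steps are left out here because the fake function returns Go-managed memory.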
