Skip to content

Commit 15e7657

Browse files
docs(vector): 📝 add vector artifact pipeline example and roadmap note (#138)
## Summary Add a runnable example demonstrating Lode's existing API for the full vector artifact lifecycle, plus a Priority Track D roadmap note confirming no new API surface is needed. ## Highlights - `examples/vector_artifacts/` covers embedding batches (`Write`), serialized FAISS indices (`StreamWrite`), `Latest()` as active pointer, version progression, rollback, and metadata-driven rebuild contracts - Priority Track D in `IMPLEMENTATION_PLAN.md` documents the validated use case with an explicit scope boundary ("Lode is durability infrastructure, not a vector database") - `ARCH_INDEX.md` updated with the new example entry - No new types, interfaces, functions, or dependencies added to `lode/` ## Test plan - [x] `go run ./examples/vector_artifacts` — runs to completion - [x] `go vet ./examples/vector_artifacts` — passes - [x] `gofumpt` / `goimports` — no diff - [x] No new dependencies in `go.mod` - [x] ARCH_INDEX entry present - [x] IMPLEMENTATION_PLAN Track D present with scope boundary 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 4c36462 commit 15e7657

3 files changed

Lines changed: 302 additions & 0 deletions

File tree

docs/ARCH_INDEX.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ Example usage and integration references.
8080
- `stream_write_records/` — streaming record writes with iterator
8181
- `parquet/` — Parquet codec with schema-typed fields
8282
- `volume_sparse/` — sparse Volume: stage, commit, read with gaps
83+
- `vector_artifacts/` — vector artifact pipeline: embeddings, indices, active pointers
8384
- `s3_experimental/` — S3 adapter example (name retained for continuity)
8485

8586
---

docs/IMPLEMENTATION_PLAN.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,28 @@ Any change that affects contract behavior must:
271271
- [ ] Define docs/examples for chunk/region safety expectations
272272
- [ ] Evaluate native Zarr encoding path after reference-based workflow is validated
273273

274+
### Priority Track D — Vector Artifact Storage
275+
276+
**Status**: Validated — no new API surface required.
277+
278+
Lode's existing persistence primitives cover the full vector artifact lifecycle:
279+
- Embedding batches as raw blob snapshots (`Write`)
280+
- Serialized indices via `StreamWrite` (large binary payloads)
281+
- `Latest()` as an atomic active-index pointer
282+
- Snapshot history for rollback and version progression
283+
- Explicit metadata for provenance and rebuild contracts
284+
285+
**Scope boundary**: Lode is durability infrastructure, not a vector database.
286+
It stores embedding batches and serialized indices as opaque blobs.
287+
Similarity search, index construction, and query execution remain the caller's responsibility.
288+
289+
**Deliverables**:
290+
- [x] `examples/vector_artifacts/` — end-to-end pipeline example
291+
292+
**Deferred**:
293+
- Custom codec or format work for vector-specific encodings (not needed; raw blobs suffice)
294+
- ANN index introspection or validation (execution concern, out of scope)
295+
274296
---
275297

276298
## Phase 6 — Dual Persistence Paradigms (v0.6+)

examples/vector_artifacts/main.go

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
// Example: Vector Artifact Pipeline
2+
//
3+
// This example demonstrates using Lode for the full vector artifact lifecycle:
4+
// - Store embedding batches as raw blob snapshots
5+
// - Store serialized FAISS indices via StreamWrite
6+
// - Use Latest() as an atomic active-index pointer
7+
// - Version progression when indices are rebuilt
8+
// - Rollback to a previous index version by snapshot ID
9+
// - Rebuild contract: metadata alone enables deterministic reconstruction
10+
//
11+
// Lode is durability infrastructure, not a vector database. It stores embedding
12+
// batches and serialized indices as opaque blobs. Similarity search, index
13+
// construction, and query execution remain the caller's responsibility.
14+
//
15+
// Run with: go run ./examples/vector_artifacts
16+
package main
17+
18+
import (
19+
"bytes"
20+
"context"
21+
"encoding/binary"
22+
"fmt"
23+
"log"
24+
"math"
25+
"os"
26+
27+
"github.com/pithecene-io/lode/internal/testutil"
28+
"github.com/pithecene-io/lode/lode"
29+
)
30+
31+
func main() {
32+
if err := run(); err != nil {
33+
log.Fatal(err)
34+
}
35+
}
36+
37+
func run() error {
38+
ctx := context.Background()
39+
40+
// Create a temporary directory for storage
41+
tmpDir, err := os.MkdirTemp("", "lode-vector-artifacts-*")
42+
if err != nil {
43+
return fmt.Errorf("create temp dir: %w", err)
44+
}
45+
defer testutil.RemoveAll(tmpDir)
46+
47+
fmt.Printf("Storage root: %s\n\n", tmpDir)
48+
49+
// Create filesystem store factory
50+
storeFactory := lode.NewFSFactory(tmpDir)
51+
52+
// -------------------------------------------------------------------------
53+
// STORE EMBEDDING BATCHES
54+
// -------------------------------------------------------------------------
55+
fmt.Println("=== STORE EMBEDDING BATCHES ===")
56+
57+
// Embeddings dataset: raw blob mode (no codec = default bundle).
58+
// Each snapshot holds a serialized batch of float32 vectors.
59+
embDS, err := lode.NewDataset("embeddings", storeFactory)
60+
if err != nil {
61+
return fmt.Errorf("create embeddings dataset: %w", err)
62+
}
63+
64+
// Batch 1: 100 vectors, 128 dimensions
65+
batch1 := simulateEmbeddings(100, 128)
66+
embSnap1, err := embDS.Write(ctx, []any{batch1}, lode.Metadata{
67+
"dimension": 128,
68+
"vector_count": 100,
69+
"dtype": "float32",
70+
"source": "training-set-part-1",
71+
})
72+
if err != nil {
73+
return fmt.Errorf("write embedding batch 1: %w", err)
74+
}
75+
fmt.Printf("Batch 1: %s (%d bytes, 100 vectors x 128 dims)\n", embSnap1.ID, embSnap1.Manifest.Files[0].SizeBytes)
76+
77+
// Batch 2: 50 vectors, 128 dimensions
78+
batch2 := simulateEmbeddings(50, 128)
79+
embSnap2, err := embDS.Write(ctx, []any{batch2}, lode.Metadata{
80+
"dimension": 128,
81+
"vector_count": 50,
82+
"dtype": "float32",
83+
"source": "training-set-part-2",
84+
})
85+
if err != nil {
86+
return fmt.Errorf("write embedding batch 2: %w", err)
87+
}
88+
fmt.Printf("Batch 2: %s (%d bytes, 50 vectors x 128 dims)\n\n", embSnap2.ID, embSnap2.Manifest.Files[0].SizeBytes)
89+
90+
// -------------------------------------------------------------------------
91+
// STORE SERIALIZED INDEX
92+
// -------------------------------------------------------------------------
93+
fmt.Println("=== STORE SERIALIZED INDEX ===")
94+
95+
// Index dataset: raw blob mode. StreamWrite handles large binary payloads
96+
// that should be written in a single pass (e.g., faiss.write_index output).
97+
indexDS, err := lode.NewDataset("index", storeFactory)
98+
if err != nil {
99+
return fmt.Errorf("create index dataset: %w", err)
100+
}
101+
102+
// Write first index via StreamWrite (built from batch 1 only).
103+
// StreamWrite is preferred for large blobs: data flows directly to the
104+
// final object path with no intermediate buffering.
105+
sw, err := indexDS.StreamWrite(ctx, lode.Metadata{
106+
"index_type": "IVF1024,Flat",
107+
"dimension": 128,
108+
"vector_count": 100,
109+
"source_embeddings": []any{string(embSnap1.ID)},
110+
"faiss_version": "1.7.4",
111+
})
112+
if err != nil {
113+
return fmt.Errorf("start stream write for index v1: %w", err)
114+
}
115+
116+
indexBlob1 := simulateIndex(8192)
117+
if _, err := sw.Write(indexBlob1); err != nil {
118+
_ = sw.Abort(ctx)
119+
return fmt.Errorf("write index v1 data: %w", err)
120+
}
121+
122+
idxSnap1, err := sw.Commit(ctx)
123+
if err != nil {
124+
return fmt.Errorf("commit index v1: %w", err)
125+
}
126+
fmt.Printf("Index v1: %s (%d bytes)\n", idxSnap1.ID, idxSnap1.Manifest.Files[0].SizeBytes)
127+
fmt.Printf(" Built from: %v\n\n", idxSnap1.Manifest.Metadata["source_embeddings"])
128+
129+
// -------------------------------------------------------------------------
130+
// LATEST AS ACTIVE POINTER
131+
// -------------------------------------------------------------------------
132+
fmt.Println("=== LATEST AS ACTIVE POINTER ===")
133+
134+
// Latest() returns the most recently committed snapshot — the "active" index.
135+
// No separate flag or pointer file needed; snapshot ordering is the pointer.
136+
active, err := indexDS.Latest(ctx)
137+
if err != nil {
138+
return fmt.Errorf("get active index: %w", err)
139+
}
140+
fmt.Printf("Active index: %s\n", active.ID)
141+
fmt.Printf(" Type: %v\n", active.Manifest.Metadata["index_type"])
142+
fmt.Printf(" Vectors: %v\n", active.Manifest.Metadata["vector_count"])
143+
fmt.Printf(" FAISS version: %v\n\n", active.Manifest.Metadata["faiss_version"])
144+
145+
// -------------------------------------------------------------------------
146+
// VERSION PROGRESSION
147+
// -------------------------------------------------------------------------
148+
fmt.Println("=== VERSION PROGRESSION ===")
149+
150+
// Build a second index from both embedding batches.
151+
sw2, err := indexDS.StreamWrite(ctx, lode.Metadata{
152+
"index_type": "IVF1024,Flat",
153+
"dimension": 128,
154+
"vector_count": 150,
155+
"source_embeddings": []any{string(embSnap1.ID), string(embSnap2.ID)},
156+
"faiss_version": "1.7.4",
157+
})
158+
if err != nil {
159+
return fmt.Errorf("start stream write for index v2: %w", err)
160+
}
161+
162+
indexBlob2 := simulateIndex(12288)
163+
if _, err := sw2.Write(indexBlob2); err != nil {
164+
_ = sw2.Abort(ctx)
165+
return fmt.Errorf("write index v2 data: %w", err)
166+
}
167+
168+
idxSnap2, err := sw2.Commit(ctx)
169+
if err != nil {
170+
return fmt.Errorf("commit index v2: %w", err)
171+
}
172+
fmt.Printf("Index v2: %s (%d bytes)\n", idxSnap2.ID, idxSnap2.Manifest.Files[0].SizeBytes)
173+
174+
// Latest() now returns the new version.
175+
active2, err := indexDS.Latest(ctx)
176+
if err != nil {
177+
return fmt.Errorf("get active index after v2: %w", err)
178+
}
179+
fmt.Printf("Active index updated: %s\n", active2.ID)
180+
fmt.Printf(" Vectors: %v (was %v)\n\n", active2.Manifest.Metadata["vector_count"], active.Manifest.Metadata["vector_count"])
181+
182+
// -------------------------------------------------------------------------
183+
// ROLLBACK
184+
// -------------------------------------------------------------------------
185+
fmt.Println("=== ROLLBACK ===")
186+
187+
// Access the old index by snapshot ID — no deletion or mutation needed.
188+
// Both versions remain immutable in storage.
189+
oldSnap, err := indexDS.Snapshot(ctx, idxSnap1.ID)
190+
if err != nil {
191+
return fmt.Errorf("get old index snapshot: %w", err)
192+
}
193+
fmt.Printf("Previous index: %s\n", oldSnap.ID)
194+
fmt.Printf(" Vectors: %v\n", oldSnap.Manifest.Metadata["vector_count"])
195+
196+
// Read the old index data back and verify size.
197+
oldData, err := indexDS.Read(ctx, idxSnap1.ID)
198+
if err != nil {
199+
return fmt.Errorf("read old index data: %w", err)
200+
}
201+
oldBlob := oldData[0].([]byte)
202+
fmt.Printf(" Read back: %d bytes (matches original: %v)\n", len(oldBlob), bytes.Equal(oldBlob, indexBlob1))
203+
204+
// List all index snapshots to show version history.
205+
allSnaps, err := indexDS.Snapshots(ctx)
206+
if err != nil {
207+
return fmt.Errorf("list index snapshots: %w", err)
208+
}
209+
fmt.Printf(" Total index versions: %d\n\n", len(allSnaps))
210+
211+
// -------------------------------------------------------------------------
212+
// REBUILD CONTRACT
213+
// -------------------------------------------------------------------------
214+
fmt.Println("=== REBUILD CONTRACT ===")
215+
216+
// Metadata alone enables deterministic reconstruction:
217+
// read active index metadata → extract source_embeddings → look up each
218+
// embedding snapshot → recover the full provenance chain.
219+
fmt.Println("Provenance chain for active index:")
220+
fmt.Printf(" Index snapshot: %s\n", active2.ID)
221+
fmt.Printf(" Index type: %v\n", active2.Manifest.Metadata["index_type"])
222+
223+
sourceIDs, ok := active2.Manifest.Metadata["source_embeddings"].([]any)
224+
if !ok {
225+
return fmt.Errorf("expected source_embeddings to be []any, got %T", active2.Manifest.Metadata["source_embeddings"])
226+
}
227+
for i, rawID := range sourceIDs {
228+
snapID := lode.DatasetSnapshotID(rawID.(string))
229+
embSnap, err := embDS.Snapshot(ctx, snapID)
230+
if err != nil {
231+
return fmt.Errorf("look up embedding snapshot %s: %w", snapID, err)
232+
}
233+
fmt.Printf(" Source embedding %d: %s\n", i+1, embSnap.ID)
234+
fmt.Printf(" Vectors: %v, Source: %v\n", embSnap.Manifest.Metadata["vector_count"], embSnap.Manifest.Metadata["source"])
235+
}
236+
fmt.Println()
237+
238+
// -------------------------------------------------------------------------
239+
// SUCCESS
240+
// -------------------------------------------------------------------------
241+
fmt.Println("=== SUCCESS ===")
242+
fmt.Println("Vector artifact pipeline complete!")
243+
fmt.Println("\nKey points demonstrated:")
244+
fmt.Println(" 1. Embedding batches stored as raw blob snapshots (Write)")
245+
fmt.Println(" 2. Serialized indices stored via StreamWrite (large binary payloads)")
246+
fmt.Println(" 3. Latest() is the atomic active-index pointer — no separate flag needed")
247+
fmt.Println(" 4. Version progression: new index replaces active pointer on commit")
248+
fmt.Println(" 5. Rollback: access any prior version by snapshot ID (immutable history)")
249+
fmt.Println(" 6. Rebuild contract: metadata tracks source_embeddings for provenance")
250+
fmt.Println(" 7. Lode is durability — search, training, and execution are the caller's job")
251+
252+
return nil
253+
}
254+
255+
// simulateEmbeddings generates deterministic float32 vector bytes.
256+
// Returns count * dimension * 4 bytes of IEEE 754 float32 data.
257+
func simulateEmbeddings(count, dimension int) []byte {
258+
buf := new(bytes.Buffer)
259+
for i := range count {
260+
for d := range dimension {
261+
// Deterministic pseudo-embedding: sin-based pattern
262+
val := float32(math.Sin(float64(i*dimension+d) * 0.01))
263+
_ = binary.Write(buf, binary.LittleEndian, val)
264+
}
265+
}
266+
return buf.Bytes()
267+
}
268+
269+
// simulateIndex generates a fake serialized index blob with a header.
270+
func simulateIndex(size int) []byte {
271+
buf := make([]byte, size)
272+
// Write a recognizable header (mimics FAISS magic bytes)
273+
copy(buf, []byte("FAIDX001"))
274+
// Fill remainder with deterministic pattern
275+
for i := 8; i < size; i++ {
276+
buf[i] = byte(i % 251)
277+
}
278+
return buf
279+
}

0 commit comments

Comments
 (0)