Skip to content

Commit 4e934af

Browse files
vetezeRyan Vetezeoz-agent
authored
feat(media): Lore blob store integration � Phase B (#1154) (#1155)
Wire @lore-vcs/sdk into the upload path behind a BlobStore interface. The read path is unchanged � readFile(storagePath) keeps working because Lore leaves the working copy on disk after commit. Changes: - blob-store.ts: BlobStore interface + BlobRef type - blob-store-lore.ts: LoreBlobStore with per-DID PQueue serialization, offline Lore repo per DID, idempotent ensureRepo, non-fatal put/gc - route.ts: blobStore.put() called after writeFile, loreRef stored on the asset row (non-fatal � upload succeeds even if Lore fails) - media.ts + 0037_lore_blob_ref.sql: nullable lore_ref TEXT column - next.config.js: @lore-vcs/sdk and koffi added to external packages - package.json: @lore-vcs/sdk + p-queue added - blob-store-lore.test.ts: 11 unit tests (all passing, no live Lore) Key design decisions from spike (spike/1154-lore-verdict.md): - Additive: working copy stays on disk, read path unchanged - One Lore repo per DID at {MEDIA_ROOT}/{didPath}/ - Per-DID write queue (concurrency=1) for file-lock safety - loreRef is the Lore revision hash (64-char hex), NOT a DFOS CID - DFOS CID (#1122) will be a separate column when that ships Co-authored-by: Ryan Veteze <ryan@imajin.ca> Co-authored-by: Oz <oz-agent@warp.dev>
1 parent fefd0ec commit 4e934af

9 files changed

Lines changed: 498 additions & 1 deletion

File tree

apps/kernel/app/media/api/assets/route.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { rateLimit, getClientIP } from "@imajin/config";
1212
import { createLogger } from "@imajin/logger";
1313
import { getDefaultManifest, signManifest, canonicalize } from "@imajin/fair";
1414
import { publishContentEvent } from "@imajin/dfos";
15+
import { blobStore } from "@/src/lib/media/blob-store-lore";
1516

1617
const log = createLogger("kernel");
1718

@@ -255,6 +256,14 @@ export async function POST(request: NextRequest) {
255256
);
256257
}
257258

259+
// Register in Lore blob store (non-fatal — asset is still served from storagePath if this fails)
260+
const blobRef = await blobStore
261+
.put(ownerDid, storagePath, { assetId, sizeBytes: file.size })
262+
.catch((err: unknown) => {
263+
log.error({ err: String(err), assetId }, "Lore blob store put failed (non-fatal)");
264+
return null;
265+
});
266+
258267
// Insert DB record
259268
let record;
260269
try {
@@ -271,6 +280,7 @@ export async function POST(request: NextRequest) {
271280
hash,
272281
fairManifest: signedManifest,
273282
fairPath,
283+
loreRef: blobRef?.loreRef ?? null,
274284
status: "active",
275285
metadata: context ? { context } : {},
276286
})

apps/kernel/next.config.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ const nextConfig = {
3636
],
3737
},
3838
experimental: {
39-
serverComponentsExternalPackages: ['@metalabel/dfos-protocol'],
39+
serverComponentsExternalPackages: [
40+
'@metalabel/dfos-protocol',
41+
'@lore-vcs/sdk',
42+
'koffi',
43+
],
4044
serverActions: { bodySizeLimit: '2gb' },
4145
},
4246
webpack: (config, { isServer }) => {

apps/kernel/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
"@imajin/trust-graph": "workspace:*",
3434
"@imajin/ui": "workspace:^",
3535
"@imajin/vault-core": "workspace:*",
36+
"@lore-vcs/sdk": "^0.8.3",
3637
"@metalabel/dfos-protocol": "^0.10.0",
3738
"@metalabel/dfos-web-relay": "^0.10.0",
3839
"@noble/ciphers": "^1.2.1",
@@ -50,6 +51,7 @@
5051
"hono": "^4.12.25",
5152
"jose": "^6.1.3",
5253
"nanoid": "^5",
54+
"p-queue": "^8.0.1",
5355
"next": "^14.2.0",
5456
"otplib": "^13.4.0",
5557
"postgres": "^3.4.5",

apps/kernel/src/db/schemas/media.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ export const assets = mediaSchema.table(
2828
fairManifest: jsonb("fair_manifest").default({}), // inline .fair JSON
2929
fairPath: text("fair_path"), // path to .fair.json file
3030

31+
// Lore storage pointer (#1154 — Layer B blob store)
32+
// 64-char SHA-256 hex revision hash from @lore-vcs/sdk REVISION_COMMIT_REVISION event.
33+
// NULL for assets uploaded before Lore integration. Distinct from the DFOS CID (#1122).
34+
loreRef: text("lore_ref"),
35+
3136
// DFOS federation anchor
3237
fairDfosEventId: text("fair_dfos_event_id"), // DFOS event ID for signed manifest anchor
3338

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
import { describe, it, expect, vi, beforeEach } from 'vitest';
2+
3+
// ─── Mocks ─────────────────────────────────────────────────────────────────
4+
5+
const mockWaitAsync = vi.fn();
6+
const mockFilterByType = vi.fn();
7+
const mockCollectAsync = vi.fn();
8+
9+
const mockHandle = {
10+
waitAsync: mockWaitAsync,
11+
filterByType: mockFilterByType,
12+
collectAsync: mockCollectAsync,
13+
};
14+
mockFilterByType.mockReturnValue(mockHandle);
15+
16+
const mockLore = {
17+
logConfigure: vi.fn(),
18+
shutdown: vi.fn(),
19+
repositoryCreate: vi.fn(() => mockHandle),
20+
fileStage: vi.fn(() => mockHandle),
21+
revisionCommit: vi.fn(() => mockHandle),
22+
};
23+
24+
vi.mock('@lore-vcs/sdk', () => ({ lore: mockLore }));
25+
vi.mock('@lore-vcs/sdk/types/enums', () => ({
26+
LoreEventTag: { REVISION_COMMIT_REVISION: 'REVISION_COMMIT_REVISION' },
27+
LoreLogLevel: { WARN: 'WARN' },
28+
}));
29+
30+
vi.mock('@imajin/logger', () => ({
31+
createLogger: vi.fn(() => ({ info: vi.fn(), error: vi.fn(), warn: vi.fn() })),
32+
}));
33+
34+
// Fake the .lore directory check so we don't need a real filesystem
35+
vi.mock('node:fs', () => ({
36+
existsSync: vi.fn(() => false), // pretend .lore doesn't exist → triggers repositoryCreate
37+
}));
38+
39+
// ─── Subject ───────────────────────────────────────────────────────────────
40+
41+
// Import AFTER mocks are registered
42+
const { LoreBlobStore } = await import('../blob-store-lore');
43+
44+
// ─── Helpers ───────────────────────────────────────────────────────────────
45+
46+
const FAKE_REV = 'a'.repeat(64);
47+
const OWNER_DID = 'did:imajin:owner123';
48+
const FILE_PATH = '/mnt/media/did_imajin_owner123/assets/asset_abc.mp4';
49+
const HINT = { assetId: 'asset_abc', sizeBytes: 512_000 };
50+
51+
function setupSuccessfulCommit(revision = FAKE_REV) {
52+
mockWaitAsync.mockResolvedValue(0);
53+
mockCollectAsync.mockResolvedValue([
54+
{ tag: 'REVISION_COMMIT_REVISION', data: { revision } },
55+
]);
56+
}
57+
58+
// ─── Tests ─────────────────────────────────────────────────────────────────
59+
60+
describe('LoreBlobStore', () => {
61+
let store: InstanceType<typeof LoreBlobStore>;
62+
63+
beforeEach(() => {
64+
vi.clearAllMocks();
65+
mockFilterByType.mockReturnValue(mockHandle);
66+
store = new LoreBlobStore();
67+
});
68+
69+
it('put returns a 64-char loreRef on success', async () => {
70+
setupSuccessfulCommit();
71+
72+
const ref = await store.put(OWNER_DID, FILE_PATH, HINT);
73+
74+
expect(ref.loreRef).toBe(FAKE_REV);
75+
expect(ref.loreRef).toHaveLength(64);
76+
expect(ref.sizeBytes).toBe(HINT.sizeBytes);
77+
});
78+
79+
it('put calls repositoryCreate when .lore directory does not exist', async () => {
80+
setupSuccessfulCommit();
81+
82+
await store.put(OWNER_DID, FILE_PATH, HINT);
83+
84+
expect(mockLore.repositoryCreate).toHaveBeenCalledOnce();
85+
expect(mockLore.repositoryCreate).toHaveBeenCalledWith(
86+
expect.objectContaining({ offline: true }),
87+
expect.objectContaining({ repositoryUrl: expect.stringContaining('imajin-media') }),
88+
);
89+
});
90+
91+
it('put skips repositoryCreate on second call for the same DID (in-process cache)', async () => {
92+
setupSuccessfulCommit();
93+
94+
await store.put(OWNER_DID, FILE_PATH, HINT);
95+
await store.put(OWNER_DID, FILE_PATH, { ...HINT, assetId: 'asset_def' });
96+
97+
// repositoryCreate should only be called once across both puts
98+
expect(mockLore.repositoryCreate).toHaveBeenCalledOnce();
99+
});
100+
101+
it('put calls fileStage with the correct filePath', async () => {
102+
setupSuccessfulCommit();
103+
104+
await store.put(OWNER_DID, FILE_PATH, HINT);
105+
106+
expect(mockLore.fileStage).toHaveBeenCalledWith(
107+
expect.objectContaining({ repositoryPath: expect.stringContaining('did_imajin_owner123') }),
108+
{ paths: [FILE_PATH] },
109+
);
110+
});
111+
112+
it('put includes assetId in the commit message', async () => {
113+
setupSuccessfulCommit();
114+
115+
await store.put(OWNER_DID, FILE_PATH, HINT);
116+
117+
expect(mockLore.revisionCommit).toHaveBeenCalledWith(
118+
expect.any(Object),
119+
{ message: `upload: ${HINT.assetId}` },
120+
);
121+
});
122+
123+
it('put throws when revision hash is missing from commit events', async () => {
124+
mockWaitAsync.mockResolvedValue(0);
125+
mockCollectAsync.mockResolvedValue([]); // no REVISION_COMMIT_REVISION event
126+
127+
await expect(store.put(OWNER_DID, FILE_PATH, HINT)).rejects.toThrow(
128+
'did not return a valid revision hash',
129+
);
130+
});
131+
132+
it('put throws when revision hash is not 64 chars', async () => {
133+
mockWaitAsync.mockResolvedValue(0);
134+
mockCollectAsync.mockResolvedValue([
135+
{ tag: 'REVISION_COMMIT_REVISION', data: { revision: 'tooshort' } },
136+
]);
137+
138+
await expect(store.put(OWNER_DID, FILE_PATH, HINT)).rejects.toThrow(
139+
'did not return a valid revision hash',
140+
);
141+
});
142+
143+
it('put propagates errors so the upload route .catch() can handle them', async () => {
144+
mockWaitAsync.mockRejectedValue(new Error('Lore I/O error'));
145+
146+
await expect(store.put(OWNER_DID, FILE_PATH, HINT)).rejects.toThrow('Lore I/O error');
147+
});
148+
149+
it('serializes same-DID concurrent puts (queue concurrency=1)', async () => {
150+
const order: number[] = [];
151+
152+
// Each put takes a different amount of time to complete
153+
mockLore.repositoryCreate.mockReturnValue(mockHandle);
154+
mockWaitAsync.mockImplementation(() => Promise.resolve(0));
155+
156+
let call = 0;
157+
mockCollectAsync.mockImplementation(() => {
158+
const n = ++call;
159+
return new Promise(resolve =>
160+
setTimeout(() => {
161+
order.push(n);
162+
resolve([{ tag: 'REVISION_COMMIT_REVISION', data: { revision: FAKE_REV } }]);
163+
}, n === 1 ? 30 : 10) // first put is slower
164+
);
165+
});
166+
167+
// Fire both at the same time
168+
const [r1, r2] = await Promise.all([
169+
store.put(OWNER_DID, FILE_PATH, { ...HINT, assetId: 'asset_1' }),
170+
store.put(OWNER_DID, FILE_PATH, { ...HINT, assetId: 'asset_2' }),
171+
]);
172+
173+
// Both succeed
174+
expect(r1.loreRef).toBe(FAKE_REV);
175+
expect(r2.loreRef).toBe(FAKE_REV);
176+
// Serialized: 1 completes before 2 starts
177+
expect(order).toEqual([1, 2]);
178+
});
179+
180+
it('runs different-DID puts concurrently (independent queues)', async () => {
181+
const completed: string[] = [];
182+
183+
mockWaitAsync.mockResolvedValue(0);
184+
let call = 0;
185+
mockCollectAsync.mockImplementation(() => {
186+
const n = ++call;
187+
const did = n === 1 ? 'did:imajin:alice' : 'did:imajin:bob';
188+
return new Promise(resolve =>
189+
setTimeout(() => {
190+
completed.push(did);
191+
resolve([{ tag: 'REVISION_COMMIT_REVISION', data: { revision: FAKE_REV } }]);
192+
}, n === 1 ? 20 : 5)
193+
);
194+
});
195+
196+
await Promise.all([
197+
store.put('did:imajin:alice', FILE_PATH, HINT),
198+
store.put('did:imajin:bob', FILE_PATH, HINT),
199+
]);
200+
201+
// bob (faster) should finish before alice (slower) — they ran concurrently
202+
expect(completed[0]).toBe('did:imajin:bob');
203+
expect(completed[1]).toBe('did:imajin:alice');
204+
});
205+
206+
it('gc is a no-op and does not throw', async () => {
207+
await expect(store.gc(OWNER_DID, FAKE_REV)).resolves.toBeUndefined();
208+
});
209+
});

0 commit comments

Comments
 (0)