Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion apps/web/src/app/api/ai/journl-agent/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { journlAgent } from "~/ai/mastra/agents/journl-agent";
import { getSession } from "~/auth/server";
import { api } from "~/trpc/server";

export const maxDuration = 30; // Allow streaming responses up to 30 seconds

Expand All @@ -13,7 +14,32 @@ export async function POST(req: Request) {
return new Response("Unauthorized", { status: 401 });
}

const result = await journlAgent.stream(messages);
const result = await journlAgent.stream(messages, {
onFinish: async (result) => {
const modelData = await journlAgent.getModel();

const provider = modelData.provider;
const model = modelData.modelId;

if (result.usage && session.user?.id) {
await api.usage.trackAiModelUsage({
metrics: [
{
quantity: result.usage.promptTokens,
unit: "input_tokens",
},
{
quantity: result.usage.completionTokens,
unit: "output_tokens",
},
],
model_id: model,
model_provider: provider,
user_id: session.user.id,
});
}
},
});

return result.toDataStreamResponse({
getErrorMessage(error) {
Expand Down
17 changes: 13 additions & 4 deletions apps/web/src/app/api/supabase/embed-document/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import removeMarkdown from "remove-markdown";
import type { z } from "zod/v4";
import { model } from "~/ai/providers/openai/embedding";
import { schema } from "~/components/editor/block-schema";
import { embedder } from "~/trpc/server";
import { api, embedder } from "~/trpc/server";
import { handler } from "../_lib/webhook-handler";

const CHUNK_PARAMS: ChunkParams = {
Expand Down Expand Up @@ -102,14 +102,23 @@ export const POST = handler(zDocumentEmbeddingTask, async (payload) => {
const mDocument =
await MDocument.fromMarkdown(markdown).chunk(CHUNK_PARAMS);

// ! TODO: Here's where we get the usage tokens, we must process these in a different transaction
// ! To guarantee that the usage is tracked regardless of the success of the embedding.
const { embeddings, usage: _usage } = await embedMany({
const { embeddings, usage } = await embedMany({
maxRetries: 5,
model,
values: mDocument.map((chunk) => chunk.text),
});

await api.usage.trackAiModelUsage({
metadata: {
document_id: document.id,
model_version: model.specificationVersion,
},
metrics: [{ quantity: usage.tokens, unit: "tokens" }],
model_id: model.modelId,
model_provider: model.provider,
user_id: document.user_id,
});

const insertions: z.infer<typeof zInsertDocumentEmbedding>[] = [];

for (const [index, embedding] of embeddings.entries()) {
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/api-router/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { documentRouter } from "./document.js";
import { journalRouter } from "./journal.js";
import { notesRouter } from "./notes.js";
import { pagesRouter } from "./pages.js";
import { usageRouter } from "./usage.js";

export const apiRouter = createTRPCRouter({
auth: authRouter,
Expand All @@ -13,6 +14,7 @@ export const apiRouter = createTRPCRouter({
journal: journalRouter,
notes: notesRouter,
pages: pagesRouter,
usage: usageRouter,
});

export type ApiRouter = typeof apiRouter;
52 changes: 52 additions & 0 deletions packages/api/src/api-router/usage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { eq } from "@acme/db";
import { UsageEvent, UsageEventStatus } from "@acme/db/schema";
import { TRPCError, type TRPCRouterRecord } from "@trpc/server";
import { z } from "zod/v4";
import { publicProcedure } from "../trpc.js";

export const usageRouter = {
trackAiModelUsage: publicProcedure
.input(
z.object({
metadata: z.record(z.string(), z.string()).optional(),
metrics: z.array(z.object({ quantity: z.number(), unit: z.string() })),
model_id: z.string(),
model_provider: z.string(),
user_id: z.string(),
}),
)
.mutation(async ({ ctx, input }) => {
try {
return await ctx.db.insert(UsageEvent).values(input);
} catch (error) {
console.error("Database error in usage.createUsageEvent:", error);
throw new TRPCError({
code: "INTERNAL_SERVER_ERROR",
message: "Failed to create usage event",
});
}
}),
updateUsageEventStatus: publicProcedure
.input(
z.object({
id: z.string(),
status: z.enum(UsageEventStatus.enumValues),
}),
)
.mutation(async ({ ctx, input }) => {
try {
return await ctx.db
.update(UsageEvent)
.set({
status: input.status,
})
.where(eq(UsageEvent.id, input.id));
} catch (error) {
console.error("Database error in usage.processUsageEvent:", error);
throw new TRPCError({
code: "INTERNAL_SERVER_ERROR",
message: "Failed to process usage event",
});
}
}),
} satisfies TRPCRouterRecord;
1 change: 1 addition & 0 deletions packages/db/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from "./core/document-embedding.schema.js";
export * from "./core/document-embedding-task.schema.js";
export * from "./core/journal-entry.schema.js";
export * from "./core/page.schema.js";
export * from "./usage/usage-event.schema.js";
31 changes: 31 additions & 0 deletions packages/db/src/usage/usage-event.schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { jsonb, pgEnum, pgTable, text, timestamp } from "drizzle-orm/pg-core";
import { user } from "../schema.js";

type UsageEventMetrics = {
unit: string;
quantity: number;
};

export const UsageEventStatus = pgEnum("usage_event_status", [
"pending",
"processed",
"failed",
]);

export const UsageEvent = pgTable("usage_event", (t) => ({
id: t.uuid().notNull().primaryKey().defaultRandom(),
created_at: timestamp().defaultNow(),
updated_at: timestamp()
.defaultNow()
.$onUpdateFn(() => new Date()),
user_id: text()
.notNull()
.references(() => user.id),
model_id: text().notNull(),
model_provider: text().notNull(),
metadata: jsonb(),
metrics: jsonb().$type<UsageEventMetrics[]>().notNull(),
status: UsageEventStatus().notNull().default("pending"),
}));

export type UsageEvent = typeof UsageEvent.$inferSelect;