Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions apps/web/src/app/api/ai/journl-agent/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,6 @@ async function handler(req: Request) {

const result = await journlAgent.streamVNext(messages, {
format: "aisdk",
onFinish: async (result) => {
const modelData = await journlAgent.getModel();

const provider = modelData.provider;
const model = modelData.modelId;

if (result.usage && session.user?.id) {
await api.usage.trackModelUsage({
metrics: [
{
quantity: result.usage.promptTokens,
unit: "input_tokens",
},
{
quantity: result.usage.completionTokens,
unit: "output_tokens",
},
],
model_id: model,
model_provider: provider,
user_id: session.user.id,
});
}
},
runtimeContext: setJournlRuntimeContext({
...rest.context,
user: {
Expand All @@ -51,6 +27,38 @@ async function handler(req: Request) {
} satisfies JournlAgentContext),
});

// Track usage after streaming completes.
if (session.user?.id) {
const fullOutput = await result.getFullOutput();
const usage = fullOutput.usage;

if (usage) {
const modelData = await journlAgent.getModel();
const provider = modelData.provider;
const model = modelData.modelId;

await api.usage.trackModelUsage({
metrics: [
{
quantity: usage.promptTokens || 0,
unit: "input_tokens",
},
{
quantity: usage.completionTokens || 0,
unit: "output_tokens",
},
{
quantity: usage.reasoningTokens || 0,
unit: "reasoning_tokens",
},
],
model_id: model,
model_provider: provider,
user_id: session.user.id,
});
}
}

return result.toUIMessageStreamResponse();
} catch (error) {
console.error("[api.chat.route] error 👀", error);
Expand Down
39 changes: 39 additions & 0 deletions apps/web/src/app/api/supabase/usage/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { zUsageEventWebhook } from "@acme/db/schema";
import { NextResponse } from "next/server";
import { api } from "~/trpc/server";
import { handler } from "../_lib/webhook-handler";

/**
* This webhook processes usage events when they are created or updated.
* Usage events are created when AI features are used (chat, embedding, etc.)
*/
export const POST = handler(zUsageEventWebhook, async (payload) => {
// Skip DELETE events
if (payload.type === "DELETE") {
return NextResponse.json({ success: true });
}

// Skip processing if the event is already processed
if (payload.record?.status === "processed") {
return NextResponse.json({ success: true });
}

try {
// Process the usage event with the usage period
const result = await api.usage.processUsageEvent({
usage_event_id: payload.record.id,
user_id: payload.record.user_id,
});

return NextResponse.json({ result, success: true });
} catch (error) {
return NextResponse.json(
{
details: error instanceof Error ? error.message : "Unknown error",
error: "Processing failed",
success: false,
},
{ status: 500 },
);
}
});
2 changes: 2 additions & 0 deletions packages/api/src/api-router/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { createTRPCRouter } from "../trpc.js";
import { authRouter } from "./auth.js";
import { documentRouter } from "./document.js";
import { journalRouter } from "./journal.js";
import { modelPricingRouter } from "./model-pricing.js";
import { notesRouter } from "./notes.js";
import { pagesRouter } from "./pages.js";
import { subscriptionRouter } from "./subscription.js";
Expand All @@ -11,6 +12,7 @@ export const apiRouter = createTRPCRouter({
auth: authRouter,
document: documentRouter,
journal: journalRouter,
modelPricing: modelPricingRouter,
notes: notesRouter,
pages: pagesRouter,
subscription: subscriptionRouter,
Expand Down
101 changes: 101 additions & 0 deletions packages/api/src/api-router/model-pricing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { and, desc, eq, lte } from "@acme/db";
import { ModelPricing, zInsertModelPricing } from "@acme/db/schema";
import { TRPCError, type TRPCRouterRecord } from "@trpc/server";
import { z } from "zod/v4";
import { publicProcedure } from "../trpc.js";

export const modelPricingRouter = {
getAllPricingForModel: publicProcedure
.input(
z.object({
model_id: z.string(),
model_provider: z.string(),
}),
)
.query(async ({ ctx, input }) => {
try {
const pricing = await ctx.db.query.ModelPricing.findMany({
orderBy: [desc(ModelPricing.effective_date)],
where: and(
eq(ModelPricing.model_id, input.model_id),
eq(ModelPricing.model_provider, input.model_provider),
lte(ModelPricing.effective_date, new Date()),
),
});

return pricing;
} catch (error) {
console.error(
"Database error in modelPricing.getAllPricingForModel:",
error,
);
throw new TRPCError({
code: "INTERNAL_SERVER_ERROR",
message: "Failed to get model pricing",
});
}
}),
getCurrentPricing: publicProcedure
.input(
z.object({
model_id: z.string(),
model_provider: z.string(),
unit_type: z.string(),
}),
)
.query(async ({ ctx, input }) => {
try {
const pricing = await ctx.db.query.ModelPricing.findFirst({
orderBy: [desc(ModelPricing.effective_date)],
where: and(
eq(ModelPricing.model_id, input.model_id),
eq(ModelPricing.model_provider, input.model_provider),
eq(ModelPricing.unit_type, input.unit_type),
lte(ModelPricing.effective_date, new Date()),
),
});

return pricing;
} catch (error) {
console.error(
"Database error in modelPricing.getCurrentPricing:",
error,
);
throw new TRPCError({
code: "INTERNAL_SERVER_ERROR",
message: "Failed to get current pricing",
});
}
}),

upsertPricing: publicProcedure
.input(zInsertModelPricing)
.mutation(async ({ ctx, input }) => {
try {
const [pricing] = await ctx.db
.insert(ModelPricing)
.values(input)
.onConflictDoUpdate({
set: {
price_per_unit: input.price_per_unit,
updated_at: new Date(),
},
target: [
ModelPricing.model_id,
ModelPricing.model_provider,
ModelPricing.unit_type,
ModelPricing.effective_date,
],
})
.returning();

return pricing;
} catch (error) {
console.error("Database error in modelPricing.upsertPricing:", error);
throw new TRPCError({
code: "INTERNAL_SERVER_ERROR",
message: "Failed to upsert pricing",
});
}
}),
} satisfies TRPCRouterRecord;
75 changes: 46 additions & 29 deletions packages/api/src/api-router/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { and, eq, or } from "@acme/db";
import { Subscription } from "@acme/db/schema";
import type { TRPCRouterRecord } from "@trpc/server";
import { z } from "zod/v4";
import type { TRPCContext } from "../trpc";
import { protectedProcedure } from "../trpc";

/**
Expand All @@ -18,6 +19,50 @@ function getAuthHeaders(headers: Headers): Headers {
return authHeaders;
}

/**
* Get the free plan from the database
*/
export async function getFreePlan(ctx: TRPCContext) {
return ctx.db.query.Plan.findFirst({
where: (plans, { eq, and }) =>
and(eq(plans.active, true), eq(plans.name, "free")),
});
}

/**
* Get the active subscription for a user
*/
export async function getActiveSubscription({
ctx,
userId,
}: {
ctx: TRPCContext;
userId?: string;
}) {
const userIdToUse = userId ?? ctx.session?.user?.id;

if (!userIdToUse) {
return null;
}

return ctx.db.query.Subscription.findFirst({
where: and(
eq(Subscription.referenceId, userIdToUse),
or(
eq(Subscription.status, "active"),
eq(Subscription.status, "trialing"),
),
),
with: {
plan: {
with: {
price: true,
},
},
},
});
}

const zProPlan = z.object({
description: z.string().nullable(),
displayName: z.string(),
Expand Down Expand Up @@ -66,36 +111,8 @@ export const subscriptionRouter = {
quota: plan.quota,
});
}),
// getActivePlan: protectedProcedure.query(async ({ ctx }) => {
// const activeSubscription = await getActiveSubscription({ ctx });
// if (!activeSubscription?.priceId) return null;

// const plan = await ctx.db.query.Plan.findFirst({
// where: eq(Plan.id, activeSubscription.priceId),
// with: {
// prices: true,
// },
// });

// return plan;
// }),
getSubscription: protectedProcedure.query(async ({ ctx }) => {
const subscription = await ctx.db.query.Subscription.findFirst({
where: and(
eq(Subscription.referenceId, ctx.session.user.id),
or(
eq(Subscription.status, "active"),
eq(Subscription.status, "trialing"),
),
),
with: {
plan: {
with: {
price: true,
},
},
},
});
const subscription = await getActiveSubscription({ ctx });

if (!subscription?.plan) {
return null;
Expand Down
Loading