Skip to content

Commit 1e180d0

Browse files
committed
Add support for async events export
1 parent a12abe4 commit 1e180d0

File tree

6 files changed

+270
-39
lines changed

6 files changed

+270
-39
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { getEvents } from "@/lib/analytics/get-events";
2+
import { EventsFilters } from "@/lib/analytics/types";
3+
4+
export async function* fetchEventsBatch(
5+
filters: Omit<EventsFilters, "page" | "limit">,
6+
pageSize: number = 1000,
7+
) {
8+
let page = 1;
9+
let hasMore = true;
10+
11+
while (hasMore) {
12+
const events = await getEvents({
13+
...filters,
14+
page,
15+
limit: pageSize,
16+
});
17+
18+
if (events.length > 0) {
19+
yield { events };
20+
page++;
21+
hasMore = events.length === pageSize;
22+
} else {
23+
hasMore = false;
24+
}
25+
}
26+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import {
2+
eventsExportColumnAccessors,
3+
eventsExportColumnNames,
4+
} from "@/lib/analytics/events-export-helpers";
5+
import { convertToCSV } from "@/lib/analytics/utils/convert-to-csv";
6+
import { createDownloadableExport } from "@/lib/api/create-downloadable-export";
7+
import { handleAndReturnErrorResponse } from "@/lib/api/errors";
8+
import { generateExportFilename } from "@/lib/api/utils/generate-export-filename";
9+
import { generateRandomString } from "@/lib/api/utils/generate-random-string";
10+
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
11+
import { eventsQuerySchema } from "@/lib/zod/schemas/analytics";
12+
import { sendEmail } from "@dub/email";
13+
import ExportReady from "@dub/email/templates/export-ready";
14+
import { prisma } from "@dub/prisma";
15+
import { capitalize, log } from "@dub/utils";
16+
import * as z from "zod/v4";
17+
import { logAndRespond } from "../../utils";
18+
import { fetchEventsBatch } from "./fetch-events-batch";
19+
20+
const payloadSchema = eventsQuerySchema.extend({
21+
columns: z
22+
.string()
23+
.transform((c) => c.split(","))
24+
.pipe(z.string().array()),
25+
workspaceId: z.string(),
26+
userId: z.string(),
27+
linkId: z.string().optional(),
28+
folderIds: z.array(z.string()).optional(),
29+
folderId: z.string().optional(),
30+
dataAvailableFrom: z.string().optional(),
31+
});
32+
33+
// POST /api/cron/events/export - QStash worker for processing large event exports
34+
export async function POST(req: Request) {
35+
try {
36+
const rawBody = await req.text();
37+
38+
await verifyQstashSignature({
39+
req,
40+
rawBody,
41+
});
42+
43+
const { columns, workspaceId, userId, ...filters } = payloadSchema.parse(
44+
JSON.parse(rawBody),
45+
);
46+
47+
const user = await prisma.user.findUnique({
48+
where: {
49+
id: userId,
50+
},
51+
select: {
52+
email: true,
53+
},
54+
});
55+
56+
if (!user) {
57+
return logAndRespond(`User ${userId} not found. Skipping the export.`);
58+
}
59+
60+
if (!user.email) {
61+
return logAndRespond(`User ${userId} has no email. Skipping the export.`);
62+
}
63+
64+
const workspace = await prisma.project.findUnique({
65+
where: {
66+
id: workspaceId,
67+
},
68+
select: {
69+
id: true,
70+
name: true,
71+
createdAt: true,
72+
},
73+
});
74+
75+
if (!workspace) {
76+
return logAndRespond(
77+
`Workspace ${workspaceId} not found. Skipping the export.`,
78+
);
79+
}
80+
81+
const { linkId, folderIds, folderId, dataAvailableFrom, ...eventFilters } =
82+
filters;
83+
84+
// Fetch events in batches and build CSV
85+
const allEvents: Record<string, any>[] = [];
86+
87+
const eventsFilters = {
88+
...eventFilters,
89+
...(linkId && { linkId }),
90+
workspaceId,
91+
folderIds,
92+
folderId: folderId || "",
93+
dataAvailableFrom: dataAvailableFrom
94+
? new Date(dataAvailableFrom)
95+
: workspace.createdAt,
96+
};
97+
98+
for await (const { events } of fetchEventsBatch(eventsFilters)) {
99+
const formattedEvents = events.map((row) =>
100+
Object.fromEntries(
101+
columns.map((c) => [
102+
eventsExportColumnNames?.[c] ?? capitalize(c),
103+
eventsExportColumnAccessors[c]?.(row) ?? row?.[c],
104+
]),
105+
),
106+
);
107+
allEvents.push(...formattedEvents);
108+
}
109+
110+
const csvData = convertToCSV(allEvents);
111+
112+
const { downloadUrl } = await createDownloadableExport({
113+
fileKey: `exports/events/${generateRandomString(16)}.csv`,
114+
fileName: generateExportFilename("events"),
115+
body: csvData,
116+
contentType: "text/csv",
117+
});
118+
119+
await sendEmail({
120+
to: user.email,
121+
subject: "Your events export is ready",
122+
react: ExportReady({
123+
email: user.email,
124+
exportType: "events",
125+
downloadUrl,
126+
workspace: {
127+
name: workspace.name,
128+
},
129+
}),
130+
});
131+
132+
return logAndRespond(
133+
`Export (${allEvents.length} events) generated and email sent to user.`,
134+
);
135+
} catch (error) {
136+
await log({
137+
message: `Error exporting events: ${error.message}`,
138+
type: "cron",
139+
});
140+
141+
return handleAndReturnErrorResponse(error);
142+
}
143+
}

apps/web/app/(ee)/api/events/export/route.ts

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
import {
2+
eventsExportColumnAccessors,
3+
eventsExportColumnNames,
4+
} from "@/lib/analytics/events-export-helpers";
5+
import { getAnalytics } from "@/lib/analytics/get-analytics";
16
import { getEvents } from "@/lib/analytics/get-events";
27
import { getFolderIdsToFilter } from "@/lib/analytics/get-folder-ids-to-filter";
38
import { convertToCSV } from "@/lib/analytics/utils";
@@ -6,41 +11,14 @@ import { getLinkOrThrow } from "@/lib/api/links/get-link-or-throw";
611
import { throwIfClicksUsageExceeded } from "@/lib/api/links/usage-checks";
712
import { assertValidDateRangeForPlan } from "@/lib/api/utils/assert-valid-date-range-for-plan";
813
import { withWorkspace } from "@/lib/auth";
14+
import { qstash } from "@/lib/cron";
915
import { verifyFolderAccess } from "@/lib/folder/permissions";
10-
import { ClickEvent, LeadEvent, SaleEvent } from "@/lib/types";
1116
import { eventsQuerySchema } from "@/lib/zod/schemas/analytics";
12-
import { COUNTRIES, capitalize } from "@dub/utils";
17+
import { APP_DOMAIN_WITH_NGROK, capitalize } from "@dub/utils";
18+
import { NextResponse } from "next/server";
1319
import * as z from "zod/v4";
1420

15-
type Row = ClickEvent | LeadEvent | SaleEvent;
16-
17-
const columnNames: Record<string, string> = {
18-
trigger: "Event",
19-
url: "Destination URL",
20-
os: "OS",
21-
referer: "Referrer",
22-
refererUrl: "Referrer URL",
23-
timestamp: "Date",
24-
invoiceId: "Invoice ID",
25-
saleAmount: "Sale Amount",
26-
clickId: "Click ID",
27-
};
28-
29-
const columnAccessors = {
30-
trigger: (r: Row) => r.click.trigger,
31-
event: (r: LeadEvent | SaleEvent) => r.eventName,
32-
url: (r: ClickEvent) => r.click.url,
33-
link: (r: Row) => r.domain + (r.key === "_root" ? "" : `/${r.key}`),
34-
country: (r: Row) =>
35-
r.country ? COUNTRIES[r.country] ?? r.country : r.country,
36-
referer: (r: ClickEvent) => r.click.referer,
37-
refererUrl: (r: ClickEvent) => r.click.refererUrl,
38-
customer: (r: LeadEvent | SaleEvent) =>
39-
r.customer.name + (r.customer.email ? ` <${r.customer.email}>` : ""),
40-
invoiceId: (r: SaleEvent) => r.sale.invoiceId,
41-
saleAmount: (r: SaleEvent) => "$" + (r.sale.amount / 100).toFixed(2),
42-
clickId: (r: ClickEvent) => r.click.id,
43-
};
21+
const MAX_EVENTS_TO_EXPORT = 1000;
4422

4523
// GET /api/events/export – get export data for analytics
4624
export const GET = withWorkspace(
@@ -93,20 +71,57 @@ export const GET = withWorkspace(
9371
userId: session.user.id,
9472
});
9573

74+
// Count events using getAnalytics with groupBy: "count"
75+
const countResponse = await getAnalytics({
76+
...parsedParams,
77+
groupBy: "count",
78+
...(link && { linkId: link.id }),
79+
folderIds,
80+
workspaceId: workspace.id,
81+
dataAvailableFrom: workspace.createdAt,
82+
});
83+
84+
// Extract the count based on event type
85+
// getAnalytics with groupBy: "count" returns an object like { clicks: 123 } or { leads: 45 } or { sales: 10, saleAmount: 5000 }
86+
const eventsCount =
87+
typeof countResponse === "object" && countResponse !== null
88+
? (countResponse[event as keyof typeof countResponse] as number) ?? 0
89+
: typeof countResponse === "number"
90+
? countResponse
91+
: 0;
92+
93+
// Process the export in the background if the number of events is greater than MAX_EVENTS_TO_EXPORT
94+
if (eventsCount > MAX_EVENTS_TO_EXPORT) {
95+
await qstash.publishJSON({
96+
url: `${APP_DOMAIN_WITH_NGROK}/api/cron/events/export`,
97+
body: {
98+
...searchParams,
99+
workspaceId: workspace.id,
100+
userId: session.user.id,
101+
...(link && { linkId: link.id }),
102+
folderIds: folderIds ? folderIds : undefined,
103+
folderId: folderId || "",
104+
dataAvailableFrom: workspace.createdAt.toISOString(),
105+
},
106+
});
107+
108+
return NextResponse.json({}, { status: 202 });
109+
}
110+
96111
const response = await getEvents({
97112
...parsedParams,
98113
...(link && { linkId: link.id }),
99114
workspaceId: workspace.id,
100-
limit: 100000,
115+
limit: MAX_EVENTS_TO_EXPORT,
101116
folderIds,
102117
folderId: folderId || "",
103118
});
104119

105120
const data = response.map((row) =>
106121
Object.fromEntries(
107122
columns.map((c) => [
108-
columnNames?.[c] ?? capitalize(c),
109-
columnAccessors[c]?.(row) ?? row?.[c],
123+
eventsExportColumnNames?.[c] ?? capitalize(c),
124+
eventsExportColumnAccessors[c]?.(row) ?? row?.[c],
110125
]),
111126
),
112127
);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { ClickEvent, LeadEvent, SaleEvent } from "@/lib/types";
2+
import { COUNTRIES } from "@dub/utils";
3+
4+
export type Row = ClickEvent | LeadEvent | SaleEvent;
5+
6+
export const eventsExportColumnNames: Record<string, string> = {
7+
trigger: "Event",
8+
url: "Destination URL",
9+
os: "OS",
10+
referer: "Referrer",
11+
refererUrl: "Referrer URL",
12+
timestamp: "Date",
13+
invoiceId: "Invoice ID",
14+
saleAmount: "Sale Amount",
15+
clickId: "Click ID",
16+
};
17+
18+
export const eventsExportColumnAccessors = {
19+
trigger: (r: Row) => r.click.trigger,
20+
event: (r: LeadEvent | SaleEvent) => r.eventName,
21+
url: (r: ClickEvent) => r.click.url,
22+
link: (r: Row) => r.domain + (r.key === "_root" ? "" : `/${r.key}`),
23+
country: (r: Row) =>
24+
r.country ? COUNTRIES[r.country] ?? r.country : r.country,
25+
referer: (r: ClickEvent) => r.click.referer,
26+
refererUrl: (r: ClickEvent) => r.click.refererUrl,
27+
customer: (r: LeadEvent | SaleEvent) =>
28+
r.customer.name + (r.customer.email ? ` <${r.customer.email}>` : ""),
29+
invoiceId: (r: SaleEvent) => r.sale.invoiceId,
30+
saleAmount: (r: SaleEvent) => "$" + (r.sale.amount / 100).toFixed(2),
31+
clickId: (r: ClickEvent) => r.click.id,
32+
};

apps/web/ui/analytics/events/events-export-button.tsx

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import useWorkspace from "@/lib/swr/use-workspace";
22
import { Button, Download, TooltipContent } from "@dub/ui";
3+
import { useSession } from "next-auth/react";
34
import { Dispatch, SetStateAction, useContext } from "react";
45
import { toast } from "sonner";
56
import { AnalyticsContext } from "../analytics-provider";
@@ -13,6 +14,7 @@ export function EventsExportButton({
1314
const { exportQueryString } = useContext(EventsContext);
1415
const { eventsApiPath } = useContext(AnalyticsContext);
1516
const { slug, plan } = useWorkspace();
17+
const { data: session } = useSession();
1618

1719
const needsHigherPlan = plan === "free" || plan === "pro";
1820

@@ -31,12 +33,25 @@ export function EventsExportButton({
3133
throw new Error(response.statusText);
3234
}
3335

36+
if (response.status === 202) {
37+
setOpenPopover(false);
38+
return {
39+
isAsync: true,
40+
message: `Your export is being processed and we'll send you an email (${session?.user?.email}) when it's ready to download.`,
41+
};
42+
}
43+
3444
const blob = await response.blob();
3545
const url = window.URL.createObjectURL(blob);
3646
const a = document.createElement("a");
3747
a.href = url;
3848
a.download = `Dub Events Export - ${new Date().toISOString()}.csv`;
3949
a.click();
50+
setOpenPopover(false);
51+
return {
52+
isAsync: false,
53+
message: "Exported successfully",
54+
};
4055
}
4156

4257
return (
@@ -57,10 +72,9 @@ export function EventsExportButton({
5772
onClick={() => {
5873
toast.promise(exportData(), {
5974
loading: "Exporting file...",
60-
success: "Exported successfully",
75+
success: (data) => data.message,
6176
error: (error) => error,
6277
});
63-
setOpenPopover(false);
6478
}}
6579
/>
6680
);

packages/email/src/templates/export-ready.tsx

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export default function ExportReady({
2424
}: {
2525
email: string;
2626
downloadUrl: string;
27-
exportType: "partners" | "commissions" | "links";
27+
exportType: "partners" | "commissions" | "links" | "events";
2828
expiresInDays?: number;
2929
program?: {
3030
name: string;
@@ -48,8 +48,9 @@ export default function ExportReady({
4848
Your {exportType} export is ready
4949
</Heading>
5050
<Text className="text-sm leading-6 text-black">
51-
Your export of {exportType} from <strong>{contextName}</strong>{" "}
52-
has been completed and is ready to download.
51+
Your {exportType} export from your <strong>{contextName}</strong>{" "}
52+
{program ? "program" : "workspace"} has been completed and is
53+
ready to download.
5354
</Text>
5455
<Section className="my-8">
5556
<Link

0 commit comments

Comments
 (0)