Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ COPY packages/webhooks/package.json ./packages/webhooks/package.json
COPY packages/fleet/package.json ./packages/fleet/package.json
COPY packages/providers/package.json ./packages/providers/package.json
COPY packages/runner-sdk/package.json ./packages/runner-sdk/package.json
COPY packages/billing/package.json ./packages/billing/package.json
COPY package*.json ./

# Install every dependencies
Expand Down
29 changes: 29 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions packages/billing/lib/billing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { uuidv7 } from 'uuidv7';

import { Err, Ok, flagHasUsage, report } from '@nangohq/utils';

import type { BillingClient, BillingIngestEvent, BillingMetric } from './types.js';
import type { Result } from '@nangohq/utils';

export class Billing {
constructor(private client: BillingClient) {
this.client = client;
}

async send(type: BillingMetric['type'], value: number, props: BillingMetric['properties']): Promise<Result<void>> {
return this.sendAll([{ type, value, properties: props }]);
}

async sendAll(events: BillingMetric[]): Promise<Result<void>> {
const mapped = events.flatMap((event) => {
if (event.value === 0) {
return [];
}

return [
{
type: event.type,
accountId: event.properties.accountId,
idempotencyKey: event.properties.idempotencyKey || uuidv7(),
timestamp: event.properties.timestamp || new Date(),
properties: {
count: event.value
}
}
];
});

return this.ingest(mapped);
}

// Note: Events are sent immediately
private async ingest(events: BillingIngestEvent[]): Promise<Result<void>> {
if (!flagHasUsage) {
return Ok(undefined);
}

try {
await this.client.ingest(events);
return Ok(undefined);
} catch (err: unknown) {
const e = new Error(`Failed to send billing event`, { cause: err });
report(e);
return Err(e);
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would retry certain errors here like network ones for instance? wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

}
33 changes: 33 additions & 0 deletions packages/billing/lib/clients/lago.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Client as LagoClient, getLagoError } from 'lago-javascript-client';

import { envs } from '../envs.js';

import type { BillingClient, BillingIngestEvent } from '../types.js';
import type { EventInput } from 'lago-javascript-client';

const lagoClient = LagoClient(envs.LAGO_API_KEY || '');

export const lago: BillingClient = {
ingest: async (events: BillingIngestEvent[]): Promise<void> => {
const batchSize = 100;
for (let i = 0; i < events.length; i += batchSize) {
try {
await lagoClient.events.createBatchEvents({
events: events.slice(i, i + batchSize).map(toLagoEvent)
});
} catch (err) {
throw await getLagoError<typeof lagoClient.events.createBatchEvents>(err);
}
}
}
};

function toLagoEvent(event: BillingIngestEvent): EventInput['event'] {
return {
code: event.type,
transaction_id: event.idempotencyKey,
external_subscription_id: event.accountId.toString(),
timestamp: event.timestamp.getTime(),
properties: event.properties
};
}
3 changes: 3 additions & 0 deletions packages/billing/lib/envs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { ENVS, parseEnvs } from '@nangohq/utils';

export const envs = parseEnvs(ENVS);
6 changes: 6 additions & 0 deletions packages/billing/lib/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Billing } from './billing.js';
import { lago } from './clients/lago.js';

export type { BillingIngestEvent, BillingMetric } from './types.js';

export const billing = new Billing(lago);
17 changes: 17 additions & 0 deletions packages/billing/lib/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export interface BillingClient {
ingest: (events: BillingIngestEvent[]) => Promise<void>;
}

export interface BillingIngestEvent {
type: 'monthly_active_records' | 'billable_connections' | 'billable_actions';
idempotencyKey: string;
accountId: number;
timestamp: Date;
properties: Record<string, string | number>;
}

export interface BillingMetric {
type: BillingIngestEvent['type'];
value: number;
properties: { accountId: number; timestamp?: Date | undefined; idempotencyKey?: string | undefined };
}
23 changes: 23 additions & 0 deletions packages/billing/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "@nangohq/billing",
"version": "1.0.0",
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.js",
"private": true,
"scripts": {},
"repository": {
"type": "git",
"url": "git+https://github.com/NangoHQ/nango.git",
"directory": "packages/billing"
},
"dependencies": {
"@nangohq/utils": "file:../utils",
"lago-javascript-client": "1.22.0",
"uuidv7": "1.0.2"
},
"devDependencies": {},
"files": [
"dist/**/*"
]
}
13 changes: 13 additions & 0 deletions packages/billing/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "lib",
"outDir": "dist"
},
"references": [
{
"path": "../utils"
}
],
"include": ["lib/**/*", "../utils/lib/vitest.d.ts"]
}
2 changes: 1 addition & 1 deletion packages/kvstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"repository": {
"type": "git",
"url": "git+https://github.com/NangoHQ/nango.git",
"directory": "packages/logs"
"directory": "packages/kvstore"
},
"dependencies": {
"@nangohq/utils": "file:../utils",
Expand Down
13 changes: 11 additions & 2 deletions packages/persist/lib/records.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import tracer from 'dd-trace';

import { billing } from '@nangohq/billing';
import { logContextGetter } from '@nangohq/logs';
import { format as recordsFormatter, records as recordsService } from '@nangohq/records';
import { ErrorSourceEnum, LogActionEnum, errorManager, getSyncConfigByJobId, updateSyncJobResult } from '@nangohq/shared';
import { Err, Ok, metrics, stringifyError } from '@nangohq/utils';

import type { FormattedRecord, UnencryptedRecordData, UpsertSummary } from '@nangohq/records';
import type { MergingStrategy } from '@nangohq/types';
import type { DBPlan, MergingStrategy } from '@nangohq/types';
import type { Result } from '@nangohq/utils';
import type { Span } from 'dd-trace';

Expand All @@ -18,6 +19,7 @@ export async function persistRecords({
accountId,
environmentId,
connectionId,
plan,
providerConfigKey,
nangoConnectionId,
syncId,
Expand All @@ -31,6 +33,7 @@ export async function persistRecords({
accountId: number;
environmentId: number;
connectionId: string;
plan: DBPlan | null;
providerConfigKey: string;
nangoConnectionId: number;
syncId: string;
Expand Down Expand Up @@ -151,7 +154,13 @@ export async function persistRecords({
return acc;
}, 0);

metrics.increment(metrics.Types.BILLED_RECORDS_COUNT, new Set(summary.billedKeys).size, { accountId });
const mar = new Set(summary.billedKeys).size;

if (plan && plan.name !== 'free') {
void billing.send('monthly_active_records', mar, { accountId });
}

metrics.increment(metrics.Types.BILLED_RECORDS_COUNT, mar, { accountId });
metrics.increment(metrics.Types.PERSIST_RECORDS_COUNT, records.length);
metrics.increment(metrics.Types.PERSIST_RECORDS_SIZE_IN_BYTES, recordsSizeInBytes, { accountId });
metrics.increment(metrics.Types.PERSIST_RECORDS_MODIFIED_COUNT, allModifiedKeys.size);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { ApiError, DeleteRecordsSuccess, Endpoint, MergingStrategy } from '@nangohq/types';
import { validateRequest } from '@nangohq/utils';
import type { EndpointRequest, EndpointResponse, RouteHandler, Route } from '@nangohq/utils';
import { persistRecords, recordsPath } from '../../../../../../../../../records.js';

import { recordsRequestParser } from './validate.js';
import { persistRecords, recordsPath } from '../../../../../../../../../records.js';

import type { AuthLocals } from '../../../../../../../../../middleware/auth.middleware.js';
import type { ApiError, DeleteRecordsSuccess, Endpoint, MergingStrategy } from '@nangohq/types';
import type { EndpointRequest, EndpointResponse, Route, RouteHandler } from '@nangohq/utils';

type DeleteRecords = Endpoint<{
Method: typeof method;
Expand Down Expand Up @@ -34,8 +36,9 @@ const validate = validateRequest<DeleteRecords>(recordsRequestParser);
const handler = async (req: EndpointRequest<DeleteRecords>, res: EndpointResponse<DeleteRecords, AuthLocals>) => {
const { environmentId, nangoConnectionId, syncId, syncJobId }: DeleteRecords['Params'] = req.params;
const { model, records, providerConfigKey, connectionId, activityLogId, merging }: DeleteRecords['Body'] = req.body;
const { account } = res.locals;
const { account, plan } = res.locals;
const result = await persistRecords({
plan,
persistType: 'delete',
environmentId,
accountId: account.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { ApiError, Endpoint, MergingStrategy, PostRecordsSuccess } from '@nangohq/types';
import { validateRequest } from '@nangohq/utils';
import type { EndpointRequest, EndpointResponse, RouteHandler } from '@nangohq/utils';
import { persistRecords, recordsPath } from '../../../../../../../../../records.js';

import { recordsRequestParser } from './validate.js';
import { persistRecords, recordsPath } from '../../../../../../../../../records.js';

import type { AuthLocals } from '../../../../../../../../../middleware/auth.middleware.js';
import type { ApiError, Endpoint, MergingStrategy, PostRecordsSuccess } from '@nangohq/types';
import type { EndpointRequest, EndpointResponse, RouteHandler } from '@nangohq/utils';

type PostRecords = Endpoint<{
Method: typeof method;
Expand Down Expand Up @@ -34,8 +36,9 @@ const validate = validateRequest<PostRecords>(recordsRequestParser);
const handler = async (req: EndpointRequest<PostRecords>, res: EndpointResponse<PostRecords, AuthLocals>) => {
const { environmentId, nangoConnectionId, syncId, syncJobId }: PostRecords['Params'] = req.params;
const { model, records, providerConfigKey, connectionId, activityLogId, merging }: PostRecords['Body'] = req.body;
const { account } = res.locals;
const { account, plan } = res.locals;
const result = await persistRecords({
plan,
persistType: 'save',
accountId: account.id,
environmentId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { ApiError, Endpoint, MergingStrategy, PutRecordsSuccess } from '@nangohq/types';
import { validateRequest } from '@nangohq/utils';
import type { EndpointRequest, EndpointResponse, RouteHandler } from '@nangohq/utils';
import { persistRecords, recordsPath } from '../../../../../../../../../records.js';

import { recordsRequestParser } from './validate.js';
import { persistRecords, recordsPath } from '../../../../../../../../../records.js';

import type { AuthLocals } from '../../../../../../../../../middleware/auth.middleware.js';
import type { ApiError, Endpoint, MergingStrategy, PutRecordsSuccess } from '@nangohq/types';
import type { EndpointRequest, EndpointResponse, RouteHandler } from '@nangohq/utils';

type PutRecords = Endpoint<{
Method: typeof method;
Expand Down Expand Up @@ -34,8 +36,9 @@ const validate = validateRequest<PutRecords>(recordsRequestParser);
const handler = async (req: EndpointRequest<PutRecords>, res: EndpointResponse<PutRecords, AuthLocals>) => {
const { environmentId, nangoConnectionId, syncId, syncJobId }: PutRecords['Params'] = req.params;
const { model, records, providerConfigKey, connectionId, activityLogId, merging }: PutRecords['Body'] = req.body;
const { account } = res.locals;
const { account, plan } = res.locals;
const result = await persistRecords({
plan,
persistType: 'update',
accountId: account.id,
environmentId,
Expand Down
1 change: 1 addition & 0 deletions packages/persist/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"@nangohq/records": "file:../records",
"@nangohq/shared": "file:../shared",
"@nangohq/utils": "file:../utils",
"@nangohq/billing": "file:../billing",
"dd-trace": "5.21.0",
"express": "^4.20.0",
"zod": "3.24.2"
Expand Down
3 changes: 3 additions & 0 deletions packages/persist/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
},
{
"path": "../records"
},
{
"path": "../billing"
}
],
"include": ["lib/**/*"]
Expand Down
Loading
Loading