Skip to content

Commit 4f57a6d

Browse files
authored
fix: handle big payload across the stack (NangoHQ#2474)
## Describe your changes Fixes https://linear.app/nango/issue/NAN-1363/big-payload-is-crashing-the-action-execution A big output from an action would break logging, orchestrator and the UI (all for different reasons). The error management being still not good it's messy to handle and propagate the correct error but I think I managed something not too ugly. - Runner: Handle big payload validation - Runner: Better handle JSON.stringification when logging and avoid error by truncating before sending - Orch: stop stringifying errors - Orch: return json response in the payload as json - Orch: handle big payload (> 10mb) preventing task to be marked as completed - UI: handle big payload in the message panel ## Test ```ts import type { NangoSync } from '../../models'; export default async function runAction(nango: NangoSync): Promise<string> { return '1'.repeat(1_999_999_999); } ``` <img width="1173" alt="Screenshot 2024-07-05 at 18 37 41" src="https://github.com/NangoHQ/nango/assets/1637651/c077c47b-edd7-4f02-ab46-988b8fc04616">
1 parent 40c90f0 commit 4f57a6d

File tree

12 files changed

+134
-67
lines changed

12 files changed

+134
-67
lines changed

package-lock.json

+6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/orchestrator/lib/clients/client.ts

+23-23
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { route as getOutputRoute } from '../routes/v1/tasks/taskId/getOutput.js'
99
import { route as putTaskRoute } from '../routes/v1/tasks/putTaskId.js';
1010
import { route as postHeartbeatRoute } from '../routes/v1/tasks/taskId/postHeartbeat.js';
1111
import type { Result, Route } from '@nangohq/utils';
12-
import { Ok, Err, routeFetch, stringifyError, getLogger } from '@nangohq/utils';
12+
import { Ok, Err, routeFetch, getLogger } from '@nangohq/utils';
1313
import type { Endpoint } from '@nangohq/types';
1414
import type {
1515
ClientError,
@@ -54,8 +54,8 @@ export class OrchestratorClient {
5454
if ('error' in res) {
5555
return Err({
5656
name: res.error.code,
57-
message: res.error.message || `Error scheduling immediate task`,
58-
payload: props
57+
message: res.error.message || `Error scheduling immediate task`,
58+
payload: { ...props, response: res.error.payload as any }
5959
});
6060
} else {
6161
return Ok(res);
@@ -80,7 +80,7 @@ export class OrchestratorClient {
8080
return Err({
8181
name: res.error.code,
8282
message: res.error.message || `Error creating recurring schedule`,
83-
payload: { ...props, startsAt }
83+
payload: { ...props, startsAt, response: res.error.payload as any }
8484
});
8585
} else {
8686
return Ok(res);
@@ -107,7 +107,7 @@ export class OrchestratorClient {
107107
return Err({
108108
name: res.error.code,
109109
message: res.error.message || `Error setting schedule state`,
110-
payload: { scheduleName, state }
110+
payload: { scheduleName, state, response: res.error.payload as any }
111111
});
112112
} else {
113113
return Ok(undefined);
@@ -122,7 +122,7 @@ export class OrchestratorClient {
122122
return Err({
123123
name: res.error.code,
124124
message: res.error.message || `Error updating schedule frequency`,
125-
payload: { scheduleName, frequencyMs }
125+
payload: { scheduleName, frequencyMs, response: res.error.payload as any }
126126
});
127127
} else {
128128
return Ok(undefined);
@@ -139,7 +139,7 @@ export class OrchestratorClient {
139139
return Err({
140140
name: res.error.code,
141141
message: res.error.message || `Error creating recurring schedule`,
142-
payload: props
142+
payload: { ...props, response: res.error.payload as any }
143143
});
144144
} else {
145145
return Ok(undefined);
@@ -162,7 +162,7 @@ export class OrchestratorClient {
162162
return Err({
163163
name: getOutput.error.code,
164164
message: getOutput.error.message || `Error fetching task '${taskId}' output`,
165-
payload: {}
165+
payload: { response: getOutput.error.payload as any }
166166
});
167167
} else {
168168
switch (getOutput.state) {
@@ -261,7 +261,7 @@ export class OrchestratorClient {
261261
return Err({
262262
name: res.error.code,
263263
message: res.error.message || `Error listing tasks`,
264-
payload: body
264+
payload: { ...body, response: res.error.payload as any }
265265
});
266266
} else {
267267
const tasks = res.flatMap((task) => {
@@ -284,7 +284,7 @@ export class OrchestratorClient {
284284
return Err({
285285
name: res.error.code,
286286
message: res.error.message || `Error listing schedules`,
287-
payload: { scheduleNames }
287+
payload: { scheduleNames, response: res.error.payload as any }
288288
});
289289
} else {
290290
const schedules = res.flatMap((schedule) => {
@@ -319,7 +319,7 @@ export class OrchestratorClient {
319319
return Err({
320320
name: res.error.code,
321321
message: res.error.message || `Error dequeueing tasks`,
322-
payload: { groupKey, limit }
322+
payload: { groupKey, limit, response: res.error.payload as any }
323323
});
324324
} else {
325325
const dequeuedTasks = res.flatMap((task) => {
@@ -342,7 +342,7 @@ export class OrchestratorClient {
342342
return Err({
343343
name: res.error.code,
344344
message: res.error.message || `Error heartbeating task '${taskId}'`,
345-
payload: { taskId }
345+
payload: { taskId, response: res.error.payload as any }
346346
});
347347
} else {
348348
return Ok(undefined);
@@ -358,21 +358,21 @@ export class OrchestratorClient {
358358
return Err({
359359
name: res.error.code,
360360
message: res.error.message || `Error succeeding task '${taskId}'`,
361-
payload: { taskId, output }
361+
payload: { taskId, output, response: res.error.payload as any }
362362
});
363363
} else {
364364
return validateTask(res).mapError((err) => ({
365365
name: 'succeed_failed',
366-
message: `Failed to mark task ${taskId} as succeeded: ${stringifyError(err)}`,
367-
payload: { taskId, output }
366+
message: `Failed to mark task ${taskId} as succeeded`,
367+
payload: { taskId, output, err: err as any }
368368
}));
369369
}
370370
}
371371

372372
public async failed({ taskId, error }: { taskId: string; error: Error }): Promise<Result<OrchestratorTask, ClientError>> {
373373
const output = {
374374
name: error.name,
375-
type: 'type' in error ? (error.type as string) : null,
375+
type: 'type' in error ? (error.type as string) : 'unknown_error',
376376
message: error.message,
377377
payload: 'payload' in error ? (error.payload as any) : null
378378
};
@@ -384,13 +384,13 @@ export class OrchestratorClient {
384384
return Err({
385385
name: res.error.code,
386386
message: res.error.message || `Error failing task '${taskId}'`,
387-
payload: { taskId, error: output }
387+
payload: { taskId, error: output, response: res.error.payload as any }
388388
});
389389
} else {
390390
return validateTask(res).mapError((err) => ({
391391
name: 'failed_failed',
392-
message: `Failed to mark task ${taskId} as failed: ${stringifyError(err)}`,
393-
payload: { taskId, error: output }
392+
message: `Failed to mark task ${taskId} as failed`,
393+
payload: { taskId, error: output, err: err as any }
394394
}));
395395
}
396396
}
@@ -404,13 +404,13 @@ export class OrchestratorClient {
404404
return Err({
405405
name: res.error.code,
406406
message: res.error.message || `Error cancelling task '${taskId}'`,
407-
payload: { taskId, error: reason }
407+
payload: { taskId, error: reason, response: res.error.payload as any }
408408
});
409409
} else {
410410
return validateTask(res).mapError((err) => ({
411-
name: 'cacel_failed',
412-
message: `Failed to mark task ${taskId} as cancelled: ${stringifyError(err)}`,
413-
payload: { taskId, error: reason }
411+
name: 'cancel_failed',
412+
message: `Failed to mark task ${taskId} as cancelled`,
413+
payload: { taskId, error: reason, err: err as any }
414414
}));
415415
}
416416
}

packages/orchestrator/lib/clients/processor.ts

+26-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import type { Result } from '@nangohq/utils';
22
import { Err, stringifyError, getLogger } from '@nangohq/utils';
33
import type { OrchestratorClient } from './client.js';
4-
import type { OrchestratorTask } from './types.js';
4+
import type { ClientError, OrchestratorTask } from './types.js';
55
import type { JsonValue } from 'type-fest';
66
import PQueue from 'p-queue';
77
import type { Tracer } from 'dd-trace';
8+
import type { ApiError } from '@nangohq/types';
89

910
const logger = getLogger('orchestrator.clients.processor');
1011

@@ -121,26 +122,29 @@ export class OrchestratorProcessor {
121122
// task was aborted. No need to set it as failed
122123
return;
123124
}
125+
124126
const setFailed = await this.orchestratorClient.failed({ taskId: task.id, error: res.error });
125127
if (setFailed.isErr()) {
126-
logger.error(`failed to set task ${task.id} as failed: ${stringifyError(setFailed.error)}`);
128+
throwIfPayloadTooBig(setFailed.error);
129+
logger.error(`failed to set task ${task.id} as failed`, setFailed.error);
127130
span.setTag('error', setFailed);
128131
} else {
129132
span.setTag('error', res.error);
130133
}
131134
} else {
132135
const setSucceed = await this.orchestratorClient.succeed({ taskId: task.id, output: res.value });
133136
if (setSucceed.isErr()) {
134-
logger.error(`failed to set task ${task.id} as succeeded: ${stringifyError(setSucceed.error)}`);
137+
throwIfPayloadTooBig(setSucceed.error);
138+
logger.error(`failed to set task ${task.id} as succeeded`, setSucceed.error);
135139
span.setTag('error', setSucceed);
136140
}
137141
}
138142
} catch (err: unknown) {
139-
const error = new Error(stringifyError(err));
140-
logger.error(`Failed to process task ${task.id}: ${stringifyError(error)}`);
143+
const error = err instanceof Error ? err : new Error(stringifyError(err));
144+
logger.error(`Failed to process task ${task.id}`, error);
141145
const setFailed = await this.orchestratorClient.failed({ taskId: task.id, error });
142146
if (setFailed.isErr()) {
143-
logger.error(`failed to set task ${task.id} as failed. Unknown error: ${stringifyError(setFailed.error)}`);
147+
logger.error(`failed to set task ${task.id} as failed. Unknown error`, setFailed.error);
144148
span.setTag('error', setFailed);
145149
} else {
146150
span.setTag('error', error);
@@ -157,8 +161,23 @@ export class OrchestratorProcessor {
157161
return setInterval(async () => {
158162
const res = await this.orchestratorClient.heartbeat({ taskId: task.id });
159163
if (res.isErr()) {
160-
logger.error(`failed to send heartbeat for task ${task.id}: ${stringifyError(res.error)}`);
164+
logger.error(`failed to send heartbeat for task ${task.id}`, res.error);
161165
}
162166
}, 300_000);
163167
}
164168
}
169+
170+
// We don't have access to NangoError so we have to create a temp error
171+
class PayloadTooBigError extends Error {
172+
type = 'action_output_too_big';
173+
override message = 'Output is too big';
174+
}
175+
176+
function throwIfPayloadTooBig(err: ClientError) {
177+
if (err.payload && typeof err.payload === 'object' && 'response' in err.payload && err.payload['response'] && typeof err.payload['response'] === 'object') {
178+
const res = err.payload['response'] as unknown as ApiError<string>;
179+
if (res.error.code === 'payload_too_big') {
180+
throw new PayloadTooBigError();
181+
}
182+
}
183+
}

packages/orchestrator/lib/server.ts

+7-6
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,17 @@ export const getServer = (scheduler: Scheduler, eventEmmiter: EventEmitter): Exp
4242
createRoute(server, postHeartbeatHandler(scheduler));
4343
createRoute(server, postDequeueHandler(scheduler, eventEmmiter));
4444

45-
server.use((err: unknown, _req: Request, res: Response<ApiError<'server_error', any>>, next: NextFunction) => {
46-
res.status(500).json({ error: { code: 'server_error', errors: err } });
47-
next();
48-
});
49-
50-
server.use((err: any, _req: Request, res: Response<ApiError<'invalid_json' | 'internal_error'>>, _next: any) => {
45+
server.use((err: any, _req: Request, res: Response<ApiError<'invalid_json' | 'internal_error' | 'payload_too_big'>>, _next: NextFunction) => {
5146
if (err instanceof SyntaxError && 'body' in err && 'type' in err && err.type === 'entity.parse.failed') {
5247
res.status(400).send({ error: { code: 'invalid_json', message: err.message } });
5348
return;
49+
} else if (err instanceof Error) {
50+
if (err.message === 'request entity too large') {
51+
res.status(400).json({ error: { code: 'payload_too_big', message: 'Payload is too big' } });
52+
return;
53+
}
5454
}
55+
5556
res.status(500).send({ error: { code: 'internal_error', message: err.message } });
5657
});
5758

packages/shared/lib/sdk/sync.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@ import paginateService from '../services/paginate.service.js';
55
import proxyService from '../services/proxy.service.js';
66
import type { AxiosInstance } from 'axios';
77
import axios, { AxiosError } from 'axios';
8-
import { getPersistAPIUrl, safeStringify } from '../utils/utils.js';
8+
import { getPersistAPIUrl } from '../utils/utils.js';
99
import type { IntegrationWithCreds } from '@nangohq/node';
1010
import type { UserProvidedProxyConfiguration } from '../models/Proxy.js';
1111
import { getLogger, httpRetryStrategy, metrics, retryWithBackoff } from '@nangohq/utils';
1212
import type { SyncConfig } from '../models/Sync.js';
1313
import type { RunnerFlags } from '../services/sync/run.utils.js';
1414
import { validateData } from './dataValidation.js';
1515
import { NangoError } from '../utils/error.js';
16+
import { stringifyAndTruncateLog } from './utils.js';
1617

1718
const logger = getLogger('SDK');
1819

@@ -657,7 +658,8 @@ export class NangoAction {
657658
return;
658659
}
659660

660-
const content = safeStringify(args);
661+
const content = stringifyAndTruncateLog(args, 99_000);
662+
661663
await this.sendLogToPersist(content, { level, timestamp: Date.now() });
662664
}
663665

packages/shared/lib/sdk/utils.ts

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import safeStringify from 'fast-safe-stringify';
2+
3+
export function stringifyAndTruncateLog(args: any[], maxSize: number = 99_000) {
4+
let msg = '';
5+
6+
if (typeof args[0] === 'string') {
7+
msg = args.shift();
8+
}
9+
10+
if (args.length > 0) {
11+
msg += ` ${args.map((arg) => safeStringify.stableStringify(arg, undefined, undefined, { depthLimit: 10, edgesLimit: 20 }))}`;
12+
}
13+
14+
if (msg.length > maxSize) {
15+
msg = `${msg.substring(0, maxSize)}... (truncated)`;
16+
}
17+
18+
return msg;
19+
}
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { describe, expect, it } from 'vitest';
2+
import { stringifyAndTruncateLog } from './utils.js';
3+
4+
describe('stringifyAndTruncateLog', () => {
5+
it('should not break a small string', () => {
6+
const str = stringifyAndTruncateLog(['hello']);
7+
expect(str).toStrictEqual('hello');
8+
});
9+
10+
it('should limit a string', () => {
11+
const str = stringifyAndTruncateLog(['hello'], 2);
12+
expect(str).toStrictEqual('he... (truncated)');
13+
});
14+
15+
it('should limit an object', () => {
16+
const str = stringifyAndTruncateLog(['hello', { foo: 'bar' }], 10);
17+
expect(str).toStrictEqual('hello {"fo... (truncated)');
18+
});
19+
20+
it('should not break empty args', () => {
21+
const str = stringifyAndTruncateLog([]);
22+
expect(str).toStrictEqual('');
23+
});
24+
25+
it('should handle object', () => {
26+
const str = stringifyAndTruncateLog([{ foo: 1 }]);
27+
expect(str).toStrictEqual(' {"foo":1}');
28+
});
29+
it('should handle object + string', () => {
30+
const str = stringifyAndTruncateLog(['hello', { foo: 1 }]);
31+
expect(str).toStrictEqual('hello {"foo":1}');
32+
});
33+
});

packages/shared/lib/utils/error.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ export class NangoError extends Error {
112112
this.message = 'The API endpoint could not be found and returned a 404. Please ensure you have the endpoint specified and spelled correctly.';
113113
break;
114114

115-
case 'fobidden':
115+
case 'forbidden':
116116
this.status = 403;
117117
this.message = 'The API endpoint returned back a 403 error. Check the scopes requested to make sure proper access is requested to the API.';
118118
break;
@@ -603,6 +603,11 @@ export class NangoError extends Error {
603603
this.message = 'Failed to validate a record in batchSave';
604604
break;
605605

606+
case 'action_output_too_big':
607+
this.status = 400;
608+
this.message = 'Action output is too big';
609+
break;
610+
606611
default:
607612
this.status = 500;
608613
this.type = 'unhandled_' + type;

0 commit comments

Comments
 (0)