Skip to content

Commit 124f67c

Browse files
authored
chore: add sync type and validation for orchestrator TaskSync (NangoHQ#2256)
Just the types and validation. No sync are yet scheduled or processed ## Checklist before requesting a review (skip if just adding/editing APIs & templates) - [ ] I added tests, otherwise the reason is: - [ ] I added observability, otherwise the reason is: - [ ] I added analytics, otherwise the reason is:
1 parent 1f706e6 commit 124f67c

File tree

6 files changed

+117
-58
lines changed

6 files changed

+117
-58
lines changed

packages/jobs/lib/processor/handler.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { OrchestratorTask, TaskWebhook, TaskAction, TaskPostConnection } from '@nangohq/nango-orchestrator';
1+
import type { OrchestratorTask, TaskWebhook, TaskAction, TaskPostConnection, TaskSync } from '@nangohq/nango-orchestrator';
22
import { jsonSchema } from '@nangohq/nango-orchestrator';
33
import type { JsonValue } from 'type-fest';
44
import { Err, Ok } from '@nangohq/utils';
@@ -13,6 +13,9 @@ export async function handler(task: OrchestratorTask): Promise<Result<JsonValue>
1313
task.abortController.signal.onabort = () => {
1414
abort(task);
1515
};
16+
if (task.isSync()) {
17+
return sync(task);
18+
}
1619
if (task.isAction()) {
1720
return action(task);
1821
}
@@ -30,6 +33,10 @@ async function abort(_task: OrchestratorTask): Promise<Result<void>> {
3033
return Ok(undefined);
3134
}
3235

36+
async function sync(task: TaskSync): Promise<Result<JsonValue>> {
37+
return Err(`Not implemented: ${JSON.stringify({ taskId: task.id })}`);
38+
}
39+
3340
async function action(task: TaskAction): Promise<Result<JsonValue>> {
3441
const providerConfig = await configService.getProviderConfig(task.connection.provider_config_key, task.connection.environment_id);
3542
if (providerConfig === null) {

packages/orchestrator/lib/clients/client.ts

+4-21
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ import type {
1515
ExecuteReturn,
1616
ExecuteWebhookProps,
1717
ExecutePostConnectionProps,
18-
TaskAction,
19-
TaskWebhook,
20-
TaskPostConnection,
2118
OrchestratorTask
2219
} from './types.js';
2320
import { validateTask } from './validate.js';
@@ -155,15 +152,7 @@ export class OrchestratorClient {
155152
return this.execute(schedulingProps);
156153
}
157154

158-
public async search({
159-
ids,
160-
groupKey,
161-
limit
162-
}: {
163-
ids?: string[];
164-
groupKey?: string;
165-
limit?: number;
166-
}): Promise<Result<(TaskWebhook | TaskAction | TaskPostConnection)[], ClientError>> {
155+
public async search({ ids, groupKey, limit }: { ids?: string[]; groupKey?: string; limit?: number }): Promise<Result<OrchestratorTask[], ClientError>> {
167156
const body = {
168157
...(ids ? { ids } : {}),
169158
...(groupKey ? { groupKey } : {}),
@@ -239,13 +228,7 @@ export class OrchestratorClient {
239228
}
240229
}
241230

242-
public async succeed({
243-
taskId,
244-
output
245-
}: {
246-
taskId: string;
247-
output: JsonValue;
248-
}): Promise<Result<TaskAction | TaskWebhook | TaskPostConnection, ClientError>> {
231+
public async succeed({ taskId, output }: { taskId: string; output: JsonValue }): Promise<Result<OrchestratorTask, ClientError>> {
249232
const res = await this.routeFetch(putTaskRoute)({
250233
params: { taskId },
251234
body: { output, state: 'SUCCEEDED' }
@@ -265,7 +248,7 @@ export class OrchestratorClient {
265248
}
266249
}
267250

268-
public async failed({ taskId, error }: { taskId: string; error: Error }): Promise<Result<TaskAction | TaskWebhook | TaskPostConnection, ClientError>> {
251+
public async failed({ taskId, error }: { taskId: string; error: Error }): Promise<Result<OrchestratorTask, ClientError>> {
269252
const output = { name: error.name, message: error.message };
270253
const res = await this.routeFetch(putTaskRoute)({
271254
params: { taskId },
@@ -286,7 +269,7 @@ export class OrchestratorClient {
286269
}
287270
}
288271

289-
public async cancel({ taskId, reason }: { taskId: string; reason: string }): Promise<Result<TaskAction | TaskWebhook | TaskPostConnection, ClientError>> {
272+
public async cancel({ taskId, reason }: { taskId: string; reason: string }): Promise<Result<OrchestratorTask, ClientError>> {
290273
const res = await this.routeFetch(putTaskRoute)({
291274
params: { taskId },
292275
body: { output: reason, state: 'CANCELLED' }

packages/orchestrator/lib/clients/processor.integration.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { EventsHandler } from '../events.js';
88
import { Ok, Err } from '@nangohq/utils';
99
import type { Result } from '@nangohq/utils';
1010
import type { JsonValue } from 'type-fest';
11-
import type { OrchestratorTask, TaskAction, TaskWebhook, TaskPostConnection } from './types.js';
11+
import type { OrchestratorTask } from './types.js';
1212
import { tracer } from 'dd-trace';
1313

1414
const dbClient = getTestDbClient();
@@ -99,7 +99,7 @@ describe('OrchestratorProcessor', async () => {
9999
});
100100
});
101101

102-
async function processN(handler: (task: TaskAction | TaskWebhook | TaskPostConnection) => Promise<Result<JsonValue>>, groupKey: string, n: number) {
102+
async function processN(handler: (task: OrchestratorTask) => Promise<Result<JsonValue>>, groupKey: string, n: number) {
103103
const processor = new OrchestratorProcessor({
104104
handler,
105105
opts: { orchestratorClient, groupKey, maxConcurrency: n, checkForTerminatedInterval: 100 }

packages/orchestrator/lib/clients/types.ts

+36-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,18 @@ import type { TaskState } from '@nangohq/scheduler';
55

66
export type SchedulingProps = Omit<PostSchedule['Body'], 'scheduling'>;
77

8+
interface SyncArgs {
9+
syncId: string;
10+
syncName: string;
11+
debug: boolean;
12+
connection: {
13+
id: number;
14+
provider_config_key: string;
15+
environment_id: number;
16+
connection_id: string;
17+
};
18+
syncJobId: number;
19+
}
820
interface ActionArgs {
921
actionName: string;
1022
connection: {
@@ -46,20 +58,40 @@ export type ExecuteActionProps = Omit<ExecuteProps, 'args'> & { args: ActionArgs
4658
export type ExecuteWebhookProps = Omit<ExecuteProps, 'args'> & { args: WebhookArgs };
4759
export type ExecutePostConnectionProps = Omit<ExecuteProps, 'args'> & { args: PostConnectionArgs };
4860

49-
export type OrchestratorTask = TaskAction | TaskWebhook | TaskPostConnection;
61+
export type OrchestratorTask = TaskSync | TaskAction | TaskWebhook | TaskPostConnection;
5062

5163
interface TaskCommonFields {
5264
id: string;
5365
name: string;
5466
state: TaskState;
5567
}
5668
interface TaskCommon extends TaskCommonFields {
69+
isSync(this: OrchestratorTask): this is TaskSync;
5770
isWebhook(this: OrchestratorTask): this is TaskWebhook;
5871
isAction(this: OrchestratorTask): this is TaskAction;
5972
isPostConnection(this: OrchestratorTask): this is TaskPostConnection;
6073
abortController: AbortController;
6174
}
6275

76+
export interface TaskSync extends TaskCommon, SyncArgs {}
77+
export function TaskSync(props: TaskCommonFields & SyncArgs): TaskSync {
78+
return {
79+
id: props.id,
80+
name: props.name,
81+
syncId: props.syncId,
82+
syncName: props.syncName,
83+
syncJobId: props.syncJobId,
84+
debug: props.debug,
85+
connection: props.connection,
86+
state: props.state,
87+
abortController: new AbortController(),
88+
isSync: () => true,
89+
isWebhook: () => false,
90+
isAction: () => false,
91+
isPostConnection: () => false
92+
};
93+
}
94+
6395
export interface TaskAction extends TaskCommon, ActionArgs {}
6496
export function TaskAction(props: TaskCommonFields & ActionArgs): TaskAction {
6597
return {
@@ -71,6 +103,7 @@ export function TaskAction(props: TaskCommonFields & ActionArgs): TaskAction {
71103
activityLogId: props.activityLogId,
72104
input: props.input,
73105
abortController: new AbortController(),
106+
isSync: () => false,
74107
isWebhook: () => false,
75108
isAction: () => true,
76109
isPostConnection: () => false
@@ -89,6 +122,7 @@ export function TaskWebhook(props: TaskCommonFields & WebhookArgs): TaskWebhook
89122
activityLogId: props.activityLogId,
90123
input: props.input,
91124
abortController: new AbortController(),
125+
isSync: () => false,
92126
isWebhook: () => true,
93127
isAction: () => false,
94128
isPostConnection: () => false
@@ -106,6 +140,7 @@ export function TaskPostConnection(props: TaskCommonFields & PostConnectionArgs)
106140
fileLocation: props.fileLocation,
107141
activityLogId: props.activityLogId,
108142
abortController: new AbortController(),
143+
isSync: () => false,
109144
isWebhook: () => false,
110145
isAction: () => false,
111146
isPostConnection: () => true

packages/orchestrator/lib/clients/validate.ts

+64-3
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,63 @@
11
import { taskStates } from '@nangohq/scheduler';
22
import type { Task } from '@nangohq/scheduler';
3-
import { TaskAction, TaskWebhook, TaskPostConnection } from './types.js';
3+
import type { OrchestratorTask } from './types.js';
4+
import { TaskAction, TaskWebhook, TaskPostConnection, TaskSync } from './types.js';
45
import { z } from 'zod';
5-
import { actionArgsSchema, webhookArgsSchema, postConnectionArgsSchema } from '../routes/v1/postSchedule.js';
66
import { Err, Ok } from '@nangohq/utils';
77
import type { Result } from '@nangohq/utils';
8+
import { jsonSchema } from '../utils/validation.js';
9+
10+
export const commonSchemaArgsFields = {
11+
connection: z.object({
12+
id: z.number().positive(),
13+
connection_id: z.string().min(1),
14+
provider_config_key: z.string().min(1),
15+
environment_id: z.number().positive()
16+
})
17+
};
18+
19+
export const syncArgsSchema = z.object({
20+
type: z.literal('sync'),
21+
syncId: z.string().min(1),
22+
syncName: z.string().min(1),
23+
syncJobId: z.number().int().positive(),
24+
debug: z.boolean(),
25+
...commonSchemaArgsFields
26+
});
27+
28+
export const actionArgsSchema = z.object({
29+
type: z.literal('action'),
30+
actionName: z.string().min(1),
31+
activityLogId: z.number().positive(),
32+
input: jsonSchema,
33+
...commonSchemaArgsFields
34+
});
35+
export const webhookArgsSchema = z.object({
36+
type: z.literal('webhook'),
37+
webhookName: z.string().min(1),
38+
parentSyncName: z.string().min(1),
39+
activityLogId: z.number().positive(),
40+
input: jsonSchema,
41+
...commonSchemaArgsFields
42+
});
43+
export const postConnectionArgsSchema = z.object({
44+
type: z.literal('post-connection-script'),
45+
postConnectionName: z.string().min(1),
46+
fileLocation: z.string().min(1),
47+
activityLogId: z.number().positive(),
48+
...commonSchemaArgsFields
49+
});
850

951
const commonSchemaFields = {
1052
id: z.string().uuid(),
1153
name: z.string().min(1),
1254
groupKey: z.string().min(1),
1355
state: z.enum(taskStates)
1456
};
57+
const syncSchema = z.object({
58+
...commonSchemaFields,
59+
payload: syncArgsSchema
60+
});
1561
const actionSchema = z.object({
1662
...commonSchemaFields,
1763
payload: actionArgsSchema
@@ -25,7 +71,22 @@ const postConnectionSchema = z.object({
2571
payload: postConnectionArgsSchema
2672
});
2773

28-
export function validateTask(task: Task): Result<TaskAction | TaskWebhook | TaskPostConnection> {
74+
export function validateTask(task: Task): Result<OrchestratorTask> {
75+
const sync = syncSchema.safeParse(task);
76+
if (sync.success) {
77+
return Ok(
78+
TaskSync({
79+
id: sync.data.id,
80+
state: sync.data.state,
81+
name: sync.data.name,
82+
syncId: sync.data.payload.syncId,
83+
syncName: sync.data.payload.syncName,
84+
connection: sync.data.payload.connection,
85+
syncJobId: sync.data.payload.syncJobId,
86+
debug: sync.data.payload.debug
87+
})
88+
);
89+
}
2990
const action = actionSchema.safeParse(task);
3091
if (action.success) {
3192
return Ok(

packages/orchestrator/lib/routes/v1/postSchedule.ts

+3-30
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import type { Scheduler } from '@nangohq/scheduler';
44
import type { ApiError, Endpoint } from '@nangohq/types';
55
import type { EndpointRequest, EndpointResponse, RouteHandler, Route } from '@nangohq/utils';
66
import { validateRequest } from '@nangohq/utils';
7-
import { jsonSchema } from '../../utils/validation.js';
87
import type { TaskType } from '../../types.js';
8+
import { syncArgsSchema, actionArgsSchema, postConnectionArgsSchema, webhookArgsSchema } from '../../clients/validate.js';
99

1010
const path = '/v1/schedule';
1111
const method = 'POST';
@@ -32,40 +32,13 @@ export type PostSchedule = Endpoint<{
3232
Success: { taskId: string };
3333
}>;
3434

35-
const commonSchemaFields = {
36-
connection: z.object({
37-
id: z.number().positive(),
38-
connection_id: z.string().min(1),
39-
provider_config_key: z.string().min(1),
40-
environment_id: z.number().positive()
41-
}),
42-
activityLogId: z.number().positive(),
43-
input: jsonSchema
44-
};
45-
46-
export const actionArgsSchema = z.object({
47-
type: z.literal('action'),
48-
actionName: z.string().min(1),
49-
...commonSchemaFields
50-
});
51-
export const webhookArgsSchema = z.object({
52-
type: z.literal('webhook'),
53-
webhookName: z.string().min(1),
54-
parentSyncName: z.string().min(1),
55-
...commonSchemaFields
56-
});
57-
export const postConnectionArgsSchema = z.object({
58-
type: z.literal('post-connection-script'),
59-
postConnectionName: z.string().min(1),
60-
fileLocation: z.string().min(1),
61-
...commonSchemaFields
62-
});
63-
6435
const validate = validateRequest<PostSchedule>({
6536
parseBody: (data: any) => {
6637
function argsSchema(data: any) {
6738
if ('args' in data && 'type' in data.args) {
6839
switch (data.args.type) {
40+
case 'sync':
41+
return syncArgsSchema;
6942
case 'action':
7043
return actionArgsSchema;
7144
case 'webhook':

0 commit comments

Comments
 (0)