Skip to content

Commit 0241543

Browse files
authored
feat: export webhook_reply and cron_cursor_job in provisioner (#4597)
* feat: export webhook_reply and cron_cursor_job in provisioner Include webhook_reply and cron_cursor_job_id in the provisioning JSON API response and YAML export. webhook_reply is only emitted when set; cron_cursor_job resolves to the hyphenated job name in YAML. Also fixes webhook_reply atom-to-string conversion in the YAML serialiser to avoid a FunctionClauseError on export. * support webhook_reply during yaml import in the editor * ensure jobs are inserted first * add test for importing cron trigger with cron_cursor_job_id field * refactor tests
1 parent 6f86079 commit 0241543

12 files changed

Lines changed: 362 additions & 23 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ and this project adheres to
1717

1818
### Added
1919

20+
- Support `webhook_reply` and `cron_cursor_job` in the provisioner
21+
[#4587](https://github.com/OpenFn/lightning/issues/4587)
22+
2023
### Changed
2124

2225
### Fixed
@@ -49,7 +52,6 @@ and this project adheres to
4952
and triggers can still be freely deleted from workflows since the snapshot
5053
system preserves their data for every run.
5154
[#4538](https://github.com/OpenFn/lightning/issues/4538)
52-
5355
- Allow users to select which workflow to merge for sandbox merging
5456
[#4002](https://github.com/OpenFn/lightning/issues/4002)
5557
- Bump ws-worker to `v1.23.1`

assets/js/collaborative-editor/adapters/YAMLStateToYDoc.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,18 @@ export class YAMLStateToYDoc {
6666

6767
// Session.Trigger always requires cron_expression
6868
// Default to null for non-cron triggers
69-
const cronExpression =
70-
trigger.type === 'cron' ? trigger.cron_expression : null;
71-
triggerMap.set('cron_expression', cronExpression);
69+
triggerMap.set('cron_expression', null);
70+
triggerMap.set('cron_cursor_job_id', null);
71+
triggerMap.set('webhook_reply', null);
72+
73+
if (trigger.type === 'cron') {
74+
triggerMap.set('cron_expression', trigger.cron_expression ?? null);
75+
triggerMap.set('cron_cursor_job_id', trigger.cron_cursor_job_id ?? null);
76+
}
77+
78+
if (trigger.type === 'webhook') {
79+
triggerMap.set('webhook_reply', trigger.webhook_reply ?? null);
80+
}
7281

7382
return triggerMap;
7483
}

assets/js/yaml/schema/workflow-spec.json

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@
5555
"cron_expression": {
5656
"type": "string"
5757
},
58+
"cron_cursor_job": {
59+
"type": ["string", "null"]
60+
},
61+
"webhook_reply": {
62+
"type": ["string", "null"],
63+
"enum": ["before_start", "after_completion", "custom", null]
64+
},
5865
"pos": {
5966
"type": "object",
6067
"properties": {
@@ -71,7 +78,9 @@
7178
"additionalProperties": false,
7279
"oneOf": [
7380
{
74-
"properties": { "type": { "const": "webhook" } }
81+
"properties": {
82+
"type": { "const": "webhook" }
83+
}
7584
},
7685
{
7786
"properties": {

assets/js/yaml/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ export type StateCronTrigger = {
2121
type: 'cron';
2222
enabled: boolean;
2323
cron_expression: string;
24+
cron_cursor_job_id: string | null;
2425
};
2526

2627
export type StateWebhookTrigger = {
2728
id: string;
2829
enabled: boolean;
2930
type: 'webhook';
31+
webhook_reply: 'before_start' | 'after_completion' | 'custom' | null;
3032
};
3133

3234
export type StateKafkaTrigger = {
@@ -81,13 +83,15 @@ export type SpecCronTrigger = {
8183
type: 'cron';
8284
enabled: boolean;
8385
cron_expression: string;
86+
cron_cursor_job: string | null;
8487
pos: Position | undefined;
8588
};
8689

8790
export type SpecWebhookTrigger = {
8891
id?: string;
8992
type: 'webhook';
9093
enabled: boolean;
94+
webhook_reply: string | null;
9195
pos: Position | undefined;
9296
};
9397

assets/js/yaml/util.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,29 @@ export const convertWorkflowStateToSpec = (
6161
const triggers: { [key: string]: SpecTrigger } = {};
6262
workflowState.triggers.forEach(trigger => {
6363
const pos = workflowState.positions?.[trigger.id];
64+
6465
const triggerDetails: SpecTrigger = {
6566
...(includeIds && { id: trigger.id }),
6667
type: trigger.type,
6768
enabled: trigger.enabled,
6869
pos: trigger.type !== 'kafka' && pos ? roundPosition(pos) : undefined,
69-
cron_expression:
70-
trigger.type === 'cron' && 'cron_expression' in trigger
71-
? trigger.cron_expression
72-
: undefined,
7370
} as SpecTrigger;
7471

72+
if (trigger.type === 'cron') {
73+
const cursorJob = trigger.cron_cursor_job_id
74+
? workflowState.jobs.find(job => job.id === trigger.cron_cursor_job_id)
75+
: null;
76+
77+
triggerDetails.cron_expression = trigger.cron_expression ?? null;
78+
triggerDetails.cron_cursor_job = cursorJob
79+
? hyphenate(cursorJob.name)
80+
: null;
81+
}
82+
83+
if (trigger.type === 'webhook') {
84+
triggerDetails.webhook_reply = trigger.webhook_reply ?? null;
85+
}
86+
7587
// TODO: handle kafka config
7688
triggers[trigger.type] = triggerDetails;
7789
});
@@ -158,17 +170,22 @@ export const convertWorkflowSpecToState = (
158170

159171
let trigger: StateTrigger;
160172
if (specTrigger.type === 'cron') {
173+
const cursorJob = specTrigger.cron_cursor_job
174+
? (stateJobs[specTrigger.cron_cursor_job] ?? null)
175+
: null;
161176
trigger = {
162177
id: uId,
163178
type: 'cron',
164179
enabled,
165180
cron_expression: specTrigger.cron_expression,
181+
cron_cursor_job_id: cursorJob ? cursorJob.id : null,
166182
};
167183
} else if (specTrigger.type === 'webhook') {
168184
trigger = {
169185
id: uId,
170186
type: 'webhook',
171187
enabled,
188+
webhook_reply: specTrigger.webhook_reply,
172189
};
173190
} else {
174191
trigger = {

lib/lightning/export_utils.ex

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,14 @@ defmodule Lightning.ExportUtils do
2828
credential: [:name, :owner],
2929
workflow: [:name, :jobs, :triggers, :edges],
3030
job: [:name, :adaptor, :credential, :globals, :body],
31-
trigger: [:type, :cron_expression, :enabled, :kafka_configuration],
31+
trigger: [
32+
:type,
33+
:webhook_reply,
34+
:cron_expression,
35+
:cron_cursor_job,
36+
:enabled,
37+
:kafka_configuration
38+
],
3239
edge: [
3340
:source_trigger,
3441
:source_job,
@@ -58,7 +65,7 @@ defmodule Lightning.ExportUtils do
5865

5966
%{
6067
# The identifier here for our YAML reducer will be the hyphenated name
61-
id: hyphenate(job.name),
68+
id: job_key(job),
6269
name: job.name,
6370
node_type: :job,
6471
adaptor: job.adaptor,
@@ -68,7 +75,7 @@ defmodule Lightning.ExportUtils do
6875
}
6976
end
7077

71-
defp trigger_to_treenode(trigger) do
78+
defp trigger_to_treenode(trigger, jobs) do
7279
base = %{
7380
id: trigger.id,
7481
enabled: trigger.enabled,
@@ -79,7 +86,18 @@ defmodule Lightning.ExportUtils do
7986

8087
case trigger.type do
8188
:cron ->
82-
Map.put(base, :cron_expression, trigger.cron_expression)
89+
base
90+
|> Map.put(:cron_expression, trigger.cron_expression)
91+
|> then(fn cron ->
92+
if trigger.cron_cursor_job_id do
93+
cursor_job =
94+
Enum.find(jobs, fn j -> j.id == trigger.cron_cursor_job_id end)
95+
96+
Map.put(cron, :cron_cursor_job, cursor_job && job_key(cursor_job))
97+
else
98+
cron
99+
end
100+
end)
83101

84102
:kafka ->
85103
kafka_config =
@@ -102,8 +120,12 @@ defmodule Lightning.ExportUtils do
102120

103121
Map.put(base, :kafka_configuration, kafka_config)
104122

105-
_ ->
106-
base
123+
:webhook ->
124+
if trigger.webhook_reply do
125+
Map.put(base, :webhook_reply, Atom.to_string(trigger.webhook_reply))
126+
else
127+
base
128+
end
107129
end
108130
end
109131

@@ -113,7 +135,7 @@ defmodule Lightning.ExportUtils do
113135

114136
target_job = Enum.find(jobs, fn j -> j.id == edge.target_job_id end)
115137
trigger_name = to_string(source_trigger.type)
116-
target_job_name = hyphenate(target_job.name)
138+
target_job_name = job_key(target_job)
117139

118140
%{
119141
name: "#{trigger_name}->#{target_job_name}",
@@ -125,8 +147,8 @@ defmodule Lightning.ExportUtils do
125147
defp edge_to_treenode(%{source_trigger_id: nil} = edge, _triggers, jobs) do
126148
target_job = Enum.find(jobs, fn j -> j.id == edge.target_job_id end)
127149
source_job = Enum.find(jobs, fn j -> j.id == edge.source_job_id end)
128-
source_job_name = hyphenate(source_job.name)
129-
target_job_name = hyphenate(target_job.name)
150+
source_job_name = job_key(source_job)
151+
target_job_name = job_key(target_job)
130152

131153
%{
132154
name: "#{source_job_name}->#{target_job_name}",
@@ -138,7 +160,7 @@ defmodule Lightning.ExportUtils do
138160
defp merge_edge_common_fields(json, edge, target_job) do
139161
json
140162
|> Map.merge(%{
141-
target_job: hyphenate(target_job.name),
163+
target_job: job_key(target_job),
142164
condition_type: edge.condition_type |> Atom.to_string(),
143165
enabled: edge.enabled,
144166
node_type: :edge
@@ -348,6 +370,10 @@ defmodule Lightning.ExportUtils do
348370
}
349371
end
350372

373+
defp job_key(job) do
374+
hyphenate(job.name)
375+
end
376+
351377
defp project_credential_key(project_credential) do
352378
hyphenate(
353379
"#{project_credential.credential.user.email} #{project_credential.credential.name}"
@@ -378,7 +404,7 @@ defmodule Lightning.ExportUtils do
378404
triggers =
379405
workflow.triggers
380406
|> Enum.sort_by(& &1.inserted_at, NaiveDateTime)
381-
|> Enum.map(fn t -> trigger_to_treenode(t) end)
407+
|> Enum.map(fn t -> trigger_to_treenode(t, workflow.jobs) end)
382408

383409
edges =
384410
workflow.edges

lib/lightning/workflows/workflow.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@ defmodule Lightning.Workflows.Workflow do
4444
field :enable_job_logs, :boolean, default: true
4545
field :positions, :map
4646

47+
# the ordering of edges, triggers and jobs are intentional
48+
# ecto reverses the relations when inserting. so jobs->triggers->edges
49+
# triggers depend on jobs for cron_cursor_job_id
4750
has_many :edges, Edge, on_replace: :delete_if_exists
48-
has_many :jobs, Job, on_replace: :delete
4951
has_many :triggers, Trigger, on_replace: :delete_if_exists
52+
has_many :jobs, Job, on_replace: :delete
5053
has_many :versions, WorkflowVersion, foreign_key: :workflow_id
5154

5255
has_many :work_orders, Lightning.WorkOrder

lib/lightning_web/controllers/api/provisioning_json.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ defmodule LightningWeb.API.ProvisioningJSON do
103103
)
104104

105105
trigger
106-
|> Map.take(~w(id type cron_expression enabled)a)
106+
|> Map.take(
107+
~w(id type cron_expression enabled webhook_reply cron_cursor_job_id)a
108+
)
107109
|> Map.put(:kafka_configuration, kafka_configuration)
108110
|> drop_keys_with_nil_value()
109111
end
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
name: webhook-reply-and-cron-cursor-project
2+
description: null
3+
collections: null
4+
credentials: null
5+
workflows:
6+
cron-cursor-workflow:
7+
name: cron cursor workflow
8+
jobs:
9+
cursor-job:
10+
name: cursor job
11+
adaptor: '@openfn/language-common@latest'
12+
credential: null
13+
body: |
14+
fn(state => state)
15+
triggers:
16+
cron:
17+
type: cron
18+
cron_expression: '0 6 * * *'
19+
cron_cursor_job: cursor-job
20+
enabled: true
21+
edges:
22+
cron->cursor-job:
23+
source_trigger: cron
24+
target_job: cursor-job
25+
condition_type: always
26+
enabled: true
27+
webhook-reply-workflow:
28+
name: webhook reply workflow
29+
jobs:
30+
reply-job:
31+
name: reply job
32+
adaptor: '@openfn/language-common@latest'
33+
credential: null
34+
body: |
35+
fn(state => state)
36+
triggers:
37+
webhook:
38+
type: webhook
39+
webhook_reply: after_completion
40+
enabled: true
41+
edges:
42+
webhook->reply-job:
43+
source_trigger: webhook
44+
target_job: reply-job
45+
condition_type: always
46+
enabled: true

0 commit comments

Comments
 (0)