Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
dab1aca
feat: port webhooks-oss module from orkes-conductor
nthmost-orkes May 19, 2026
19dece8
feat(webhooks-oss): in-memory DAO + REST endpoint for first end-to-en…
nthmost-orkes May 19, 2026
dc423aa
feat(webhooks-oss): WebhookConfigService + WebhookConfigResource for …
nthmost-orkes May 19, 2026
e3942d1
feat(webhooks-oss): WebhookWorker + real matchers for end-to-end disp…
nthmost-orkes May 19, 2026
bddc9e6
test(webhooks-oss): cover OSS-added pieces (62 tests, 0 failures)
nthmost-orkes May 19, 2026
5d142b3
test(webhooks-oss): WebhookTest for the put() delegation
nthmost-orkes May 19, 2026
2f2b2a0
test(webhooks-oss): end-to-end integration test wiring real beans
nthmost-orkes May 19, 2026
a154b21
fix(webhooks-oss): four real bugs found by smoke test against running…
nthmost-orkes May 19, 2026
4c18fc5
fix(webhooks-oss): round 1 ruthless cleanup — license headers, contra…
nthmost-orkes May 19, 2026
0e79c47
fix(webhooks-oss): round 2 ruthless cleanup — should-fix items from a…
nthmost-orkes May 19, 2026
9e15a52
chore: revert accidentally-committed unrelated files from prior sessi…
nthmost-orkes May 19, 2026
305f74b
fix(webhooks-oss): correctness — DLQ-friendly worker + matcher recomp…
nthmost-orkes May 19, 2026
4d22dff
test(webhooks-oss): rewrite WebhookWorkerTest with real beans + cover…
nthmost-orkes May 19, 2026
81c9e37
docs(webhooks-oss): WAIT_FOR_WEBHOOK task + REST surface
nthmost-orkes May 19, 2026
02c2b7b
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes May 20, 2026
307981d
docs(webhooks-oss): restore Javadoc on WEBHOOK_QUEUE constant for Ork…
nthmost-orkes May 21, 2026
314a3d7
fix(webhooks-oss): log verification failures at ERROR to match Orkes
nthmost-orkes May 21, 2026
84ea718
test(webhooks-oss): property test for registration/inbound hash agree…
nthmost-orkes May 21, 2026
38c40c5
fix(webhooks-oss): guard StripeVerifier against null api_version
nthmost-orkes May 22, 2026
591f1c0
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes May 25, 2026
96cf64d
feat(postgres-persistence): PostgresWebhookDAO + PostgresWebhookTaskS…
nthmost-orkes May 19, 2026
2460b6f
fix(postgres-persistence): tolerate orphan migrations in webhook DAO …
nthmost-orkes May 19, 2026
f327d89
feat(persistence): WebhookDAO + WebhookTaskService impls for SQLite a…
nthmost-orkes May 19, 2026
778807a
feat(redis-persistence): WebhookDAO + WebhookTaskService impls for Redis
nthmost-orkes May 19, 2026
bb8279c
feat(cassandra-persistence): WebhookDAO + WebhookTaskService impls fo…
nthmost-orkes May 19, 2026
21773c4
feat(postgres-persistence): PostgresWebhookCleanupJob — scheduled ret…
nthmost-orkes May 19, 2026
93bf2db
feat(persistence): MySQL + SQLite WebhookCleanupJob siblings
nthmost-orkes May 19, 2026
8da5edd
feat(persistence): Cassandra TTL + Redis cleanup job — completes rete…
nthmost-orkes May 19, 2026
4d11ba8
refactor(webhooks): extract WebhookMatcherComputer to core
nthmost-orkes May 26, 2026
8cda881
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes May 28, 2026
afc2971
fix(webhook/verifier): stop leaking Stripe signing secret in error re…
nthmost-orkes May 27, 2026
8d1f4b1
fix(webhook/verifier): guard SignatureBasedVerifier against short header
nthmost-orkes May 27, 2026
227b11f
fix(webhook/verifier): use constant-time signature comparison
nthmost-orkes May 27, 2026
e6d9acd
fix(webhook): drop signature/payload material from debug logs
nthmost-orkes May 27, 2026
6eaf567
fix(webhook): null-check verifier lookup before invocation
nthmost-orkes May 27, 2026
372a95e
fix(webhook): drop @SneakyThrows from handlePing
nthmost-orkes May 27, 2026
4096857
fix(webhook): validate matches map key types before downstream use
nthmost-orkes May 27, 2026
ed8ac1e
docs(webhook): document MetadataDAO fetch cost on the matcher hot path
nthmost-orkes May 27, 2026
8d3c3b0
docs(webhook/migrations): clarify webhook_hash_to_taskid column intent
nthmost-orkes May 27, 2026
3a02f42
docs(webhook): document auth model for /api/metadata/webhook endpoints
nthmost-orkes May 27, 2026
17ca929
fix(webhook/verifier): implement Slack's v0= signing-secret protocol
nthmost-orkes May 27, 2026
345f3fe
feat(webhook/cleanup): cluster-safe cleanup lease across cleanup jobs
nthmost-orkes May 27, 2026
303f9b4
feat(webhook): signature-dedup replay protection across verifiers + DAOs
nthmost-orkes May 27, 2026
c3dba50
fix(cassandra/webhook): qualify prepared statements with keyspace
nthmost-orkes May 28, 2026
d98ceed
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes May 28, 2026
71d0a5b
fix(persistence): spring-web on cassandra+redis test classpath
nthmost-orkes May 28, 2026
6efedcc
style: apply spotless formatting to webhook DAO and matcher computer
nthmost-orkes May 28, 2026
893dda9
style: apply spotless formatting across webhook modules
nthmost-orkes May 29, 2026
714c63f
fix(persistence/test): commit inserts in MySQL+Postgres cleanup tests
nthmost-orkes May 29, 2026
57fe865
fix(postgres/test): include notify migration location in webhook tests
nthmost-orkes May 29, 2026
2132c63
fix(postgres/test): truncate tables instead of flyway.clean()
nthmost-orkes May 29, 2026
7780aed
fix(sqlite/test): reset cleanup lease row in @Before
nthmost-orkes May 29, 2026
924fcdd
test(sendgrid): add SendGridVerifierTest — missing headers, bad key, …
nthmost-orkes May 29, 2026
0c6916e
test(incoming-webhook): add IncomingWebhookServiceTest — all handleWe…
nthmost-orkes May 29, 2026
c3e34b8
test(webhook-worker): add non-Map body and recordHistory trim-when-fu…
nthmost-orkes May 29, 2026
c353607
test(webhook-hashing): add WebhookHashingServiceTest — non-Map body, …
nthmost-orkes May 29, 2026
892c61b
refactor(test): replace mocks with real in-memory impls — IncomingWeb…
nthmost-orkes May 29, 2026
db03daa
fix(sqlite/test): reset cleanup lease with parameterized Timestamp
nthmost-orkes May 29, 2026
c100937
fix(sqlite/webhook): seed cleanup lease expires_at as INTEGER
nthmost-orkes May 29, 2026
e7168c5
style: apply spotless to webhooks-oss test files
nthmost-orkes May 29, 2026
5987cf5
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes May 29, 2026
893a996
remove Tag.java from common — tags are enterprise RBAC, no OSS backing
nthmost-orkes Jun 1, 2026
b986d65
drop tags field from WebhookConfig — never populated in OSS
nthmost-orkes Jun 1, 2026
762e4c2
remove EventMessage.java — ported but never wired, no OSS backing
nthmost-orkes Jun 1, 2026
92e122d
remove WebhookExecutionHistory — enterprise-only, both sides marked T…
nthmost-orkes Jun 1, 2026
237701e
drop WebhookConfig.getWorkflowNames() — zero callers, duplicates map …
nthmost-orkes Jun 1, 2026
a05447c
map NonTransientException → 400 in ApplicationExceptionMapper
nthmost-orkes Jun 2, 2026
bc2052d
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 2, 2026
b90d7e2
Merge remote-tracking branch 'origin/main' into feat/webhooks-from-or…
nthmost-orkes Jun 5, 2026
0e3c51a
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 8, 2026
a0e9553
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 12, 2026
5cdd389
fix(webhook): SREM correlation set on task cancel to stop orphan-set …
nthmost-orkes Jun 15, 2026
6ac5431
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 16, 2026
2308893
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 16, 2026
ca44771
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 16, 2026
5f96280
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 16, 2026
e5d9b8d
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 17, 2026
e88ab5b
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 18, 2026
233e20f
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 19, 2026
e5201a6
Merge branch 'main' into feat/webhooks-from-orkes-split
nthmost-orkes Jun 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cassandra-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation "org.apache.commons:commons-lang3"

testImplementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8"
testImplementation 'org.springframework:spring-web'
testImplementation project(':conductor-core').sourceSets.test.output
testImplementation project(':conductor-common').sourceSets.test.output

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ public CassandraPollDataDAO cassandraPollDataDAO() {
return new CassandraPollDataDAO();
}

@Bean(name = "webhookDAO")
public org.conductoross.conductor.cassandra.dao.CassandraWebhookDAO cassandraWebhookDAO(
Session session,
ObjectMapper objectMapper,
CassandraProperties properties,
com.netflix.conductor.dao.MetadataDAO metadataDAO,
org.springframework.core.env.Environment env) {
long ttlSeconds = 7L * 24 * 3600;
String retention = env.getProperty("conductor.webhooks.cleanup.retention-duration");
if (retention != null) {
ttlSeconds = java.time.Duration.parse(retention).toSeconds();
}
return new org.conductoross.conductor.cassandra.dao.CassandraWebhookDAO(
session, objectMapper, properties, metadataDAO, ttlSeconds);
}

@Bean(name = "webhookTaskService")
public org.conductoross.conductor.cassandra.dao.CassandraWebhookTaskService
cassandraWebhookTaskService(
Session session, ObjectMapper objectMapper, CassandraProperties properties) {
return new org.conductoross.conductor.cassandra.dao.CassandraWebhookTaskService(
session, objectMapper, properties);
}

@Bean
public Statements statements(CassandraProperties cassandraProperties) {
return new Statements(cassandraProperties.getKeyspace());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/*
* Copyright 2026 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.conductoross.conductor.cassandra.dao;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.conductoross.conductor.dao.webhook.WebhookDAO;
import org.conductoross.conductor.service.webhook.WebhookMatcherComputer;
import org.conductoross.conductor.webhook.model.IncomingWebhookEvent;
import org.conductoross.conductor.webhook.model.WebhookConfig;
import org.springframework.lang.Nullable;

import com.netflix.conductor.cassandra.config.CassandraProperties;
import com.netflix.conductor.cassandra.dao.CassandraBaseDAO;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.dao.MetadataDAO;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;

/**
* Cassandra-backed {@link WebhookDAO}.
*
* <p>Tables (created on construction via {@link #ensureTables()}):
*
* <ul>
* <li>{@code webhook (bucket, webhook_id PK, json_data)} — single 'ALL' bucket so
* getAllWebhooks() is a single-partition scan. Anti-pattern for high cardinality, fine for
* admin-scale webhook configs.
* <li>{@code incoming_webhook_event (event_id PK, json_data)} — high cardinality, no listing.
* <li>{@code webhook_target_workflows (webhook_id PK, json_data)} — target workflow versions
* snapshot per webhook. Matchers themselves are recomputed from MetadataDAO on read.
* </ul>
*/
@Slf4j
public class CassandraWebhookDAO extends CassandraBaseDAO implements WebhookDAO {

private static final String TABLE_WEBHOOK = "webhook";
private static final String TABLE_EVENT = "incoming_webhook_event";
private static final String TABLE_TARGETS = "webhook_target_workflows";
private static final String ALL_BUCKET = "ALL";

private static final long DEFAULT_EVENT_TTL_SECONDS = 7L * 24 * 3600; // 7 days

private final Session session;
private final MetadataDAO metadataDAO;
// objectMapper / toJson / readValue are package-private in CassandraBaseDAO; keep a local ref.
private final ObjectMapper objectMapper;
private final long incomingEventTtlSeconds;

private final PreparedStatement insertWebhookStmt;
private final PreparedStatement selectWebhookStmt;
private final PreparedStatement selectAllWebhooksStmt;
private final PreparedStatement deleteWebhookStmt;
private final PreparedStatement insertEventStmt;
private final PreparedStatement selectEventStmt;
private final PreparedStatement deleteEventStmt;
private final PreparedStatement insertTargetsStmt;
private final PreparedStatement selectTargetsStmt;
private final PreparedStatement deleteTargetsStmt;

public CassandraWebhookDAO(
Session session,
ObjectMapper objectMapper,
CassandraProperties properties,
MetadataDAO metadataDAO) {
this(session, objectMapper, properties, metadataDAO, DEFAULT_EVENT_TTL_SECONDS);
}

public CassandraWebhookDAO(
Session session,
ObjectMapper objectMapper,
CassandraProperties properties,
MetadataDAO metadataDAO,
long incomingEventTtlSeconds) {
super(session, objectMapper, properties);
this.session = session;
this.metadataDAO = metadataDAO;
this.objectMapper = objectMapper;
this.incomingEventTtlSeconds = incomingEventTtlSeconds;
ensureTables();

ConsistencyLevel readConsistency = properties.getReadConsistencyLevel();
ConsistencyLevel writeConsistency = properties.getWriteConsistencyLevel();
String webhookTable = properties.getKeyspace() + "." + TABLE_WEBHOOK;
String eventTable = properties.getKeyspace() + "." + TABLE_EVENT;
String targetsTable = properties.getKeyspace() + "." + TABLE_TARGETS;

insertWebhookStmt =
session.prepare(
"INSERT INTO "
+ webhookTable
+ " (bucket, webhook_id, json_data) VALUES (?, ?, ?)")
.setConsistencyLevel(writeConsistency);
selectWebhookStmt =
session.prepare(
"SELECT json_data FROM "
+ webhookTable
+ " WHERE bucket = ? AND webhook_id = ?")
.setConsistencyLevel(readConsistency);
selectAllWebhooksStmt =
session.prepare("SELECT json_data FROM " + webhookTable + " WHERE bucket = ?")
.setConsistencyLevel(readConsistency);
deleteWebhookStmt =
session.prepare(
"DELETE FROM "
+ webhookTable
+ " WHERE bucket = ? AND webhook_id = ?")
.setConsistencyLevel(writeConsistency);

insertEventStmt =
session.prepare(
"INSERT INTO "
+ eventTable
+ " (event_id, json_data) VALUES (?, ?)")
.setConsistencyLevel(writeConsistency);
selectEventStmt =
session.prepare("SELECT json_data FROM " + eventTable + " WHERE event_id = ?")
.setConsistencyLevel(readConsistency);
deleteEventStmt =
session.prepare("DELETE FROM " + eventTable + " WHERE event_id = ?")
.setConsistencyLevel(writeConsistency);

insertTargetsStmt =
session.prepare(
"INSERT INTO "
+ targetsTable
+ " (webhook_id, json_data) VALUES (?, ?)")
.setConsistencyLevel(writeConsistency);
selectTargetsStmt =
session.prepare("SELECT json_data FROM " + targetsTable + " WHERE webhook_id = ?")
.setConsistencyLevel(readConsistency);
deleteTargetsStmt =
session.prepare("DELETE FROM " + targetsTable + " WHERE webhook_id = ?")
.setConsistencyLevel(writeConsistency);
}

private void ensureTables() {
String ks = properties.getKeyspace();
session.execute(
"CREATE TABLE IF NOT EXISTS "
+ ks
+ "."
+ TABLE_WEBHOOK
+ " (bucket text, webhook_id text, json_data text,"
+ " PRIMARY KEY ((bucket), webhook_id))");
// default_time_to_live makes Cassandra auto-expire rows after the configured
// window — matches the cleanup-job behavior on the SQL backings. Note: CREATE
// TABLE IF NOT EXISTS won't update an existing table's TTL; operators changing
// the retention duration on an existing deployment need to ALTER TABLE manually.
session.execute(
"CREATE TABLE IF NOT EXISTS "
+ ks
+ "."
+ TABLE_EVENT
+ " (event_id text PRIMARY KEY, json_data text)"
+ " WITH default_time_to_live = "
+ incomingEventTtlSeconds);
session.execute(
"CREATE TABLE IF NOT EXISTS "
+ ks
+ "."
+ TABLE_TARGETS
+ " (webhook_id text PRIMARY KEY, json_data text)");
}

@Override
public void createWebhook(String id, WebhookConfig webhookConfig) {
session.execute(insertWebhookStmt.bind(ALL_BUCKET, id, toJson(webhookConfig)));
}

@Override
public WebhookConfig getWebhook(String webhookId) {
ResultSet rs = session.execute(selectWebhookStmt.bind(ALL_BUCKET, webhookId));
Row row = rs.one();
return row == null ? null : readValue(row.getString("json_data"), WebhookConfig.class);
}

@Override
public List<WebhookConfig> getAllWebhooks() {
ResultSet rs = session.execute(selectAllWebhooksStmt.bind(ALL_BUCKET));
List<WebhookConfig> out = new ArrayList<>();
for (Row row : rs) {
out.add(readValue(row.getString("json_data"), WebhookConfig.class));
}
return out;
}

@Override
public void removeWebhook(String id) {
if (getWebhook(id) == null) {
throw new NotFoundException("Webhook with id " + id + " not found");
}
session.execute(deleteWebhookStmt.bind(ALL_BUCKET, id));
}

@Override
public void createIncomingWebhookEvent(String id, IncomingWebhookEvent event) {
session.execute(insertEventStmt.bind(id, toJson(event)));
}

@Override
public IncomingWebhookEvent getWebhookEvent(String messageId) {
ResultSet rs = session.execute(selectEventStmt.bind(messageId));
Row row = rs.one();
return row == null
? null
: readValue(row.getString("json_data"), IncomingWebhookEvent.class);
}

@Override
public void removeWebhookEvent(String id) {
session.execute(deleteEventStmt.bind(id));
}

@Override
public Map<String, Map<String, Object>> getMatchers(String webhookId) {
return WebhookMatcherComputer.compute(loadTargets(webhookId), metadataDAO);
}

@Override
public void createMatchers(
WebhookConfig webhookConfig,
@Nullable Map<String, Integer> receiverWorkflowNamesToVersionsOverride) {
Map<String, Integer> targets =
receiverWorkflowNamesToVersionsOverride == null
? Collections.emptyMap()
: receiverWorkflowNamesToVersionsOverride;
session.execute(insertTargetsStmt.bind(webhookConfig.getId(), toJson(targets)));
}

@Override
public void removeMatchers(String id) {
session.execute(deleteTargetsStmt.bind(id));
}

@SuppressWarnings("unchecked")
private Map<String, Integer> loadTargets(String webhookId) {
ResultSet rs = session.execute(selectTargetsStmt.bind(webhookId));
Row row = rs.one();
if (row == null) {
return Collections.emptyMap();
}
return readValue(row.getString("json_data"), Map.class);
}

private String toJson(Object value) {
try {
return objectMapper.writeValueAsString(value);
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private <T> T readValue(String json, Class<T> clazz) {
try {
return objectMapper.readValue(json, clazz);
} catch (java.io.IOException e) {
throw new RuntimeException(e);
}
}
}
Loading
Loading