Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion api-contracts/v1/dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,85 @@ package v1;
import "v1/shared/condition.proto";

// V1Dispatcher groups the durable-task and durable-event RPCs of the v1 API.
service V1Dispatcher {
  // Bidirectional stream: the caller sends DurableTaskRequest messages and
  // receives DurableTaskResponse messages on the same call.
  rpc DurableTask(stream DurableTaskRequest) returns (stream DurableTaskResponse) {}

  // NOTE: deprecated after DurableEventLog is implemented
  //
  // Unary call taking a RegisterDurableEventRequest.
  // (Declared exactly once: a second declaration with the same name is
  // rejected by protoc.)
  rpc RegisterDurableEvent(RegisterDurableEventRequest) returns (RegisterDurableEventResponse) {}

  // Bidirectional stream: the caller sends ListenForDurableEventRequest
  // messages and receives DurableEvent messages.
  rpc ListenForDurableEvent(stream ListenForDurableEventRequest) returns (stream DurableEvent) {}
}

// Payload of the `register_worker` case of DurableTaskRequest.
message DurableTaskRequestRegisterWorker {
  // Identifier of the worker.
  // NOTE(review): presumably the worker registering on the DurableTask
  // stream — confirm.
  string worker_id = 1;
}

// Payload of the `register_worker` case of DurableTaskResponse.
message DurableTaskResponseRegisterWorker {
  // Identifier of the worker.
  // NOTE(review): presumably echoes the worker_id from the matching
  // DurableTaskRequestRegisterWorker — confirm.
  string worker_id = 1;
}

// Discriminates the kind of a DurableTaskRequestTrigger (its `kind` field).
enum DurableTaskTriggerKind {
  // Zero value; used when no kind has been set.
  DURABLE_TASK_TRIGGER_KIND_UNSPECIFIED = 0;

  // NOTE(review): presumably triggers a run from the durable task — confirm.
  DURABLE_TASK_TRIGGER_KIND_RUN = 1;

  // NOTE(review): presumably paired with `wait_for_conditions` on the
  // trigger request — confirm.
  DURABLE_TASK_TRIGGER_KIND_WAIT_FOR = 2;

  // NOTE(review): presumably a memoized step — confirm.
  DURABLE_TASK_TRIGGER_KIND_MEMO = 3;
}

// Payload of the `trigger` case of DurableTaskRequest.
// NOTE(review): the non-proto lines inside this body ("Copy link", reviewer
// text, ...) look like code-review page residue and would not compile — confirm
// they are absent from the real .proto file.
message DurableTaskRequestTrigger {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the idea here that triggering children out of a durable task will be entirely separate from the normal trigger path?

// The session_id is used to identify the worker session for this durable task, since
// there can be many sessions for the same durable task (e.g. retries)
int64 session_id = 1;
// NOTE(review): presumably the external id of the durable task this trigger belongs to — confirm.
string durable_task_external_id = 2;
// Which kind of trigger this is; see DurableTaskTriggerKind.
DurableTaskTriggerKind kind = 3;
// Optional opaque bytes; explicit presence lets readers tell "unset" from "empty".
optional bytes payload = 4;
// Optional conditions of type DurableEventListenerConditions (declared outside the visible section).
// NOTE(review): presumably only meaningful for DURABLE_TASK_TRIGGER_KIND_WAIT_FOR — confirm.
optional DurableEventListenerConditions wait_for_conditions = 5;
}

// Payload of the `trigger_ack` case of DurableTaskResponse.
// NOTE(review): presumably acknowledges a DurableTaskRequestTrigger and reports
// the node_id assigned to it — confirm.
// NOTE(review): the non-proto lines inside this body look like code-review page
// residue and would not compile — confirm they are absent from the real file.
message DurableTaskResponseTriggerAck {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this name is a little confusing 😅 maybe I'm just not following, but I don't really get what it's supposed to represent

// NOTE(review): presumably the worker session id, as on DurableTaskRequestTrigger — confirm.
int64 session_id = 1;
// NOTE(review): presumably the external id of the durable task — confirm.
string durable_task_external_id = 2;
// NOTE(review): presumably the durable event log node created for the trigger — confirm.
int64 node_id = 3;
}

// Payload of the `register_callback` case of DurableTaskRequest.
message DurableTaskRequestRegisterCallback {
  // NOTE(review): presumably the worker session id for this durable task —
  // confirm.
  int64 session_id = 1;

  // NOTE(review): presumably the external id of the durable task — confirm.
  string durable_task_external_id = 2;

  // NOTE(review): presumably the log node the callback should be attached
  // to — confirm.
  int64 node_id = 3;
}

// Payload of the `register_callback_ack` case of DurableTaskResponse.
// Carries the same three fields as DurableTaskRequestRegisterCallback.
message DurableTaskResponseRegisterCallbackAck {
  // NOTE(review): presumably the worker session id for this durable task —
  // confirm.
  int64 session_id = 1;

  // NOTE(review): presumably the external id of the durable task — confirm.
  string durable_task_external_id = 2;

  // NOTE(review): presumably the log node whose callback was registered —
  // confirm.
  int64 node_id = 3;
}

// Payload of the `callback_completed` case of DurableTaskResponse.
message DurableTaskResponseCallbackCompleted {
  // NOTE(review): presumably the worker session id for this durable task —
  // confirm.
  int64 session_id = 1;

  // NOTE(review): presumably the external id of the durable task — confirm.
  string durable_task_external_id = 2;

  // NOTE(review): presumably the log node whose callback completed —
  // confirm.
  int64 node_id = 3;

  // Opaque bytes delivered with the completion. Declared without `optional`,
  // so an empty value is indistinguishable from an unset one.
  bytes payload = 4;
}

// Payload of the `evict_session` case of DurableTaskRequest.
message DurableTaskRequestEvictSession {
  // NOTE(review): presumably the worker session to evict — confirm.
  int64 session_id = 1;

  // NOTE(review): presumably the external id of the durable task — confirm.
  string durable_task_external_id = 2;
}

// Envelope for every message sent on the request side of the DurableTask
// stream. Exactly one case of `message` is set at a time.
message DurableTaskRequest {
  oneof message {
    // Worker registration payload.
    DurableTaskRequestRegisterWorker register_worker = 1;

    // Trigger payload; its kind is given by DurableTaskTriggerKind.
    DurableTaskRequestTrigger trigger = 2;

    // Callback registration payload.
    DurableTaskRequestRegisterCallback register_callback = 3;

    // Session eviction payload.
    DurableTaskRequestEvictSession evict_session = 4;
  }
}

// Envelope for every message sent on the response side of the DurableTask
// stream. Exactly one case of `message` is set at a time.
message DurableTaskResponse {
  oneof message {
    // Worker registration response payload.
    DurableTaskResponseRegisterWorker register_worker = 1;

    // Acknowledgement payload for a trigger.
    DurableTaskResponseTriggerAck trigger_ack = 2;

    // Acknowledgement payload for a callback registration.
    DurableTaskResponseRegisterCallbackAck register_callback_ack = 3;

    // Callback completion payload, including its bytes payload.
    DurableTaskResponseCallbackCompleted callback_completed = 4;
  }
}

message RegisterDurableEventRequest {
string task_id = 1; // external uuid for the task run
Expand Down
95 changes: 95 additions & 0 deletions sql/schema/v1-core.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2215,3 +2215,98 @@ CREATE TABLE v1_event_to_run (

PRIMARY KEY (event_id, event_seen_at, run_external_id)
) PARTITION BY RANGE(event_seen_at);

-- v1_durable_event_log_file represents the log file for the durable event history
-- of a durable task. This table stores metadata like sequence values for entries.
-- There is one row per durable task (primary key: durable_task_id, durable_task_inserted_at).
--
-- Important: writers to v1_durable_event_log_entry should lock this row to increment the sequence value.
--
-- NOTE(review): the non-SQL lines inside this statement ("Copy link", reviewer text, ...)
-- look like code-review page residue and would not parse - confirm they are absent from the real schema file.
CREATE TABLE v1_durable_event_log_file (
-- The id and inserted_at of the durable task which owns this log file
durable_task_id BIGINT NOT NULL,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we thinking of having a separate v1_durable_task table given the naming used here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, this was just to make it more clear that this is the durable task and doesn't refer to any child tasks which may have been triggered in the event history

-- NOTE(review): typed BIGINT here, while the other *inserted_at columns in this section
-- are TIMESTAMPTZ - confirm this is intended. This column is also the range-partition key.
durable_task_inserted_at BIGINT NOT NULL,
-- NOTE(review): presumably the inserted_at of the most recently written log entry - confirm.
latest_inserted_at TIMESTAMPTZ NOT NULL,
-- A monotonically increasing node id for this durable event log scoped to the durable task.
-- Starts at 0 and increments by 1 for each new entry.
latest_node_id BIGINT NOT NULL,
-- The latest branch id. Branches represent different execution paths on a replay.
latest_branch_id BIGINT NOT NULL,
-- The parent node id to which the first node of a new branch should be linked.
latest_branch_first_parent_node_id BIGINT NOT NULL,
CONSTRAINT v1_durable_event_log_file_pkey PRIMARY KEY (durable_task_id, durable_task_inserted_at)
) PARTITION BY RANGE(durable_task_inserted_at);

-- Kinds of entries that can be stored in v1_durable_event_log_entry.kind.
CREATE TYPE v1_durable_event_log_entry_kind AS ENUM (
    'RUN_TRIGGERED',
    'WAIT_FOR_STARTED',
    'MEMO_STARTED'
);

-- v1_durable_event_log_entry stores one row per node in a durable task's event log.
-- Rows are keyed by (durable_task_id, durable_task_inserted_at, node_id) and range-partitioned
-- on durable_task_inserted_at.
--
-- NOTE(review): the non-SQL lines inside this statement ("Copy link", reviewer text, ...)
-- look like code-review page residue and would not parse - confirm they are absent from the real schema file.
CREATE TABLE v1_durable_event_log_entry (
-- The id and inserted_at of the durable task which created this entry
durable_task_id BIGINT NOT NULL,
-- NOTE(review): typed BIGINT while this table's own inserted_at is TIMESTAMPTZ - confirm this is intended.
durable_task_inserted_at BIGINT NOT NULL,
-- The inserted_at time of this event from a DB clock perspective.
-- Important: for consistency, this should always be auto-generated via the CURRENT_TIMESTAMP!
inserted_at TIMESTAMPTZ NOT NULL,
-- The kind of this entry; see v1_durable_event_log_entry_kind.
-- NOTE(review): declared without NOT NULL, so an entry may have no kind - confirm this is intended.
kind v1_durable_event_log_entry_kind,
-- The node number in the durable event log. This represents a monotonically increasing
-- sequence value generated from v1_durable_event_log_file.latest_node_id
node_id BIGINT NOT NULL,
-- The parent node id for this event, if any. This can be null.
parent_node_id BIGINT,
-- The branch id when this event was first seen. A durable event log can be a part of many branches.
branch_id BIGINT NOT NULL,
-- Todo: Associated data for this event should be stored in the v1_payload table!
-- data JSONB,
-- The hash of the data stored in the v1_payload table to check non-determinism violations.
-- This can be null for event types that don't have associated data.
-- TODO: we can add CHECK CONSTRAINT for event types that require data_hash to be non-null.
data_hash BYTEA,
-- Can discuss: adds some flexibility for future hash algorithms
-- Nullable free-text name of the algorithm used to produce data_hash.
data_hash_alg TEXT,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be an enum instead of TEXT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think this is something we need to discuss - I'm not convinced that this hashing approach is the correct one to detect non-determinism, it's purely an idea right now. I agree that if we decide to take this approach we should use an enum

-- Access patterns:
-- Definite: we'll query directly for the node_id when a durable task is replaying its log
-- Possible: we may want to query a range of node_ids for a durable task
-- Possible: we may want to query a range of inserted_ats for a durable task
CONSTRAINT v1_durable_event_log_entry_pkey PRIMARY KEY (durable_task_id, durable_task_inserted_at, node_id)
) PARTITION BY RANGE(durable_task_inserted_at);

-- Kinds of callbacks that can be stored in v1_durable_event_log_callback.kind.
CREATE TYPE v1_durable_event_log_callback_kind AS ENUM (
    'RUN_COMPLETED',
    -- WAIT_FOR_COMPLETED can represent a durable sleep, an event, or some boolean combination of
    -- these.
    'WAIT_FOR_COMPLETED',
    -- No trailing comma after the last label: PostgreSQL rejects it as a syntax error.
    'MEMO_COMPLETED'
);

-- v1_durable_event_log_callback stores callbacks that complete a durable event log entry. This needs to be stateful
-- so that it persists across worker restarts for the same durable task.
--
-- Implementation notes (should be removed or moved elsewhere once this is done):
-- 1. Why not store callbacks in the core durable event log? Because their entries are not guaranteed to be
-- stable. For example, if a task is replayed, we would need to remove old callback entries and insert new ones,
-- in which case the durable event log would be changing history.
-- 2. The two important access patterns are direct queries from the worker side to check if a callback is satisfied,
-- and direct queries from the engine side to mark a callback as satisfied when we've satisfied a v1_match. Because
-- of this, we likely need to add a `callback_key` field to the v1_match table.
CREATE TABLE v1_durable_event_log_callback (
    -- Id and inserted_at of the durable task this callback belongs to.
    -- durable_task_inserted_at is also the range-partition key.
    durable_task_id BIGINT NOT NULL,
    durable_task_inserted_at BIGINT NOT NULL,

    -- Insertion time of this callback as seen by the DB clock.
    -- Important: always generate this via CURRENT_TIMESTAMP so values stay consistent!
    inserted_at TIMESTAMPTZ NOT NULL,

    -- Kind of this callback; see v1_durable_event_log_callback_kind. Nullable as declared.
    kind v1_durable_event_log_callback_kind,

    -- A unique, generated key for this callback; how it is derived depends on the callback kind.
    -- Important: both the durable log writers and the controllers that check whether callbacks
    -- are satisfied must be able to query this key directly.
    key TEXT NOT NULL,

    -- The log node id that this callback references.
    node_id BIGINT NOT NULL,

    -- Whether this callback has been seen by the engine or not. is_satisfied _may_ flip several
    -- times over a callback's lifecycle; readers must not assume that true stays true.
    is_satisfied BOOLEAN NOT NULL DEFAULT FALSE,

    -- Access patterns:
    --   Definite: direct lookup by key when a worker checks whether a callback is satisfied
    --   Definite: direct lookup by key when a v1_match has been satisfied and the callback
    --             must be marked as satisfied
    CONSTRAINT v1_durable_event_log_callback_pkey PRIMARY KEY (durable_task_id, durable_task_inserted_at, key)
) PARTITION BY RANGE(durable_task_inserted_at);
Loading