diff --git a/api-contracts/v1/dispatcher.proto b/api-contracts/v1/dispatcher.proto index 6c92f90adc..e8a4347083 100644 --- a/api-contracts/v1/dispatcher.proto +++ b/api-contracts/v1/dispatcher.proto @@ -7,13 +7,85 @@ package v1; import "v1/shared/condition.proto"; service V1Dispatcher { - rpc RegisterDurableEvent(RegisterDurableEventRequest) returns (RegisterDurableEventResponse) {} + rpc DurableTask(stream DurableTaskRequest) returns (stream DurableTaskResponse) {} + // NOTE: deprecated after DurableEventLog is implemented + rpc RegisterDurableEvent(RegisterDurableEventRequest) returns (RegisterDurableEventResponse) {} rpc ListenForDurableEvent(stream ListenForDurableEventRequest) returns (stream DurableEvent) {} +} +message DurableTaskRequestRegisterWorker { + string worker_id = 1; } +message DurableTaskResponseRegisterWorker { + string worker_id = 1; +} + +enum DurableTaskTriggerKind { + DURABLE_TASK_TRIGGER_KIND_UNSPECIFIED = 0; + DURABLE_TASK_TRIGGER_KIND_RUN = 1; + DURABLE_TASK_TRIGGER_KIND_WAIT_FOR = 2; + DURABLE_TASK_TRIGGER_KIND_MEMO = 3; +} +message DurableTaskRequestTrigger { + // The session_id is used to identify the worker session for this durable task, since + // there can be many sessions for the same durable task (e.g. retries) + int64 session_id = 1; + string durable_task_external_id = 2; + DurableTaskTriggerKind kind = 3; + optional bytes payload = 4; + optional DurableEventListenerConditions wait_for_conditions = 5; +} + +message DurableTaskResponseTriggerAck { + int64 session_id = 1; + string durable_task_external_id = 2; + int64 node_id = 3; +} + +message DurableTaskRequestRegisterCallback { + int64 session_id = 1; + string durable_task_external_id = 2; + int64 node_id = 3; +} + +message DurableTaskResponseRegisterCallbackAck { + int64 session_id = 1; + string durable_task_external_id = 2; + int64 node_id = 3; +} + +message DurableTaskResponseCallbackCompleted { + int64 session_id = 1; + string durable_task_external_id = 2; + int64 node_id = 3; + bytes payload = 4; +} + +message DurableTaskRequestEvictSession { + int64 session_id = 1; + string durable_task_external_id = 2; +} + +message DurableTaskRequest { + oneof message { + DurableTaskRequestRegisterWorker register_worker = 1; + DurableTaskRequestTrigger trigger = 2; + DurableTaskRequestRegisterCallback register_callback = 3; + DurableTaskRequestEvictSession evict_session = 4; + } +} + +message DurableTaskResponse { + oneof message { + DurableTaskResponseRegisterWorker register_worker = 1; + DurableTaskResponseTriggerAck trigger_ack = 2; + DurableTaskResponseRegisterCallbackAck register_callback_ack = 3; + DurableTaskResponseCallbackCompleted callback_completed = 4; + } +} message RegisterDurableEventRequest { string task_id = 1; // external uuid for the task run diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index a8e0f1624e..d6edf064d7 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -2215,3 +2215,98 @@ CREATE TABLE v1_event_to_run ( PRIMARY KEY (event_id, event_seen_at, run_external_id) ) PARTITION BY RANGE(event_seen_at); + +-- v1_durable_event_log represents the log file for the durable event history +-- of a durable task. This table stores metadata like sequence values for entries. +-- +-- Important: writers to v1_durable_event_log_entry should lock this row to increment the sequence value. +CREATE TABLE v1_durable_event_log_file ( + -- The id and inserted_at of the durable task which created this entry + durable_task_id BIGINT NOT NULL, + durable_task_inserted_at BIGINT NOT NULL, + latest_inserted_at TIMESTAMPTZ NOT NULL, + -- A monotonically increasing node id for this durable event log scoped to the durable task. + -- Starts at 0 and increments by 1 for each new entry. + latest_node_id BIGINT NOT NULL, + -- The latest branch id. Branches represent different execution paths on a replay. + latest_branch_id BIGINT NOT NULL, + -- The parent node id which should be linked to the first node in a new branch to its parent node. + latest_branch_first_parent_node_id BIGINT NOT NULL, + CONSTRAINT v1_durable_event_log_file_pkey PRIMARY KEY (durable_task_id, durable_task_inserted_at) +) PARTITION BY RANGE(durable_task_inserted_at); + +CREATE TYPE v1_durable_event_log_entry_kind AS ENUM ( + 'RUN_TRIGGERED', + 'WAIT_FOR_STARTED', + 'MEMO_STARTED' +); + +CREATE TABLE v1_durable_event_log_entry ( + -- The id and inserted_at of the durable task which created this entry + durable_task_id BIGINT NOT NULL, + durable_task_inserted_at BIGINT NOT NULL, + -- The inserted_at time of this event from a DB clock perspective. + -- Important: for consistency, this should always be auto-generated via the CURRENT_TIMESTAMP! + inserted_at TIMESTAMPTZ NOT NULL, + kind v1_durable_event_log_entry_kind, + -- The node number in the durable event log. This represents a monotonically increasing + -- sequence value generated from v1_durable_event_log_file.latest_node_id + node_id BIGINT NOT NULL, + -- The parent node id for this event, if any. This can be null. + parent_node_id BIGINT, + -- The branch id when this event was first seen. A durable event log can be a part of many branches. + branch_id BIGINT NOT NULL, + -- Todo: Associated data for this event should be stored in the v1_payload table! + -- data JSONB, + -- The hash of the data stored in the v1_payload table to check non-determinism violations. + -- This can be null for event types that don't have associated data. + -- TODO: we can add CHECK CONSTRAINT for event types that require data_hash to be non-null. + data_hash BYTEA, + -- Can discuss: adds some flexibility for future hash algorithms + data_hash_alg TEXT, + -- Access patterns: + -- Definite: we'll query directly for the node_id when a durable task is replaying its log + -- Possible: we may want to query a range of node_ids for a durable task + -- Possible: we may want to query a range of inserted_ats for a durable task + CONSTRAINT v1_durable_event_log_entry_pkey PRIMARY KEY (durable_task_id, durable_task_inserted_at, node_id) +) PARTITION BY RANGE(durable_task_inserted_at); + +CREATE TYPE v1_durable_event_log_callback_kind AS ENUM ( + 'RUN_COMPLETED', + -- WAIT_FOR_COMPLETED can represent a durable sleep, an event, or some boolean combination of + -- these. + 'WAIT_FOR_COMPLETED', + 'MEMO_COMPLETED', +); + +-- v1_durable_event_log_callback stores callbacks that complete a durable event log entry. This needs to be stateful +-- so that it persists across worker restarts for the same durable task. +-- +-- Implementation notes (should be removed or moved elsewhere once this is done): +-- 1. Why not store callbacks in the core durable event log? Because their entries are not guaranteed to be +-- stable. For example, if a task is replayed, we would need to remove old callback entries and insert new ones, +-- in which case the durable event log would be changing history. +-- 2. The two important access patterns are direct queries from the worker side to check if a callback is satisfied, +-- and direct queries from the engine side to mark a callback as satisfied when we've satisfied a v1_match. Because +-- of this, we likely need to add a `callback_key` field to the v1_match table. +CREATE TABLE v1_durable_event_log_callback ( + durable_task_id BIGINT NOT NULL, + durable_task_inserted_at BIGINT NOT NULL, + -- The inserted_at time of this callback from a DB clock perspective. + -- Important: for consistency, this should always be auto-generated via the CURRENT_TIMESTAMP! + inserted_at TIMESTAMPTZ NOT NULL, + kind v1_durable_event_log_callback_kind, + -- A unique, generated key for this callback. This key will change dependent on the callback kind. + -- Important: this key should be easily queryable directly from the durable log writers but also the controllers + -- that are checking if callbacks are satisfied. + key TEXT NOT NULL, + -- The associated log node id that this callback references. + node_id BIGINT NOT NULL, + -- Whether this callback has been seen by the engine or not. Note that is_satisfied _may_ change multiple + -- times through the lifecycle of a callback, and readers should not assume that once it's true it will always be true. + is_satisfied BOOLEAN NOT NULL DEFAULT FALSE, + -- Access patterns: + -- Definite: we'll query directly for the key when a worker is checking if a callback is satisfied + -- Definite: we'll query directly for the key when a v1_match has been satisfied and we need to mark the callback as satisfied + CONSTRAINT v1_durable_event_log_callback_pkey PRIMARY KEY (durable_task_id, durable_task_inserted_at, key) +) PARTITION BY RANGE(durable_task_inserted_at);