Skip to content
8 changes: 8 additions & 0 deletions api-contracts/v1/dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,19 @@ message DurableTaskRequest {
}
}

// DurableTaskErrorResponse reports an error for a specific durable task event
// back to the worker. The dispatcher sends it when ingesting a durable task
// event fails with a non-determinism error.
message DurableTaskErrorResponse {
// External id of the durable task the error applies to (string form of the id).
string durable_task_external_id = 1;
// Invocation count copied from the request that produced the error.
int64 invocation_count = 2;
// Node id in the durable event log associated with the failing event.
int64 node_id = 3;
// Human-readable description of the error.
string error_message = 4;
}

// DurableTaskResponse is the envelope for messages the dispatcher sends to a
// worker on the durable task stream. At most one member of the oneof is set
// per message.
message DurableTaskResponse {
oneof message {
DurableTaskResponseRegisterWorker register_worker = 1;
DurableTaskEventAckResponse trigger_ack = 2;
DurableTaskCallbackCompletedResponse callback_completed = 3;
// Set when the dispatcher reports an error for a durable task event
// (e.g. a detected non-determinism violation).
DurableTaskErrorResponse error = 4;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,9 @@ CREATE TABLE v1_durable_event_log_entry (
parent_node_id BIGINT,
-- The branch id when this event was first seen. A durable event log can be a part of many branches.
branch_id BIGINT NOT NULL,
-- Todo: Associated data for this event should be stored in the v1_payload table!
-- data JSONB,
-- The hash of the data stored in the v1_payload table to check non-determinism violations.
-- This can be null for event types that don't have associated data.
-- TODO: we can add CHECK CONSTRAINT for event types that require data_hash to be non-null.
data_hash BYTEA,
-- Can discuss: adds some flexibility for future hash algorithms
data_hash_alg TEXT,
-- An idempotency key generated from the incoming data (the event type + the wait-for conditions, or the trigger event payload + options),
-- used to determine whether or not there's been a non-determinism error.
idempotency_key BYTEA,
-- Access patterns:
-- Definite: we'll query directly for the node_id when a durable task is replaying its log
-- Possible: we may want to query a range of node_ids for a durable task
Expand Down
45 changes: 44 additions & 1 deletion internal/services/dispatcher/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hatchet-dev/hatchet/internal/msgqueue"
contracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1"
tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1"
v1 "github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
)
Expand Down Expand Up @@ -541,7 +543,47 @@ func (d *DispatcherServiceImpl) handleDurableTaskEvent(
TriggerOpts: triggerOpts,
})

if err != nil {
var nde *v1.NonDeterminismError
if err != nil && errors.As(err, &nde) {
errMsg := fmt.Sprintf("non-determinism detected for durable task event with task id %s", taskExternalId)

failMsg, failErr := tasktypes.FailedTaskMessage(
invocation.tenantId,
task.ID,
task.InsertedAt,
task.ExternalID,
task.WorkflowRunID,
task.RetryCount,
false,
errMsg,
true,
)

if failErr != nil {
return fmt.Errorf("failed to create non-determinism fail message: %w", failErr)
}

if failErr = d.mq.SendMessage(ctx, msgqueue.TASK_PROCESSING_QUEUE, failMsg); failErr != nil {
return fmt.Errorf("failed to publish non-determinism fail message: %w", failErr)
}

sendErr := invocation.send(&contracts.DurableTaskResponse{
Message: &contracts.DurableTaskResponse_Error{
Error: &contracts.DurableTaskErrorResponse{
DurableTaskExternalId: taskExternalId.String(),
NodeId: ingestionResult.NodeId,
InvocationCount: req.InvocationCount,
ErrorMessage: errMsg,
},
},
})

if sendErr != nil {
return fmt.Errorf("failed to send non-determinism error to worker: %w", sendErr)
}

return nil
} else if err != nil {
return status.Errorf(codes.Internal, "failed to ingest durable task event: %v", err)
}

Expand Down Expand Up @@ -618,6 +660,7 @@ func (d *DispatcherServiceImpl) handleWorkerStatus(
}

callbacks, err := d.repo.DurableEvents().GetSatisfiedCallbacks(ctx, invocation.tenantId, waiting)

if err != nil {
return fmt.Errorf("failed to get satisfied callbacks: %w", err)
}
Expand Down
Loading
Loading