diff --git a/api-contracts/v1/dispatcher.proto b/api-contracts/v1/dispatcher.proto index 9b5a212532..51e041ec0b 100644 --- a/api-contracts/v1/dispatcher.proto +++ b/api-contracts/v1/dispatcher.proto @@ -84,11 +84,19 @@ message DurableTaskRequest { } } +message DurableTaskErrorResponse { + string durable_task_external_id = 1; + int64 invocation_count = 2; + int64 node_id = 3; + string error_message = 4; +} + message DurableTaskResponse { oneof message { DurableTaskResponseRegisterWorker register_worker = 1; DurableTaskEventAckResponse trigger_ack = 2; DurableTaskCallbackCompletedResponse callback_completed = 3; + DurableTaskErrorResponse error = 4; } } diff --git a/cmd/hatchet-migrate/migrate/migrations/20260216174818_v1_0_80.sql b/cmd/hatchet-migrate/migrate/migrations/20260216174818_v1_0_80.sql index ba071f4d20..4d45817b98 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20260216174818_v1_0_80.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20260216174818_v1_0_80.sql @@ -55,14 +55,9 @@ CREATE TABLE v1_durable_event_log_entry ( parent_node_id BIGINT, -- The branch id when this event was first seen. A durable event log can be a part of many branches. branch_id BIGINT NOT NULL, - -- Todo: Associated data for this event should be stored in the v1_payload table! - -- data JSONB, - -- The hash of the data stored in the v1_payload table to check non-determinism violations. - -- This can be null for event types that don't have associated data. - -- TODO: we can add CHECK CONSTRAINT for event types that require data_hash to be non-null. - data_hash BYTEA, - -- Can discuss: adds some flexibility for future hash algorithms - data_hash_alg TEXT, + -- An idempotency key generated from the incoming data (using the type of event + wait for conditions or the trigger event payload + options) + -- to determine whether or not there's been a non-determinism error + idempotency_key BYTEA, -- Access patterns: -- Definite: we'll query directly for the node_id when a durable task is replaying its log -- Possible: we may want to query a range of node_ids for a durable task diff --git a/internal/services/dispatcher/v1/server.go b/internal/services/dispatcher/v1/server.go index 1c14c1906e..708481f508 100644 --- a/internal/services/dispatcher/v1/server.go +++ b/internal/services/dispatcher/v1/server.go @@ -14,7 +14,9 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/hatchet-dev/hatchet/internal/msgqueue" contracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1" + tasktypes "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes/v1" v1 "github.com/hatchet-dev/hatchet/pkg/repository" "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" ) @@ -541,7 +543,47 @@ func (d *DispatcherServiceImpl) handleDurableTaskEvent( TriggerOpts: triggerOpts, }) - if err != nil { + var nde *v1.NonDeterminismError + if err != nil && errors.As(err, &nde) { + errMsg := fmt.Sprintf("non-determinism detected for durable task event with task id %s", taskExternalId) + + failMsg, failErr := tasktypes.FailedTaskMessage( + invocation.tenantId, + task.ID, + task.InsertedAt, + task.ExternalID, + task.WorkflowRunID, + task.RetryCount, + false, + errMsg, + true, + ) + + if failErr != nil { + return fmt.Errorf("failed to create non-determinism fail message: %w", failErr) + } + + if failErr = d.mq.SendMessage(ctx, msgqueue.TASK_PROCESSING_QUEUE, failMsg); failErr != nil { + return fmt.Errorf("failed to publish non-determinism fail message: %w", failErr) + } + + sendErr := invocation.send(&contracts.DurableTaskResponse{ + Message: &contracts.DurableTaskResponse_Error{ + Error: &contracts.DurableTaskErrorResponse{ + DurableTaskExternalId: taskExternalId.String(), + NodeId: ingestionResult.NodeId, + InvocationCount: req.InvocationCount, + ErrorMessage: errMsg, + }, + }, + }) + + if sendErr != nil { + return fmt.Errorf("failed to send non-determinism error to worker: %w", sendErr) + } + + return nil + } else if err != nil { return status.Errorf(codes.Internal, "failed to ingest durable task event: %v", err) } @@ -618,6 +660,7 @@ func (d *DispatcherServiceImpl) handleWorkerStatus( } callbacks, err := d.repo.DurableEvents().GetSatisfiedCallbacks(ctx, invocation.tenantId, waiting) + if err != nil { return fmt.Errorf("failed to get satisfied callbacks: %w", err) } diff --git a/internal/services/shared/proto/v1/dispatcher.pb.go b/internal/services/shared/proto/v1/dispatcher.pb.go index d1d9dcd302..486fe674f5 100644 --- a/internal/services/shared/proto/v1/dispatcher.pb.go +++ b/internal/services/shared/proto/v1/dispatcher.pb.go @@ -675,6 +675,77 @@ func (*DurableTaskRequest_EvictInvocation) isDurableTaskRequest_Message() {} func (*DurableTaskRequest_WorkerStatus) isDurableTaskRequest_Message() {} +type DurableTaskErrorResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + DurableTaskExternalId string `protobuf:"bytes,1,opt,name=durable_task_external_id,json=durableTaskExternalId,proto3" json:"durable_task_external_id,omitempty"` + InvocationCount int64 `protobuf:"varint,2,opt,name=invocation_count,json=invocationCount,proto3" json:"invocation_count,omitempty"` + NodeId int64 `protobuf:"varint,3,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"` + ErrorMessage string `protobuf:"bytes,4,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` +} + +func (x *DurableTaskErrorResponse) Reset() { + *x = DurableTaskErrorResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_v1_dispatcher_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DurableTaskErrorResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DurableTaskErrorResponse) ProtoMessage() {} + +func (x *DurableTaskErrorResponse) ProtoReflect() protoreflect.Message { + mi := &file_v1_dispatcher_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DurableTaskErrorResponse.ProtoReflect.Descriptor instead. +func (*DurableTaskErrorResponse) Descriptor() ([]byte, []int) { + return file_v1_dispatcher_proto_rawDescGZIP(), []int{9} +} + +func (x *DurableTaskErrorResponse) GetDurableTaskExternalId() string { + if x != nil { + return x.DurableTaskExternalId + } + return "" +} + +func (x *DurableTaskErrorResponse) GetInvocationCount() int64 { + if x != nil { + return x.InvocationCount + } + return 0 +} + +func (x *DurableTaskErrorResponse) GetNodeId() int64 { + if x != nil { + return x.NodeId + } + return 0 +} + +func (x *DurableTaskErrorResponse) GetErrorMessage() string { + if x != nil { + return x.ErrorMessage + } + return "" +} + type DurableTaskResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -685,13 +756,14 @@ type DurableTaskResponse struct { // *DurableTaskResponse_RegisterWorker // *DurableTaskResponse_TriggerAck // *DurableTaskResponse_CallbackCompleted + // *DurableTaskResponse_Error Message isDurableTaskResponse_Message `protobuf_oneof:"message"` } func (x *DurableTaskResponse) Reset() { *x = DurableTaskResponse{} if protoimpl.UnsafeEnabled { - mi := &file_v1_dispatcher_proto_msgTypes[9] + mi := &file_v1_dispatcher_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -704,7 +776,7 @@ func (x *DurableTaskResponse) String() string { func (*DurableTaskResponse) ProtoMessage() {} func (x *DurableTaskResponse) ProtoReflect() protoreflect.Message { - mi := &file_v1_dispatcher_proto_msgTypes[9] + mi := &file_v1_dispatcher_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -717,7 +789,7 @@ func (x *DurableTaskResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use DurableTaskResponse.ProtoReflect.Descriptor instead. func (*DurableTaskResponse) Descriptor() ([]byte, []int) { - return file_v1_dispatcher_proto_rawDescGZIP(), []int{9} + return file_v1_dispatcher_proto_rawDescGZIP(), []int{10} } func (m *DurableTaskResponse) GetMessage() isDurableTaskResponse_Message { @@ -748,6 +820,13 @@ func (x *DurableTaskResponse) GetCallbackCompleted() *DurableTaskCallbackComplet return nil } +func (x *DurableTaskResponse) GetError() *DurableTaskErrorResponse { + if x, ok := x.GetMessage().(*DurableTaskResponse_Error); ok { + return x.Error + } + return nil +} + type isDurableTaskResponse_Message interface { isDurableTaskResponse_Message() } @@ -764,12 +843,18 @@ type DurableTaskResponse_CallbackCompleted struct { CallbackCompleted *DurableTaskCallbackCompletedResponse `protobuf:"bytes,3,opt,name=callback_completed,json=callbackCompleted,proto3,oneof"` } +type DurableTaskResponse_Error struct { + Error *DurableTaskErrorResponse `protobuf:"bytes,4,opt,name=error,proto3,oneof"` +} + func (*DurableTaskResponse_RegisterWorker) isDurableTaskResponse_Message() {} func (*DurableTaskResponse_TriggerAck) isDurableTaskResponse_Message() {} func (*DurableTaskResponse_CallbackCompleted) isDurableTaskResponse_Message() {} +func (*DurableTaskResponse_Error) isDurableTaskResponse_Message() {} + type RegisterDurableEventRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -783,7 +868,7 @@ type RegisterDurableEventRequest struct { func (x *RegisterDurableEventRequest) Reset() { *x = RegisterDurableEventRequest{} if protoimpl.UnsafeEnabled { - mi := &file_v1_dispatcher_proto_msgTypes[10] + mi := &file_v1_dispatcher_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -796,7 +881,7 @@ func (x *RegisterDurableEventRequest) String() string { func (*RegisterDurableEventRequest) ProtoMessage() {} func (x *RegisterDurableEventRequest) ProtoReflect() protoreflect.Message { - mi := &file_v1_dispatcher_proto_msgTypes[10] + mi := &file_v1_dispatcher_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -809,7 +894,7 @@ func (x *RegisterDurableEventRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterDurableEventRequest.ProtoReflect.Descriptor instead. func (*RegisterDurableEventRequest) Descriptor() ([]byte, []int) { - return file_v1_dispatcher_proto_rawDescGZIP(), []int{10} + return file_v1_dispatcher_proto_rawDescGZIP(), []int{11} } func (x *RegisterDurableEventRequest) GetTaskId() string { @@ -842,7 +927,7 @@ type RegisterDurableEventResponse struct { func (x *RegisterDurableEventResponse) Reset() { *x = RegisterDurableEventResponse{} if protoimpl.UnsafeEnabled { - mi := &file_v1_dispatcher_proto_msgTypes[11] + mi := &file_v1_dispatcher_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -855,7 +940,7 @@ func (x *RegisterDurableEventResponse) String() string { func (*RegisterDurableEventResponse) ProtoMessage() {} func (x *RegisterDurableEventResponse) ProtoReflect() protoreflect.Message { - mi := &file_v1_dispatcher_proto_msgTypes[11] + mi := &file_v1_dispatcher_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -868,7 +953,7 @@ func (x *RegisterDurableEventResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterDurableEventResponse.ProtoReflect.Descriptor instead. func (*RegisterDurableEventResponse) Descriptor() ([]byte, []int) { - return file_v1_dispatcher_proto_rawDescGZIP(), []int{11} + return file_v1_dispatcher_proto_rawDescGZIP(), []int{12} } type ListenForDurableEventRequest struct { @@ -883,7 +968,7 @@ type ListenForDurableEventRequest struct { func (x *ListenForDurableEventRequest) Reset() { *x = ListenForDurableEventRequest{} if protoimpl.UnsafeEnabled { - mi := &file_v1_dispatcher_proto_msgTypes[12] + mi := &file_v1_dispatcher_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -896,7 +981,7 @@ func (x *ListenForDurableEventRequest) String() string { func (*ListenForDurableEventRequest) ProtoMessage() {} func (x *ListenForDurableEventRequest) ProtoReflect() protoreflect.Message { - mi := &file_v1_dispatcher_proto_msgTypes[12] + mi := &file_v1_dispatcher_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -909,7 +994,7 @@ func (x *ListenForDurableEventRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ListenForDurableEventRequest.ProtoReflect.Descriptor instead. func (*ListenForDurableEventRequest) Descriptor() ([]byte, []int) { - return file_v1_dispatcher_proto_rawDescGZIP(), []int{12} + return file_v1_dispatcher_proto_rawDescGZIP(), []int{13} } func (x *ListenForDurableEventRequest) GetTaskId() string { @@ -939,7 +1024,7 @@ type DurableEvent struct { func (x *DurableEvent) Reset() { *x = DurableEvent{} if protoimpl.UnsafeEnabled { - mi := &file_v1_dispatcher_proto_msgTypes[13] + mi := &file_v1_dispatcher_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -952,7 +1037,7 @@ func (x *DurableEvent) String() string { func (*DurableEvent) ProtoMessage() {} func (x *DurableEvent) ProtoReflect() protoreflect.Message { - mi := &file_v1_dispatcher_proto_msgTypes[13] + mi := &file_v1_dispatcher_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -965,7 +1050,7 @@ func (x *DurableEvent) ProtoReflect() protoreflect.Message { // Deprecated: Use DurableEvent.ProtoReflect.Descriptor instead. func (*DurableEvent) Descriptor() ([]byte, []int) { - return file_v1_dispatcher_proto_rawDescGZIP(), []int{13} + return file_v1_dispatcher_proto_rawDescGZIP(), []int{14} } func (x *DurableEvent) GetTaskId() string { @@ -1098,79 +1183,95 @@ var file_v1_dispatcher_proto_rawDesc = []byte{ 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0c, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x91, 0x02, 0x0a, 0x13, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, - 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x50, 0x0a, - 0x0f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, - 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, - 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x48, 0x00, 0x52, - 0x0e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x12, - 0x42, 0x0a, 0x0b, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x61, 0x63, 0x6b, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, - 0x65, 0x54, 0x61, 0x73, 0x6b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x0a, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, - 0x41, 0x63, 0x6b, 0x12, 0x59, 0x0a, 0x12, 0x63, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x5f, - 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x28, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, - 0x43, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, - 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x11, 0x63, 0x61, 0x6c, - 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x42, 0x09, - 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x99, 0x01, 0x0a, 0x1b, 0x52, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xbc, 0x01, 0x0a, 0x18, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, + 0x65, 0x54, 0x61, 0x73, 0x6b, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x37, 0x0a, 0x18, 0x64, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x74, 0x61, + 0x73, 0x6b, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x15, 0x64, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, + 0x6b, 0x45, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x10, 0x69, + 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0f, 0x69, 0x6e, 0x76, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x69, + 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x64, 0x12, + 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x22, 0xc7, 0x02, 0x0a, 0x13, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, + 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x50, 0x0a, 0x0f, + 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, + 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x48, 0x00, 0x52, 0x0e, + 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x12, 0x42, + 0x0a, 0x0b, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x5f, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, + 0x54, 0x61, 0x73, 0x6b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x0a, 0x74, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x41, + 0x63, 0x6b, 0x12, 0x59, 0x0a, 0x12, 0x63, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x5f, 0x63, + 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, + 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x43, + 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x11, 0x63, 0x61, 0x6c, 0x6c, + 0x62, 0x61, 0x63, 0x6b, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x34, 0x0a, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x76, + 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x45, 0x72, 0x72, + 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x99, + 0x01, 0x0a, 0x1b, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, + 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, + 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, + 0x6c, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x69, 0x67, + 0x6e, 0x61, 0x6c, 0x4b, 0x65, 0x79, 0x12, 0x42, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x76, 0x31, 0x2e, + 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4c, 0x69, 0x73, 0x74, + 0x65, 0x6e, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0a, + 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x1e, 0x0a, 0x1c, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, - 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, - 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x5f, 0x6b, 0x65, 0x79, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x4b, 0x65, - 0x79, 0x12, 0x42, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, - 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x65, 0x72, 0x43, - 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x64, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x1e, 0x0a, 0x1c, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x56, 0x0a, 0x1c, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x46, - 0x6f, 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1d, - 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x4b, 0x65, 0x79, 0x22, 0x5a, 0x0a, - 0x0c, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17, 0x0a, - 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, - 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, - 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, - 0x61, 0x6c, 0x4b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0xb0, 0x01, 0x0a, 0x14, 0x44, 0x75, - 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4b, 0x69, - 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x25, 0x44, 0x55, 0x52, 0x41, 0x42, 0x4c, 0x45, 0x5f, 0x54, 0x41, + 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x56, 0x0a, 0x1c, 0x4c, 0x69, + 0x73, 0x74, 0x65, 0x6e, 0x46, 0x6f, 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, + 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, + 0x6b, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x5f, 0x6b, 0x65, + 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x4b, + 0x65, 0x79, 0x22, 0x5a, 0x0a, 0x0c, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x73, + 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x4b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0xb0, + 0x01, 0x0a, 0x14, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x25, 0x44, 0x55, 0x52, 0x41, 0x42, + 0x4c, 0x45, 0x5f, 0x54, 0x41, 0x53, 0x4b, 0x5f, 0x54, 0x52, 0x49, 0x47, 0x47, 0x45, 0x52, 0x5f, + 0x4b, 0x49, 0x4e, 0x44, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, + 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x44, 0x55, 0x52, 0x41, 0x42, 0x4c, 0x45, 0x5f, 0x54, 0x41, 0x53, 0x4b, 0x5f, 0x54, 0x52, 0x49, 0x47, 0x47, 0x45, 0x52, 0x5f, 0x4b, 0x49, 0x4e, 0x44, 0x5f, - 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x21, 0x0a, - 0x1d, 0x44, 0x55, 0x52, 0x41, 0x42, 0x4c, 0x45, 0x5f, 0x54, 0x41, 0x53, 0x4b, 0x5f, 0x54, 0x52, - 0x49, 0x47, 0x47, 0x45, 0x52, 0x5f, 0x4b, 0x49, 0x4e, 0x44, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x01, - 0x12, 0x26, 0x0a, 0x22, 0x44, 0x55, 0x52, 0x41, 0x42, 0x4c, 0x45, 0x5f, 0x54, 0x41, 0x53, 0x4b, - 0x5f, 0x54, 0x52, 0x49, 0x47, 0x47, 0x45, 0x52, 0x5f, 0x4b, 0x49, 0x4e, 0x44, 0x5f, 0x57, 0x41, - 0x49, 0x54, 0x5f, 0x46, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x22, 0x0a, 0x1e, 0x44, 0x55, 0x52, 0x41, - 0x42, 0x4c, 0x45, 0x5f, 0x54, 0x41, 0x53, 0x4b, 0x5f, 0x54, 0x52, 0x49, 0x47, 0x47, 0x45, 0x52, - 0x5f, 0x4b, 0x49, 0x4e, 0x44, 0x5f, 0x4d, 0x45, 0x4d, 0x4f, 0x10, 0x03, 0x32, 0x84, 0x02, 0x0a, - 0x0c, 0x56, 0x31, 0x44, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x12, 0x44, 0x0a, - 0x0b, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x16, 0x2e, 0x76, - 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, - 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, - 0x01, 0x30, 0x01, 0x12, 0x5b, 0x0a, 0x14, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x44, - 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1f, 0x2e, 0x76, 0x31, - 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x76, - 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, - 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x12, 0x51, 0x0a, 0x15, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x46, 0x6f, 0x72, 0x44, 0x75, 0x72, - 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x20, 0x2e, 0x76, 0x31, 0x2e, 0x4c, - 0x69, 0x73, 0x74, 0x65, 0x6e, 0x46, 0x6f, 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x76, 0x31, - 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x28, - 0x01, 0x30, 0x01, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2d, 0x64, 0x65, 0x76, 0x2f, 0x68, 0x61, - 0x74, 0x63, 0x68, 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x73, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x55, 0x4e, 0x10, 0x01, 0x12, 0x26, 0x0a, 0x22, 0x44, 0x55, 0x52, 0x41, 0x42, 0x4c, 0x45, + 0x5f, 0x54, 0x41, 0x53, 0x4b, 0x5f, 0x54, 0x52, 0x49, 0x47, 0x47, 0x45, 0x52, 0x5f, 0x4b, 0x49, + 0x4e, 0x44, 0x5f, 0x57, 0x41, 0x49, 0x54, 0x5f, 0x46, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x22, 0x0a, + 0x1e, 0x44, 0x55, 0x52, 0x41, 0x42, 0x4c, 0x45, 0x5f, 0x54, 0x41, 0x53, 0x4b, 0x5f, 0x54, 0x52, + 0x49, 0x47, 0x47, 0x45, 0x52, 0x5f, 0x4b, 0x49, 0x4e, 0x44, 0x5f, 0x4d, 0x45, 0x4d, 0x4f, 0x10, + 0x03, 0x32, 0x84, 0x02, 0x0a, 0x0c, 0x56, 0x31, 0x44, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, + 0x65, 0x72, 0x12, 0x44, 0x0a, 0x0b, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, + 0x6b, 0x12, 0x16, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, + 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x76, 0x31, 0x2e, 0x44, + 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5b, 0x0a, 0x14, 0x52, 0x65, 0x67, 0x69, + 0x73, 0x74, 0x65, 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x12, 0x1f, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x44, 0x75, + 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x20, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x44, + 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x15, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x46, + 0x6f, 0x72, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x20, + 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x46, 0x6f, 0x72, 0x44, 0x75, 0x72, + 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x10, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2d, 0x64, + 0x65, 0x76, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x73, 0x68, 0x61, + 0x72, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1186,7 +1287,7 @@ func file_v1_dispatcher_proto_rawDescGZIP() []byte { } var file_v1_dispatcher_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_v1_dispatcher_proto_msgTypes = make([]protoimpl.MessageInfo, 14) +var file_v1_dispatcher_proto_msgTypes = make([]protoimpl.MessageInfo, 15) var file_v1_dispatcher_proto_goTypes = []interface{}{ (DurableTaskEventKind)(0), // 0: v1.DurableTaskEventKind (*DurableTaskRequestRegisterWorker)(nil), // 1: v1.DurableTaskRequestRegisterWorker @@ -1198,18 +1299,19 @@ var file_v1_dispatcher_proto_goTypes = []interface{}{ (*DurableTaskAwaitedCallback)(nil), // 7: v1.DurableTaskAwaitedCallback (*DurableTaskWorkerStatusRequest)(nil), // 8: v1.DurableTaskWorkerStatusRequest (*DurableTaskRequest)(nil), // 9: v1.DurableTaskRequest - (*DurableTaskResponse)(nil), // 10: v1.DurableTaskResponse - (*RegisterDurableEventRequest)(nil), // 11: v1.RegisterDurableEventRequest - (*RegisterDurableEventResponse)(nil), // 12: v1.RegisterDurableEventResponse - (*ListenForDurableEventRequest)(nil), // 13: v1.ListenForDurableEventRequest - (*DurableEvent)(nil), // 14: v1.DurableEvent - (*DurableEventListenerConditions)(nil), // 15: v1.DurableEventListenerConditions - (*TriggerWorkflowRequest)(nil), // 16: v1.TriggerWorkflowRequest + (*DurableTaskErrorResponse)(nil), // 10: v1.DurableTaskErrorResponse + (*DurableTaskResponse)(nil), // 11: v1.DurableTaskResponse + (*RegisterDurableEventRequest)(nil), // 12: v1.RegisterDurableEventRequest + (*RegisterDurableEventResponse)(nil), // 13: v1.RegisterDurableEventResponse + (*ListenForDurableEventRequest)(nil), // 14: v1.ListenForDurableEventRequest + (*DurableEvent)(nil), // 15: v1.DurableEvent + (*DurableEventListenerConditions)(nil), // 16: v1.DurableEventListenerConditions + (*TriggerWorkflowRequest)(nil), // 17: v1.TriggerWorkflowRequest } var file_v1_dispatcher_proto_depIdxs = []int32{ 0, // 0: v1.DurableTaskEventRequest.kind:type_name -> v1.DurableTaskEventKind - 15, // 1: v1.DurableTaskEventRequest.wait_for_conditions:type_name -> v1.DurableEventListenerConditions - 16, // 2: v1.DurableTaskEventRequest.trigger_opts:type_name -> v1.TriggerWorkflowRequest + 16, // 1: v1.DurableTaskEventRequest.wait_for_conditions:type_name -> v1.DurableEventListenerConditions + 17, // 2: v1.DurableTaskEventRequest.trigger_opts:type_name -> v1.TriggerWorkflowRequest 7, // 3: v1.DurableTaskWorkerStatusRequest.waiting_callbacks:type_name -> v1.DurableTaskAwaitedCallback 1, // 4: v1.DurableTaskRequest.register_worker:type_name -> v1.DurableTaskRequestRegisterWorker 3, // 5: v1.DurableTaskRequest.event:type_name -> v1.DurableTaskEventRequest @@ -1218,18 +1320,19 @@ var file_v1_dispatcher_proto_depIdxs = []int32{ 2, // 8: v1.DurableTaskResponse.register_worker:type_name -> v1.DurableTaskResponseRegisterWorker 4, // 9: v1.DurableTaskResponse.trigger_ack:type_name -> v1.DurableTaskEventAckResponse 5, // 10: v1.DurableTaskResponse.callback_completed:type_name -> v1.DurableTaskCallbackCompletedResponse - 15, // 11: v1.RegisterDurableEventRequest.conditions:type_name -> v1.DurableEventListenerConditions - 9, // 12: v1.V1Dispatcher.DurableTask:input_type -> v1.DurableTaskRequest - 11, // 13: v1.V1Dispatcher.RegisterDurableEvent:input_type -> v1.RegisterDurableEventRequest - 13, // 14: v1.V1Dispatcher.ListenForDurableEvent:input_type -> v1.ListenForDurableEventRequest - 10, // 15: v1.V1Dispatcher.DurableTask:output_type -> v1.DurableTaskResponse - 12, // 16: v1.V1Dispatcher.RegisterDurableEvent:output_type -> v1.RegisterDurableEventResponse - 14, // 17: v1.V1Dispatcher.ListenForDurableEvent:output_type -> v1.DurableEvent - 15, // [15:18] is the sub-list for method output_type - 12, // [12:15] is the sub-list for method input_type - 12, // [12:12] is the sub-list for extension type_name - 12, // [12:12] is the sub-list for extension extendee - 0, // [0:12] is the sub-list for field type_name + 10, // 11: v1.DurableTaskResponse.error:type_name -> v1.DurableTaskErrorResponse + 16, // 12: v1.RegisterDurableEventRequest.conditions:type_name -> v1.DurableEventListenerConditions + 9, // 13: v1.V1Dispatcher.DurableTask:input_type -> v1.DurableTaskRequest + 12, // 14: v1.V1Dispatcher.RegisterDurableEvent:input_type -> v1.RegisterDurableEventRequest + 14, // 15: v1.V1Dispatcher.ListenForDurableEvent:input_type -> v1.ListenForDurableEventRequest + 11, // 16: v1.V1Dispatcher.DurableTask:output_type -> v1.DurableTaskResponse + 13, // 17: v1.V1Dispatcher.RegisterDurableEvent:output_type -> v1.RegisterDurableEventResponse + 15, // 18: v1.V1Dispatcher.ListenForDurableEvent:output_type -> v1.DurableEvent + 16, // [16:19] is the sub-list for method output_type + 13, // [13:16] is the sub-list for method input_type + 13, // [13:13] is the sub-list for extension type_name + 13, // [13:13] is the sub-list for extension extendee + 0, // [0:13] is the sub-list for field type_name } func init() { file_v1_dispatcher_proto_init() } @@ -1349,7 +1452,7 @@ func file_v1_dispatcher_proto_init() { } } file_v1_dispatcher_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DurableTaskResponse); i { + switch v := v.(*DurableTaskErrorResponse); i { case 0: return &v.state case 1: @@ -1361,7 +1464,7 @@ func file_v1_dispatcher_proto_init() { } } file_v1_dispatcher_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*RegisterDurableEventRequest); i { + switch v := v.(*DurableTaskResponse); i { case 0: return &v.state case 1: @@ -1373,7 +1476,7 @@ func file_v1_dispatcher_proto_init() { } } file_v1_dispatcher_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*RegisterDurableEventResponse); i { + switch v := v.(*RegisterDurableEventRequest); i { case 0: return &v.state case 1: @@ -1385,7 +1488,7 @@ func file_v1_dispatcher_proto_init() { } } file_v1_dispatcher_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListenForDurableEventRequest); i { + switch v := v.(*RegisterDurableEventResponse); i { case 0: return &v.state case 1: @@ -1397,6 +1500,18 @@ func file_v1_dispatcher_proto_init() { } } file_v1_dispatcher_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListenForDurableEventRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_v1_dispatcher_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DurableEvent); i { case 0: return &v.state @@ -1416,10 +1531,11 @@ func file_v1_dispatcher_proto_init() { (*DurableTaskRequest_EvictInvocation)(nil), (*DurableTaskRequest_WorkerStatus)(nil), } - file_v1_dispatcher_proto_msgTypes[9].OneofWrappers = []interface{}{ + file_v1_dispatcher_proto_msgTypes[10].OneofWrappers = []interface{}{ (*DurableTaskResponse_RegisterWorker)(nil), (*DurableTaskResponse_TriggerAck)(nil), (*DurableTaskResponse_CallbackCompleted)(nil), + (*DurableTaskResponse_Error)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -1427,7 +1543,7 @@ func file_v1_dispatcher_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_v1_dispatcher_proto_rawDesc, NumEnums: 1, - NumMessages: 14, + NumMessages: 15, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/repository/durable_events.go b/pkg/repository/durable_events.go index d33e50cf93..2b5fbfe91b 100644 --- a/pkg/repository/durable_events.go +++ b/pkg/repository/durable_events.go @@ -1,7 +1,11 @@ package repository import ( + "bytes" "context" + "crypto/sha1" + "encoding/hex" + "encoding/json" "errors" "fmt" "time" @@ -74,6 +78,12 @@ func newDurableEventsRepository(shared *sharedRepository) DurableEventsRepositor } } +type NonDeterminismError struct{} + +func (m *NonDeterminismError) Error() string { + return "non-determinism detected for durable event log entry" +} + func (r *durableEventsRepository) getOrCreateEventLogEntry( ctx context.Context, tx sqlcv1.DBTX, @@ -101,8 +111,7 @@ func (r *durableEventsRepository) getOrCreateEventLogEntry( Nodeid: params.Nodeid, ParentNodeId: params.ParentNodeId, Branchid: params.Branchid, - Datahash: params.Datahash, - Datahashalg: params.Datahashalg, + Idempotencykey: params.Idempotencykey, }) if err != nil { @@ -122,6 +131,13 @@ func (r *durableEventsRepository) getOrCreateEventLogEntry( return nil, err } } + } else { + incomingIdempotencyKey := params.Idempotencykey + existingIdempotencyKey := entry.IdempotencyKey + + if !bytes.Equal(incomingIdempotencyKey, existingIdempotencyKey) { + return nil, &NonDeterminismError{} + } } return &EventLogEntryWithPayload{Entry: entry, Payload: payload, AlreadyExisted: alreadyExisted}, nil @@ -221,20 +237,35 @@ func (r *durableEventsRepository) GetSatisfiedCallbacks(ctx context.Context, ten return nil, fmt.Errorf("failed to list satisfied callbacks: %w", err) } + retrievePayloadOpts := make([]RetrievePayloadOpts, len(rows)) + + for i, row := range rows { + retrievePayloadOpts[i] = RetrievePayloadOpts{ + Id: row.ID, + InsertedAt: row.InsertedAt, + Type: sqlcv1.V1PayloadTypeDURABLEEVENTLOGCALLBACKRESULTDATA, + TenantId: tenantId, + } + } + + payloads, err := r.payloadStore.Retrieve(ctx, r.pool, retrievePayloadOpts...) + + if err != nil { + return nil, fmt.Errorf("failed to retrieve payloads for satisfied callbacks: %w", err) + } + result := make([]*SatisfiedCallbackWithPayload, 0, len(rows)) for _, row := range rows { - payload, err := r.payloadStore.RetrieveSingle(ctx, r.pool, RetrievePayloadOpts{ + retrieveOpt := RetrievePayloadOpts{ Id: row.ID, InsertedAt: row.InsertedAt, Type: sqlcv1.V1PayloadTypeDURABLEEVENTLOGCALLBACKRESULTDATA, TenantId: tenantId, - }) - if err != nil { - r.l.Warn().Err(err).Msgf("failed to retrieve payload for callback %d", row.NodeID) - payload = nil } + payload := payloads[retrieveOpt] + result = append(result, &SatisfiedCallbackWithPayload{ TaskExternalId: row.TaskExternalID, NodeID: row.NodeID, @@ -249,6 +280,42 @@ func getDurableTaskSignalKey(taskExternalId uuid.UUID, nodeId int64) string { return fmt.Sprintf("durable:%s:%d", taskExternalId.String(), nodeId) } +func (r *durableEventsRepository) createIdempotencyKey(ctx context.Context, opts IngestDurableTaskEventOpts) ([]byte, error) { + kindBytes := []byte(opts.Kind) + + // todo: be more intentional about how we construct this key (e.g. do we want to marshal all of the opts?) + var triggerOptBytes []byte + var conditionBytes []byte + var err error + + if opts.TriggerOpts != nil { + triggerOptBytes, err = json.Marshal(opts.TriggerOpts) + + if err != nil { + return nil, fmt.Errorf("failed to marshal trigger opts for idempotency key generation: %w", err) + } + } + + if opts.WaitForConditions != nil { + conditionBytes, err = json.Marshal(opts.WaitForConditions) + + if err != nil { + return nil, fmt.Errorf("failed to marshal wait for conditions for idempotency key generation: %w", err) + } + } + + dataToHash := append(kindBytes, triggerOptBytes...) + dataToHash = append(dataToHash, conditionBytes...) + + h := sha1.New() + h.Write(dataToHash) + hashBytes := h.Sum(nil) + idempotencyKey := make([]byte, hex.EncodedLen(len(hashBytes))) + hex.Encode(idempotencyKey, hashBytes) + + return idempotencyKey, nil +} + func (r *durableEventsRepository) IngestDurableTaskEvent(ctx context.Context, opts IngestDurableTaskEventOpts) (*IngestDurableTaskEventResult, error) { if err := r.v.Validate(opts); err != nil { return nil, fmt.Errorf("invalid opts: %w", err) @@ -322,6 +389,12 @@ func (r *durableEventsRepository) IngestDurableTaskEvent(ctx context.Context, op // todo: real branching logic here branchId := logFile.LatestBranchID + idempotencyKey, err := r.createIdempotencyKey(ctx, opts) + + if err != nil { + return nil, fmt.Errorf("failed to create idempotency key: %w", err) + } + logEntry, err := r.getOrCreateEventLogEntry(ctx, tx, opts.TenantId, sqlcv1.CreateDurableEventLogEntryParams{ Tenantid: opts.TenantId, Externalid: uuid.New(), @@ -331,11 +404,13 @@ func (r *durableEventsRepository) IngestDurableTaskEvent(ctx context.Context, op Nodeid: nodeId, ParentNodeId: parentNodeId, Branchid: branchId, - Datahash: nil, // todo: implement this for nondeterminism check - Datahashalg: "", + Idempotencykey: idempotencyKey, }, opts.Payload) - if err != nil { + var nde *NonDeterminismError + if err != nil && errors.As(err, &nde) { + return nil, fmt.Errorf("non-determinism detected for durable event log entry with durable task id %s, node id %d: %w", task.ExternalID, nodeId, err) + } else if err != nil { return nil, fmt.Errorf("failed to get or create event log entry: %w", err) } diff --git a/pkg/repository/sqlcv1/durable_event_log.sql b/pkg/repository/sqlcv1/durable_event_log.sql index 4e977ee57a..fd08b1824d 100644 --- a/pkg/repository/sqlcv1/durable_event_log.sql +++ b/pkg/repository/sqlcv1/durable_event_log.sql @@ -60,8 +60,7 @@ INSERT INTO v1_durable_event_log_entry ( node_id, parent_node_id, branch_id, - data_hash, - data_hash_alg + idempotency_key ) VALUES ( @tenantId::UUID, @@ -73,8 +72,7 @@ VALUES ( @nodeId::BIGINT, sqlc.narg('parentNodeId')::BIGINT, @branchId::BIGINT, - @dataHash::BYTEA, - @dataHashAlg::TEXT + @idempotencyKey::BYTEA ) ON CONFLICT (durable_task_id, durable_task_inserted_at, node_id) DO NOTHING RETURNING * diff --git a/pkg/repository/sqlcv1/durable_event_log.sql.go b/pkg/repository/sqlcv1/durable_event_log.sql.go index 97805907a0..33d3c160c6 100644 --- a/pkg/repository/sqlcv1/durable_event_log.sql.go +++ b/pkg/repository/sqlcv1/durable_event_log.sql.go @@ -85,8 +85,7 @@ INSERT INTO v1_durable_event_log_entry ( node_id, parent_node_id, branch_id, - data_hash, - data_hash_alg + idempotency_key ) VALUES ( $1::UUID, @@ -98,11 +97,10 @@ VALUES ( $6::BIGINT, $7::BIGINT, $8::BIGINT, - $9::BYTEA, - $10::TEXT + $9::BYTEA ) ON CONFLICT (durable_task_id, durable_task_inserted_at, node_id) DO NOTHING -RETURNING tenant_id, external_id, inserted_at, id, durable_task_id, durable_task_inserted_at, kind, node_id, parent_node_id, branch_id, data_hash, data_hash_alg +RETURNING tenant_id, external_id, inserted_at, id, durable_task_id, durable_task_inserted_at, kind, node_id, parent_node_id, branch_id, idempotency_key ` type CreateDurableEventLogEntryParams struct { @@ -114,8 +112,7 @@ type CreateDurableEventLogEntryParams struct { Nodeid int64 `json:"nodeid"` ParentNodeId pgtype.Int8 `json:"parentNodeId"` Branchid int64 `json:"branchid"` - Datahash []byte `json:"datahash"` - Datahashalg string `json:"datahashalg"` + Idempotencykey []byte `json:"idempotencykey"` } func (q *Queries) CreateDurableEventLogEntry(ctx context.Context, db DBTX, arg CreateDurableEventLogEntryParams) (*V1DurableEventLogEntry, error) { @@ -128,8 +125,7 @@ func (q *Queries) CreateDurableEventLogEntry(ctx context.Context, db DBTX, arg C arg.Nodeid, arg.ParentNodeId, arg.Branchid, - arg.Datahash, - arg.Datahashalg, + arg.Idempotencykey, ) var i V1DurableEventLogEntry err := row.Scan( @@ -143,8 +139,7 @@ func (q *Queries) CreateDurableEventLogEntry(ctx context.Context, db DBTX, arg C &i.NodeID, &i.ParentNodeID, &i.BranchID, - &i.DataHash, - &i.DataHashAlg, + &i.IdempotencyKey, ) return &i, err } @@ -260,7 +255,7 @@ func (q *Queries) GetDurableEventLogCallback(ctx context.Context, db DBTX, arg G } const getDurableEventLogEntry = `-- name: GetDurableEventLogEntry :one -SELECT tenant_id, external_id, inserted_at, id, durable_task_id, durable_task_inserted_at, kind, node_id, parent_node_id, branch_id, data_hash, data_hash_alg +SELECT tenant_id, external_id, inserted_at, id, durable_task_id, durable_task_inserted_at, kind, node_id, parent_node_id, branch_id, idempotency_key FROM v1_durable_event_log_entry WHERE durable_task_id = $1::BIGINT AND durable_task_inserted_at = $2::TIMESTAMPTZ @@ -287,8 +282,7 @@ func (q *Queries) GetDurableEventLogEntry(ctx context.Context, db DBTX, arg GetD &i.NodeID, &i.ParentNodeID, &i.BranchID, - &i.DataHash, - &i.DataHashAlg, + &i.IdempotencyKey, ) return &i, err } diff --git a/pkg/repository/sqlcv1/models.go b/pkg/repository/sqlcv1/models.go index 01a6e3bf7c..408c58e4cc 100644 --- a/pkg/repository/sqlcv1/models.go +++ b/pkg/repository/sqlcv1/models.go @@ -3085,8 +3085,7 @@ type V1DurableEventLogEntry struct { NodeID int64 `json:"node_id"` ParentNodeID pgtype.Int8 `json:"parent_node_id"` BranchID int64 `json:"branch_id"` - DataHash []byte `json:"data_hash"` - DataHashAlg pgtype.Text `json:"data_hash_alg"` + IdempotencyKey []byte `json:"idempotency_key"` } type V1DurableEventLogFile struct { diff --git a/sdks/python/hatchet_sdk/cancellation.py b/sdks/python/hatchet_sdk/cancellation.py index 5998677695..6a43e36fc1 100644 --- a/sdks/python/hatchet_sdk/cancellation.py +++ b/sdks/python/hatchet_sdk/cancellation.py @@ -75,17 +75,8 @@ def cancel( """ with self._lock: if self._cancelled: - logger.debug( - f"CancellationToken: cancel() called but already cancelled, " - f"reason={self._reason.value if self._reason else 'none'}" - ) return - logger.debug( - f"CancellationToken: cancel() called, reason={reason.value}, " - f"{len(self._child_run_ids)} children registered" - ) - self._cancelled = True self._reason = reason @@ -99,13 +90,10 @@ def cancel( for callback in callbacks: try: - logger.debug(f"CancellationToken: invoking callback {callback}") callback() except Exception as e: # noqa: PERF203 logger.warning(f"CancellationToken: callback raised exception: {e}") - logger.debug(f"CancellationToken: cancel() complete, reason={reason.value}") - @property def is_cancelled(self) -> bool: """Check if cancellation has been triggered.""" @@ -123,10 +111,6 @@ async def aio_wait(self) -> None: This will block until cancel() is called. """ await self._get_async_event().wait() - logger.debug( - f"CancellationToken: async wait completed (cancelled), " - f"reason={self._reason.value if self._reason else 'none'}" - ) def wait(self, timeout: float | None = None) -> bool: """ @@ -138,13 +122,7 @@ def wait(self, timeout: float | None = None) -> bool: Returns: True if the token was cancelled (event was set), False if timeout expired. """ - result = self._sync_event.wait(timeout) - if result: - logger.debug( - f"CancellationToken: sync wait interrupted by cancellation, " - f"reason={self._reason.value if self._reason else 'none'}" - ) - return result + return self._sync_event.wait(timeout) def register_child(self, run_id: str) -> None: """ @@ -157,7 +135,6 @@ def register_child(self, run_id: str) -> None: run_id: The workflow run ID of the child workflow. """ with self._lock: - logger.debug(f"CancellationToken: registering child workflow {run_id}") self._child_run_ids.append(run_id) @property @@ -182,9 +159,6 @@ def add_callback(self, callback: Callable[[], None]) -> None: self._callbacks.append(callback) if invoke_now: - logger.debug( - f"CancellationToken: invoking callback immediately (already cancelled): {callback}" - ) try: callback() except Exception as e: diff --git a/sdks/python/hatchet_sdk/clients/listeners/durable_event_listener.py b/sdks/python/hatchet_sdk/clients/listeners/durable_event_listener.py index badf09b1b3..20802035c3 100644 --- a/sdks/python/hatchet_sdk/clients/listeners/durable_event_listener.py +++ b/sdks/python/hatchet_sdk/clients/listeners/durable_event_listener.py @@ -173,10 +173,10 @@ async def _receive_loop(self) -> None: except grpc.aio.AioRpcError as e: if e.code() != grpc.StatusCode.CANCELLED: logger.error( - f"DurableTask stream error: code={e.code()}, details={e.details()}" + f"durable stream error: code={e.code()}, details={e.details()}" ) except asyncio.CancelledError: - logger.debug("Receive loop cancelled") + pass except Exception as e: logger.exception(f"Unexpected error in receive loop: {e}") @@ -211,6 +211,16 @@ async def _handle_response(self, response: DurableTaskResponse) -> None: if not future.done(): future.set_result(DurableTaskCallbackResult.from_proto(completed)) del self._pending_callbacks[completed_key] + elif response.HasField("error"): + error = response.error + logger.exception( + f"durable task error: {error.error_message} " + f"(task={error.durable_task_external_id}, invocation={error.invocation_count})" + ) + event_key = (error.durable_task_external_id, error.invocation_count) + if event_key in self._pending_event_acks: + self._pending_event_acks[event_key].cancel() + del self._pending_event_acks[event_key] async def _register_worker(self) -> None: if self._request_queue is None or self._worker_id is None: diff --git a/sdks/python/hatchet_sdk/clients/listeners/pooled_listener.py b/sdks/python/hatchet_sdk/clients/listeners/pooled_listener.py index 1d05ba77a5..7e15d44b8b 100644 --- a/sdks/python/hatchet_sdk/clients/listeners/pooled_listener.py +++ b/sdks/python/hatchet_sdk/clients/listeners/pooled_listener.py @@ -247,7 +247,6 @@ async def subscribe( return await race_against_token(result_task, cancellation_token) return await self.events[subscription_id].get() except asyncio.CancelledError: - logger.debug(f"PooledListener.subscribe: externally cancelled for id={id}") raise finally: if subscription_id: diff --git a/sdks/python/hatchet_sdk/context/context.py b/sdks/python/hatchet_sdk/context/context.py index ea6952db95..af94f71062 100644 --- a/sdks/python/hatchet_sdk/context/context.py +++ b/sdks/python/hatchet_sdk/context/context.py @@ -535,8 +535,6 @@ async def aio_wait_for( if self.durable_event_listener is None: raise ValueError("Durable task client is not available") - from hatchet_sdk.contracts.v1.dispatcher_pb2 import DurableTaskEventKind - await self._ensure_stream_started() flat_conditions = flatten_conditions(list(conditions)) diff --git a/sdks/python/hatchet_sdk/contracts/v1/dispatcher_pb2.py b/sdks/python/hatchet_sdk/contracts/v1/dispatcher_pb2.py index 9132227729..323a64c6af 100644 --- a/sdks/python/hatchet_sdk/contracts/v1/dispatcher_pb2.py +++ b/sdks/python/hatchet_sdk/contracts/v1/dispatcher_pb2.py @@ -26,7 +26,7 @@ from hatchet_sdk.contracts.v1.shared import trigger_pb2 as v1_dot_shared_dot_trigger__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13v1/dispatcher.proto\x12\x02v1\x1a\x19v1/shared/condition.proto\x1a\x17v1/shared/trigger.proto\"5\n DurableTaskRequestRegisterWorker\x12\x11\n\tworker_id\x18\x01 \x01(\t\"6\n!DurableTaskResponseRegisterWorker\x12\x11\n\tworker_id\x18\x01 \x01(\t\"\xc5\x02\n\x17\x44urableTaskEventRequest\x12\x18\n\x10invocation_count\x18\x01 \x01(\x03\x12 \n\x18\x64urable_task_external_id\x18\x02 \x01(\t\x12&\n\x04kind\x18\x03 \x01(\x0e\x32\x18.v1.DurableTaskEventKind\x12\x14\n\x07payload\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x12\x44\n\x13wait_for_conditions\x18\x05 \x01(\x0b\x32\".v1.DurableEventListenerConditionsH\x01\x88\x01\x01\x12\x35\n\x0ctrigger_opts\x18\x06 \x01(\x0b\x32\x1a.v1.TriggerWorkflowRequestH\x02\x88\x01\x01\x42\n\n\x08_payloadB\x16\n\x14_wait_for_conditionsB\x0f\n\r_trigger_opts\"j\n\x1b\x44urableTaskEventAckResponse\x12\x18\n\x10invocation_count\x18\x01 \x01(\x03\x12 \n\x18\x64urable_task_external_id\x18\x02 \x01(\t\x12\x0f\n\x07node_id\x18\x03 \x01(\x03\"j\n$DurableTaskCallbackCompletedResponse\x12 \n\x18\x64urable_task_external_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\x03\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\"_\n!DurableTaskEvictInvocationRequest\x12\x18\n\x10invocation_count\x18\x01 \x01(\x03\x12 \n\x18\x64urable_task_external_id\x18\x02 \x01(\t\"O\n\x1a\x44urableTaskAwaitedCallback\x12 \n\x18\x64urable_task_external_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\x03\"\x92\x01\n\x1e\x44urableTaskWorkerStatusRequest\x12\x11\n\tworker_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\x03\x12\x11\n\tbranch_id\x18\x03 \x01(\x03\x12\x39\n\x11waiting_callbacks\x18\x04 \x03(\x0b\x32\x1e.v1.DurableTaskAwaitedCallback\"\x8e\x02\n\x12\x44urableTaskRequest\x12?\n\x0fregister_worker\x18\x01 \x01(\x0b\x32$.v1.DurableTaskRequestRegisterWorkerH\x00\x12,\n\x05\x65vent\x18\x02 \x01(\x0b\x32\x1b.v1.DurableTaskEventRequestH\x00\x12\x41\n\x10\x65vict_invocation\x18\x03 \x01(\x0b\x32%.v1.DurableTaskEvictInvocationRequestH\x00\x12;\n\rworker_status\x18\x04 \x01(\x0b\x32\".v1.DurableTaskWorkerStatusRequestH\x00\x42\t\n\x07message\"\xe2\x01\n\x13\x44urableTaskResponse\x12@\n\x0fregister_worker\x18\x01 \x01(\x0b\x32%.v1.DurableTaskResponseRegisterWorkerH\x00\x12\x36\n\x0btrigger_ack\x18\x02 \x01(\x0b\x32\x1f.v1.DurableTaskEventAckResponseH\x00\x12\x46\n\x12\x63\x61llback_completed\x18\x03 \x01(\x0b\x32(.v1.DurableTaskCallbackCompletedResponseH\x00\x42\t\n\x07message\"z\n\x1bRegisterDurableEventRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x12\n\nsignal_key\x18\x02 \x01(\t\x12\x36\n\nconditions\x18\x03 \x01(\x0b\x32\".v1.DurableEventListenerConditions\"\x1e\n\x1cRegisterDurableEventResponse\"C\n\x1cListenForDurableEventRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x12\n\nsignal_key\x18\x02 \x01(\t\"A\n\x0c\x44urableEvent\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x12\n\nsignal_key\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c*\xb0\x01\n\x14\x44urableTaskEventKind\x12)\n%DURABLE_TASK_TRIGGER_KIND_UNSPECIFIED\x10\x00\x12!\n\x1d\x44URABLE_TASK_TRIGGER_KIND_RUN\x10\x01\x12&\n\"DURABLE_TASK_TRIGGER_KIND_WAIT_FOR\x10\x02\x12\"\n\x1e\x44URABLE_TASK_TRIGGER_KIND_MEMO\x10\x03\x32\x84\x02\n\x0cV1Dispatcher\x12\x44\n\x0b\x44urableTask\x12\x16.v1.DurableTaskRequest\x1a\x17.v1.DurableTaskResponse\"\x00(\x01\x30\x01\x12[\n\x14RegisterDurableEvent\x12\x1f.v1.RegisterDurableEventRequest\x1a .v1.RegisterDurableEventResponse\"\x00\x12Q\n\x15ListenForDurableEvent\x12 .v1.ListenForDurableEventRequest\x1a\x10.v1.DurableEvent\"\x00(\x01\x30\x01\x42\x42Z@github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1b\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13v1/dispatcher.proto\x12\x02v1\x1a\x19v1/shared/condition.proto\x1a\x17v1/shared/trigger.proto\"5\n DurableTaskRequestRegisterWorker\x12\x11\n\tworker_id\x18\x01 \x01(\t\"6\n!DurableTaskResponseRegisterWorker\x12\x11\n\tworker_id\x18\x01 \x01(\t\"\xc5\x02\n\x17\x44urableTaskEventRequest\x12\x18\n\x10invocation_count\x18\x01 \x01(\x03\x12 \n\x18\x64urable_task_external_id\x18\x02 \x01(\t\x12&\n\x04kind\x18\x03 \x01(\x0e\x32\x18.v1.DurableTaskEventKind\x12\x14\n\x07payload\x18\x04 \x01(\x0cH\x00\x88\x01\x01\x12\x44\n\x13wait_for_conditions\x18\x05 \x01(\x0b\x32\".v1.DurableEventListenerConditionsH\x01\x88\x01\x01\x12\x35\n\x0ctrigger_opts\x18\x06 \x01(\x0b\x32\x1a.v1.TriggerWorkflowRequestH\x02\x88\x01\x01\x42\n\n\x08_payloadB\x16\n\x14_wait_for_conditionsB\x0f\n\r_trigger_opts\"j\n\x1b\x44urableTaskEventAckResponse\x12\x18\n\x10invocation_count\x18\x01 \x01(\x03\x12 \n\x18\x64urable_task_external_id\x18\x02 \x01(\t\x12\x0f\n\x07node_id\x18\x03 \x01(\x03\"j\n$DurableTaskCallbackCompletedResponse\x12 \n\x18\x64urable_task_external_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\x03\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\"_\n!DurableTaskEvictInvocationRequest\x12\x18\n\x10invocation_count\x18\x01 \x01(\x03\x12 \n\x18\x64urable_task_external_id\x18\x02 \x01(\t\"O\n\x1a\x44urableTaskAwaitedCallback\x12 \n\x18\x64urable_task_external_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\x03\"\x92\x01\n\x1e\x44urableTaskWorkerStatusRequest\x12\x11\n\tworker_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\x03\x12\x11\n\tbranch_id\x18\x03 \x01(\x03\x12\x39\n\x11waiting_callbacks\x18\x04 \x03(\x0b\x32\x1e.v1.DurableTaskAwaitedCallback\"\x8e\x02\n\x12\x44urableTaskRequest\x12?\n\x0fregister_worker\x18\x01 \x01(\x0b\x32$.v1.DurableTaskRequestRegisterWorkerH\x00\x12,\n\x05\x65vent\x18\x02 \x01(\x0b\x32\x1b.v1.DurableTaskEventRequestH\x00\x12\x41\n\x10\x65vict_invocation\x18\x03 \x01(\x0b\x32%.v1.DurableTaskEvictInvocationRequestH\x00\x12;\n\rworker_status\x18\x04 \x01(\x0b\x32\".v1.DurableTaskWorkerStatusRequestH\x00\x42\t\n\x07message\"~\n\x18\x44urableTaskErrorResponse\x12 \n\x18\x64urable_task_external_id\x18\x01 \x01(\t\x12\x18\n\x10invocation_count\x18\x02 \x01(\x03\x12\x0f\n\x07node_id\x18\x03 \x01(\x03\x12\x15\n\rerror_message\x18\x04 \x01(\t\"\x91\x02\n\x13\x44urableTaskResponse\x12@\n\x0fregister_worker\x18\x01 \x01(\x0b\x32%.v1.DurableTaskResponseRegisterWorkerH\x00\x12\x36\n\x0btrigger_ack\x18\x02 \x01(\x0b\x32\x1f.v1.DurableTaskEventAckResponseH\x00\x12\x46\n\x12\x63\x61llback_completed\x18\x03 \x01(\x0b\x32(.v1.DurableTaskCallbackCompletedResponseH\x00\x12-\n\x05\x65rror\x18\x04 \x01(\x0b\x32\x1c.v1.DurableTaskErrorResponseH\x00\x42\t\n\x07message\"z\n\x1bRegisterDurableEventRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x12\n\nsignal_key\x18\x02 \x01(\t\x12\x36\n\nconditions\x18\x03 \x01(\x0b\x32\".v1.DurableEventListenerConditions\"\x1e\n\x1cRegisterDurableEventResponse\"C\n\x1cListenForDurableEventRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x12\n\nsignal_key\x18\x02 \x01(\t\"A\n\x0c\x44urableEvent\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x12\n\nsignal_key\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c*\xb0\x01\n\x14\x44urableTaskEventKind\x12)\n%DURABLE_TASK_TRIGGER_KIND_UNSPECIFIED\x10\x00\x12!\n\x1d\x44URABLE_TASK_TRIGGER_KIND_RUN\x10\x01\x12&\n\"DURABLE_TASK_TRIGGER_KIND_WAIT_FOR\x10\x02\x12\"\n\x1e\x44URABLE_TASK_TRIGGER_KIND_MEMO\x10\x03\x32\x84\x02\n\x0cV1Dispatcher\x12\x44\n\x0b\x44urableTask\x12\x16.v1.DurableTaskRequest\x1a\x17.v1.DurableTaskResponse\"\x00(\x01\x30\x01\x12[\n\x14RegisterDurableEvent\x12\x1f.v1.RegisterDurableEventRequest\x1a .v1.RegisterDurableEventResponse\"\x00\x12Q\n\x15ListenForDurableEvent\x12 .v1.ListenForDurableEventRequest\x1a\x10.v1.DurableEvent\"\x00(\x01\x30\x01\x42\x42Z@github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1b\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -34,8 +34,8 @@ if not _descriptor._USE_C_DESCRIPTORS: _globals['DESCRIPTOR']._loaded_options = None _globals['DESCRIPTOR']._serialized_options = b'Z@github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1' - _globals['_DURABLETASKEVENTKIND']._serialized_start=1856 - _globals['_DURABLETASKEVENTKIND']._serialized_end=2032 + _globals['_DURABLETASKEVENTKIND']._serialized_start=2031 + _globals['_DURABLETASKEVENTKIND']._serialized_end=2207 _globals['_DURABLETASKREQUESTREGISTERWORKER']._serialized_start=79 _globals['_DURABLETASKREQUESTREGISTERWORKER']._serialized_end=132 _globals['_DURABLETASKRESPONSEREGISTERWORKER']._serialized_start=134 @@ -54,16 +54,18 @@ _globals['_DURABLETASKWORKERSTATUSREQUEST']._serialized_end=1059 _globals['_DURABLETASKREQUEST']._serialized_start=1062 _globals['_DURABLETASKREQUEST']._serialized_end=1332 - _globals['_DURABLETASKRESPONSE']._serialized_start=1335 - _globals['_DURABLETASKRESPONSE']._serialized_end=1561 - _globals['_REGISTERDURABLEEVENTREQUEST']._serialized_start=1563 - _globals['_REGISTERDURABLEEVENTREQUEST']._serialized_end=1685 - _globals['_REGISTERDURABLEEVENTRESPONSE']._serialized_start=1687 - _globals['_REGISTERDURABLEEVENTRESPONSE']._serialized_end=1717 - _globals['_LISTENFORDURABLEEVENTREQUEST']._serialized_start=1719 - _globals['_LISTENFORDURABLEEVENTREQUEST']._serialized_end=1786 - _globals['_DURABLEEVENT']._serialized_start=1788 - _globals['_DURABLEEVENT']._serialized_end=1853 - _globals['_V1DISPATCHER']._serialized_start=2035 - _globals['_V1DISPATCHER']._serialized_end=2295 + _globals['_DURABLETASKERRORRESPONSE']._serialized_start=1334 + _globals['_DURABLETASKERRORRESPONSE']._serialized_end=1460 + _globals['_DURABLETASKRESPONSE']._serialized_start=1463 + _globals['_DURABLETASKRESPONSE']._serialized_end=1736 + _globals['_REGISTERDURABLEEVENTREQUEST']._serialized_start=1738 + _globals['_REGISTERDURABLEEVENTREQUEST']._serialized_end=1860 + _globals['_REGISTERDURABLEEVENTRESPONSE']._serialized_start=1862 + _globals['_REGISTERDURABLEEVENTRESPONSE']._serialized_end=1892 + _globals['_LISTENFORDURABLEEVENTREQUEST']._serialized_start=1894 + _globals['_LISTENFORDURABLEEVENTREQUEST']._serialized_end=1961 + _globals['_DURABLEEVENT']._serialized_start=1963 + _globals['_DURABLEEVENT']._serialized_end=2028 + _globals['_V1DISPATCHER']._serialized_start=2210 + _globals['_V1DISPATCHER']._serialized_end=2470 # @@protoc_insertion_point(module_scope) diff --git a/sdks/python/hatchet_sdk/contracts/v1/dispatcher_pb2.pyi b/sdks/python/hatchet_sdk/contracts/v1/dispatcher_pb2.pyi index 2c1ad677c5..9f76fc5973 100644 --- a/sdks/python/hatchet_sdk/contracts/v1/dispatcher_pb2.pyi +++ b/sdks/python/hatchet_sdk/contracts/v1/dispatcher_pb2.pyi @@ -108,15 +108,29 @@ class DurableTaskRequest(_message.Message): worker_status: DurableTaskWorkerStatusRequest def __init__(self, register_worker: _Optional[_Union[DurableTaskRequestRegisterWorker, _Mapping]] = ..., event: _Optional[_Union[DurableTaskEventRequest, _Mapping]] = ..., evict_invocation: _Optional[_Union[DurableTaskEvictInvocationRequest, _Mapping]] = ..., worker_status: _Optional[_Union[DurableTaskWorkerStatusRequest, _Mapping]] = ...) -> None: ... +class DurableTaskErrorResponse(_message.Message): + __slots__ = ("durable_task_external_id", "invocation_count", "node_id", "error_message") + DURABLE_TASK_EXTERNAL_ID_FIELD_NUMBER: _ClassVar[int] + INVOCATION_COUNT_FIELD_NUMBER: _ClassVar[int] + NODE_ID_FIELD_NUMBER: _ClassVar[int] + ERROR_MESSAGE_FIELD_NUMBER: _ClassVar[int] + durable_task_external_id: str + invocation_count: int + node_id: int + error_message: str + def __init__(self, durable_task_external_id: _Optional[str] = ..., invocation_count: _Optional[int] = ..., node_id: _Optional[int] = ..., error_message: _Optional[str] = ...) -> None: ... + class DurableTaskResponse(_message.Message): - __slots__ = ("register_worker", "trigger_ack", "callback_completed") + __slots__ = ("register_worker", "trigger_ack", "callback_completed", "error") REGISTER_WORKER_FIELD_NUMBER: _ClassVar[int] TRIGGER_ACK_FIELD_NUMBER: _ClassVar[int] CALLBACK_COMPLETED_FIELD_NUMBER: _ClassVar[int] + ERROR_FIELD_NUMBER: _ClassVar[int] register_worker: DurableTaskResponseRegisterWorker trigger_ack: DurableTaskEventAckResponse callback_completed: DurableTaskCallbackCompletedResponse - def __init__(self, register_worker: _Optional[_Union[DurableTaskResponseRegisterWorker, _Mapping]] = ..., trigger_ack: _Optional[_Union[DurableTaskEventAckResponse, _Mapping]] = ..., callback_completed: _Optional[_Union[DurableTaskCallbackCompletedResponse, _Mapping]] = ...) -> None: ... + error: DurableTaskErrorResponse + def __init__(self, register_worker: _Optional[_Union[DurableTaskResponseRegisterWorker, _Mapping]] = ..., trigger_ack: _Optional[_Union[DurableTaskEventAckResponse, _Mapping]] = ..., callback_completed: _Optional[_Union[DurableTaskCallbackCompletedResponse, _Mapping]] = ..., error: _Optional[_Union[DurableTaskErrorResponse, _Mapping]] = ...) -> None: ... class RegisterDurableEventRequest(_message.Message): __slots__ = ("task_id", "signal_key", "conditions") diff --git a/sdks/python/hatchet_sdk/runnables/workflow.py b/sdks/python/hatchet_sdk/runnables/workflow.py index cda9c4978c..2fc8d28ec9 100644 --- a/sdks/python/hatchet_sdk/runnables/workflow.py +++ b/sdks/python/hatchet_sdk/runnables/workflow.py @@ -41,7 +41,6 @@ from hatchet_sdk.contracts.workflows.workflows_pb2 import WorkflowVersion from hatchet_sdk.exceptions import CancellationReason, CancelledError from hatchet_sdk.labels import DesiredWorkerLabel -from hatchet_sdk.logger import logger from hatchet_sdk.rate_limit import RateLimit from hatchet_sdk.runnables.contextvars import ( ctx_cancellation_token, @@ -686,11 +685,6 @@ def run_no_wait( """ cancellation_token = self._resolve_check_cancellation_token() - logger.debug( - f"Workflow.run_no_wait: triggering {self.config.name}, " - f"token={cancellation_token is not None}" - ) - ref = self.client._client.admin.run_workflow( workflow_name=self.config.name, input=self._serialize_input(input), @@ -722,11 +716,6 @@ def run( """ cancellation_token = self._resolve_check_cancellation_token() - logger.debug( - f"Workflow.run: triggering {self.config.name}, " - f"token={cancellation_token is not None}" - ) - ref = self.client._client.admin.run_workflow( workflow_name=self.config.name, input=self._serialize_input(input), @@ -738,8 +727,6 @@ def run( ref.workflow_run_id, ) - logger.debug(f"Workflow.run: awaiting result for {ref.workflow_run_id}") - return ref.result(cancellation_token=cancellation_token) async def aio_run_no_wait( @@ -761,11 +748,6 @@ async def aio_run_no_wait( """ cancellation_token = self._resolve_check_cancellation_token() - logger.debug( - f"Workflow.aio_run_no_wait: triggering {self.config.name}, " - f"token={cancellation_token is not None}" - ) - ref = await self.client._client.admin.aio_run_workflow( workflow_name=self.config.name, input=self._serialize_input(input), @@ -801,11 +783,6 @@ async def aio_run( cancellation_token = self._resolve_check_cancellation_token() - logger.debug( - f"Workflow.aio_run: triggering {self.config.name}, " - f"token={cancellation_token is not None}" - ) - ref = await self.client._client.admin.aio_run_workflow( workflow_name=self.config.name, input=self._serialize_input(input), @@ -817,8 +794,6 @@ async def aio_run( ref.workflow_run_id, ) - logger.debug(f"Workflow.aio_run: awaiting result for {ref.workflow_run_id}") - return await await_with_cancellation( ref.aio_result(), cancellation_token, @@ -881,17 +856,11 @@ def run_many( refs, ) - # Pass cancellation_token through to each result() call - # The cancellation check happens INSIDE result()'s polling loop results: list[dict[str, Any] | BaseException] = [] for ref in refs: try: results.append(ref.result(cancellation_token=cancellation_token)) except CancelledError: # noqa: PERF203 - logger.debug( - f"Workflow.run_many: cancellation detected, stopping wait, " - f"reason={CancellationReason.PARENT_CANCELLED.value}" - ) if return_exceptions: results.append( CancelledError( @@ -940,11 +909,6 @@ async def aio_run_many( """ cancellation_token = self._resolve_check_cancellation_token() - logger.debug( - f"Workflow.aio_run_many: triggering {len(workflows)} workflows, " - f"token={cancellation_token is not None}" - ) - refs = await self.client._client.admin.aio_run_workflows( workflows=workflows, ) @@ -978,11 +942,6 @@ def run_many_no_wait( """ cancellation_token = self._resolve_check_cancellation_token() - logger.debug( - f"Workflow.run_many_no_wait: triggering {len(workflows)} workflows, " - f"token={cancellation_token is not None}" - ) - refs = self.client._client.admin.run_workflows( workflows=workflows, ) @@ -1012,11 +971,6 @@ async def aio_run_many_no_wait( """ cancellation_token = self._resolve_check_cancellation_token() - logger.debug( - f"Workflow.aio_run_many_no_wait: triggering {len(workflows)} workflows, " - f"token={cancellation_token is not None}" - ) - refs = await self.client._client.admin.aio_run_workflows( workflows=workflows, ) diff --git a/sdks/python/hatchet_sdk/utils/cancellation.py b/sdks/python/hatchet_sdk/utils/cancellation.py index eefe7a4d6d..00b82705f7 100644 --- a/sdks/python/hatchet_sdk/utils/cancellation.py +++ b/sdks/python/hatchet_sdk/utils/cancellation.py @@ -7,8 +7,6 @@ from collections.abc import Awaitable, Callable from typing import TYPE_CHECKING, TypeVar -from hatchet_sdk.logger import logger - if TYPE_CHECKING: from hatchet_sdk.cancellation import CancellationToken @@ -120,30 +118,21 @@ async def long_running_task(): """ if token is None: - logger.debug("await_with_cancellation: no token provided, awaiting directly") return await coro - logger.debug("await_with_cancellation: starting with cancellation token") - - # Check if already cancelled if token.is_cancelled: - logger.debug("await_with_cancellation: token already cancelled") if cancel_callback: - logger.debug("await_with_cancellation: invoking cancel callback") await _invoke_cancel_callback(cancel_callback) + raise asyncio.CancelledError("Operation cancelled by cancellation token") main_task = asyncio.ensure_future(coro) try: - result = await race_against_token(main_task, token) - logger.debug("await_with_cancellation: completed successfully") - return result - + return await race_against_token(main_task, token) except asyncio.CancelledError: - logger.debug("await_with_cancellation: cancelled") if cancel_callback: - logger.debug("await_with_cancellation: invoking cancel callback") with contextlib.suppress(asyncio.CancelledError): await asyncio.shield(_invoke_cancel_callback(cancel_callback)) + raise diff --git a/sdks/python/hatchet_sdk/worker/runner/runner.py b/sdks/python/hatchet_sdk/worker/runner/runner.py index 08d3309173..0f1d5e79ec 100644 --- a/sdks/python/hatchet_sdk/worker/runner/runner.py +++ b/sdks/python/hatchet_sdk/worker/runner/runner.py @@ -509,26 +509,18 @@ async def handle_cancel_action(self, action: Action) -> None: start_time = time.monotonic() logger.info( - f"Cancellation: received cancel action for {action.action_id}, " + f"received cancel action for {action.action_id}, " f"reason={CancellationReason.WORKFLOW_CANCELLED.value}" ) try: - # Trigger the cancellation token to signal the context to stop if key in self.contexts: ctx = self.contexts[key] - child_count = len(ctx.cancellation_token.child_run_ids) - logger.debug( - f"Cancellation: triggering token for {action.action_id}, " - f"reason={CancellationReason.WORKFLOW_CANCELLED.value}, " - f"{child_count} children registered" - ) + ctx._set_cancellation_flag(CancellationReason.WORKFLOW_CANCELLED) self.cancellations[key] = True # Note: Child workflows are not cancelled here - they run independently # and are managed by Hatchet's normal cancellation mechanisms - else: - logger.debug(f"Cancellation: no context found for {action.action_id}") # Wait with supervision (using timedelta configs) grace_period = self.config.cancellation_grace_period.total_seconds() @@ -548,7 +540,7 @@ async def handle_cancel_action(self, action: Action) -> None: if task_still_running: logger.warning( - f"Cancellation: task {action.action_id} has not cancelled after " + f"task {action.action_id} has not cancelled after " f"{elapsed_ms}ms (warning threshold {warning_threshold_ms}ms). " f"Consider checking for blocking operations. " f"See https://docs.hatchet.run/home/cancellation" @@ -559,25 +551,18 @@ async def handle_cancel_action(self, action: Action) -> None: await asyncio.sleep(remaining) if key in self.tasks and not self.tasks[key].done(): - logger.debug( - f"Cancellation: force-cancelling task {action.action_id} " - f"after grace period ({grace_period_ms}ms)" - ) self.tasks[key].cancel() if key in self.threads: thread = self.threads[key] if self.config.enable_force_kill_sync_threads: - logger.debug( - f"Cancellation: force-killing thread for {action.action_id}" - ) self.force_kill_thread(thread) await asyncio.sleep(1) if thread.is_alive(): logger.warning( - f"Cancellation: thread {thread.ident} with key {key} is still running " + f"thread {thread.ident} with key {key} is still running " f"after cancellation. This could cause the thread pool to get blocked " f"and prevent new tasks from running." ) @@ -586,15 +571,9 @@ async def handle_cancel_action(self, action: Action) -> None: total_elapsed_ms = round(total_elapsed * 1000) if total_elapsed > grace_period: logger.warning( - f"Cancellation: cancellation of {action.action_id} took {total_elapsed_ms}ms " + f"cancellation of {action.action_id} took {total_elapsed_ms}ms " f"(exceeded grace period of {grace_period_ms}ms)" ) - else: - logger.debug( - f"Cancellation: task {action.action_id} eventually completed in {total_elapsed_ms}ms" - ) - else: - logger.info(f"Cancellation: task {action.action_id} completed") finally: self.cleanup_run_id(key) @@ -633,16 +612,14 @@ def serialize_output(self, output: Any) -> str | None: ) from e if "\\u0000" in serialized_output: - raise IllegalTaskOutputError( - dedent(f""" + raise IllegalTaskOutputError(dedent(f""" Task outputs cannot contain the unicode null character \\u0000 Please see this Discord thread: https://discord.com/channels/1088927970518909068/1384324576166678710/1386714014565928992 Relevant Postgres documentation: https://www.postgresql.org/docs/current/datatype-json.html Use `hatchet_sdk.{remove_null_unicode_character.__name__}` to sanitize your output if you'd like to remove the character. - """) - ) + """)) return serialized_output diff --git a/sdks/python/hatchet_sdk/workflow_run.py b/sdks/python/hatchet_sdk/workflow_run.py index e1d0c3cc25..6bc6ace5bc 100644 --- a/sdks/python/hatchet_sdk/workflow_run.py +++ b/sdks/python/hatchet_sdk/workflow_run.py @@ -14,7 +14,6 @@ FailedTaskRunExceptionGroup, TaskRunError, ) -from hatchet_sdk.logger import logger from hatchet_sdk.utils.cancellation import await_with_cancellation if TYPE_CHECKING: @@ -50,11 +49,6 @@ async def aio_result( :param cancellation_token: Optional cancellation token to abort the wait. :return: A dictionary mapping task names to their outputs. """ - logger.debug( - f"WorkflowRunRef.aio_result: waiting for {self.workflow_run_id}, " - f"token={cancellation_token is not None}" - ) - if cancellation_token: return await await_with_cancellation( self.workflow_run_listener.aio_result(self.workflow_run_id), @@ -88,20 +82,11 @@ def result( """ from hatchet_sdk.clients.admin import RunStatus - logger.debug( - f"WorkflowRunRef.result: waiting for {self.workflow_run_id}, " - f"token={cancellation_token is not None}" - ) - retries = 0 while True: # Check cancellation at start of each iteration if cancellation_token and cancellation_token.is_cancelled: - logger.debug( - f"WorkflowRunRef.result: cancellation detected for {self.workflow_run_id}, " - f"reason={CancellationReason.PARENT_CANCELLED.value}" - ) raise CancelledError( "Operation cancelled by cancellation token", reason=CancellationReason.PARENT_CANCELLED, @@ -120,10 +105,6 @@ def result( # Use interruptible sleep via token.wait() if cancellation_token: if cancellation_token.wait(timeout=1.0): - logger.debug( - f"WorkflowRunRef.result: cancellation during retry sleep for {self.workflow_run_id}, " - f"reason={CancellationReason.PARENT_CANCELLED.value}" - ) raise CancelledError( "Operation cancelled by cancellation token", reason=CancellationReason.PARENT_CANCELLED, @@ -132,10 +113,6 @@ def result( time.sleep(1) continue - logger.debug( - f"WorkflowRunRef.result: {self.workflow_run_id} status={details.status}" - ) - if ( details.status in [RunStatus.QUEUED, RunStatus.RUNNING] or details.done is False @@ -143,10 +120,6 @@ def result( # Use interruptible sleep via token.wait() if cancellation_token: if cancellation_token.wait(timeout=1.0): - logger.debug( - f"WorkflowRunRef.result: cancellation during poll sleep for {self.workflow_run_id}, " - f"reason={CancellationReason.PARENT_CANCELLED.value}" - ) raise CancelledError( "Operation cancelled by cancellation token", reason=CancellationReason.PARENT_CANCELLED, @@ -166,9 +139,6 @@ def result( ) if details.status == RunStatus.COMPLETED: - logger.debug( - f"WorkflowRunRef.result: {self.workflow_run_id} completed successfully" - ) return { readable_id: run.output for readable_id, run in details.task_runs.items() diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index fbce316496..2ac06f592c 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -2318,14 +2318,9 @@ CREATE TABLE v1_durable_event_log_entry ( parent_node_id BIGINT, -- The branch id when this event was first seen. A durable event log can be a part of many branches. branch_id BIGINT NOT NULL, - -- Todo: Associated data for this event should be stored in the v1_payload table! - -- data JSONB, - -- The hash of the data stored in the v1_payload table to check non-determinism violations. - -- This can be null for event types that don't have associated data. - -- TODO: we can add CHECK CONSTRAINT for event types that require data_hash to be non-null. - data_hash BYTEA, - -- Can discuss: adds some flexibility for future hash algorithms - data_hash_alg TEXT, + -- An idempotency key generated from the incoming data (using the type of event + wait for conditions or the trigger event payload + options) + -- to determine whether or not there's been a non-determinism error + idempotency_key BYTEA, -- Access patterns: -- Definite: we'll query directly for the node_id when a durable task is replaying its log -- Possible: we may want to query a range of node_ids for a durable task