Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api-contracts/v1/workflows.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ message TriggerWorkflowRunRequest {
bytes input = 2;
bytes additional_metadata = 3;
optional int32 priority = 4;
// (optional) idempotency key for deduplicating workflow runs
optional string idempotency_key = 5;
}

message TriggerWorkflowRunResponse {
Expand Down
3 changes: 3 additions & 0 deletions api-contracts/workflows/workflows.proto
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ message TriggerWorkflowRequest {

// (optional) override for the priority of the workflow tasks, will set all tasks to this priority
optional int32 priority = 9;

// (optional) idempotency key for deduplicating workflow runs
optional string idempotency_key = 10;
}

message TriggerWorkflowResponse {
Expand Down
11 changes: 11 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20260205120000_v1_0_76.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE v1_idempotency_key
ADD COLUMN last_denied_at TIMESTAMPTZ;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE v1_idempotency_key
DROP COLUMN last_denied_at;
-- +goose StatementEnd
78 changes: 78 additions & 0 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
description = "Hatchet development shell";

inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11";
nixpkgs-unstable.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
};

outputs = { self, nixpkgs, nixpkgs-unstable, flake-utils }:
flake-utils.lib.eachDefaultSystem (system:
let
pkgs = import nixpkgs { inherit system; };
pkgsUnstable = import nixpkgs-unstable { inherit system; };
go = pkgs.go;
in
{
devShells.default = pkgs.mkShell {
packages = [
go
pkgs.gopls
pkgs.gotools
pkgsUnstable.golangci-lint
pkgs.go-task
pkgs.protobuf
pkgs.protoc-gen-go
pkgs.protoc-gen-go-grpc
pkgs.nodejs_20
pkgs.pnpm
pkgs.python312
pkgs.poetry
pkgs.gcc
pkgs.pkg-config
pkgs.git
];
};
});
}
169 changes: 91 additions & 78 deletions internal/services/admin/contracts/workflows.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion internal/services/admin/contracts/workflows_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 21 additions & 4 deletions internal/services/admin/server_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,16 @@ func (i *AdminServiceImpl) newTriggerOpt(
t.ChildKey = req.ChildKey
}

return &v1.WorkflowNameTriggerOpts{
opts := &v1.WorkflowNameTriggerOpts{
TriggerTaskData: t,
}, nil
}

if req.IdempotencyKey != nil {
key := v1.IdempotencyKey(*req.IdempotencyKey)
opts.IdempotencyKey = &key
}

return opts, nil
}

func (i *AdminServiceImpl) generateExternalIds(ctx context.Context, tenantId uuid.UUID, opts []*v1.WorkflowNameTriggerOpts) error {
Expand Down Expand Up @@ -274,6 +281,10 @@ func (i *AdminServiceImpl) ingest(ctx context.Context, tenantId uuid.UUID, opts

// if we have a scheduling error, we'll fall back to normal ingestion
if schedulingErr != nil {
if errors.Is(schedulingErr, v1.ErrIdempotencyKeyAlreadyClaimed) {
return status.Error(codes.AlreadyExists, schedulingErr.Error())
}

if !errors.Is(schedulingErr, schedulingv1.ErrTenantNotFound) && !errors.Is(schedulingErr, schedulingv1.ErrNoOptimisticSlots) {
i.l.Error().Err(schedulingErr).Msg("could not run optimistic scheduling")
}
Expand Down Expand Up @@ -313,8 +324,14 @@ func (i *AdminServiceImpl) ingest(ctx context.Context, tenantId uuid.UUID, opts
triggerErr := i.tw.TriggerFromWorkflowNames(ctx, tenantId, optsToSend)

// if we fail to trigger via gRPC, we fall back to normal ingestion
if triggerErr != nil && !errors.Is(triggerErr, trigger.ErrNoTriggerSlots) {
i.l.Error().Err(triggerErr).Msg("could not trigger workflow runs via gRPC")
if triggerErr != nil {
if errors.Is(triggerErr, v1.ErrIdempotencyKeyAlreadyClaimed) {
return status.Error(codes.AlreadyExists, triggerErr.Error())
}

if !errors.Is(triggerErr, trigger.ErrNoTriggerSlots) {
i.l.Error().Err(triggerErr).Msg("could not trigger workflow runs via gRPC")
}
} else if triggerErr == nil {
return nil
}
Expand Down
25 changes: 21 additions & 4 deletions internal/services/admin/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,16 @@ func (i *AdminServiceImpl) newTriggerOpt(
t.Priority = req.Priority
}

return &v1.WorkflowNameTriggerOpts{
opts := &v1.WorkflowNameTriggerOpts{
TriggerTaskData: t,
}, nil
}

if req.IdempotencyKey != nil {
key := v1.IdempotencyKey(*req.IdempotencyKey)
opts.IdempotencyKey = &key
}

return opts, nil
}

func (i *AdminServiceImpl) generateExternalIds(ctx context.Context, tenantId uuid.UUID, opts []*v1.WorkflowNameTriggerOpts) error {
Expand Down Expand Up @@ -539,6 +546,10 @@ func (i *AdminServiceImpl) ingest(ctx context.Context, tenantId uuid.UUID, opts

// if we have a scheduling error, we'll fall back to normal ingestion
if schedulingErr != nil {
if errors.Is(schedulingErr, v1.ErrIdempotencyKeyAlreadyClaimed) {
return status.Error(codes.AlreadyExists, schedulingErr.Error())
}

if !errors.Is(schedulingErr, schedulingv1.ErrTenantNotFound) && !errors.Is(schedulingErr, schedulingv1.ErrNoOptimisticSlots) {
i.l.Error().Err(schedulingErr).Msg("could not run optimistic scheduling")
}
Expand Down Expand Up @@ -578,8 +589,14 @@ func (i *AdminServiceImpl) ingest(ctx context.Context, tenantId uuid.UUID, opts
triggerErr := i.tw.TriggerFromWorkflowNames(ctx, tenantId, optsToSend)

// if we fail to trigger via gRPC, we fall back to normal ingestion
if triggerErr != nil && !errors.Is(triggerErr, trigger.ErrNoTriggerSlots) {
i.l.Error().Err(triggerErr).Msg("could not trigger workflow runs via gRPC")
if triggerErr != nil {
if errors.Is(triggerErr, v1.ErrIdempotencyKeyAlreadyClaimed) {
return status.Error(codes.AlreadyExists, triggerErr.Error())
}

if !errors.Is(triggerErr, trigger.ErrNoTriggerSlots) {
i.l.Error().Err(triggerErr).Msg("could not trigger workflow runs via gRPC")
}
} else if triggerErr == nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ func (o *OLAPControllerImpl) notifyDAGsUpdated(ctx context.Context, rows []v1.Up
tenantIdToPayloads := make(map[uuid.UUID][]tasktypes.NotifyFinalizedPayload)

for _, row := range rows {
if row.ReadableStatus == sqlcv1.V1ReadableStatusOlapCOMPLETED ||
row.ReadableStatus == sqlcv1.V1ReadableStatusOlapCANCELLED ||
row.ReadableStatus == sqlcv1.V1ReadableStatusOlapFAILED {
if err := o.repo.Idempotency().DeleteIdempotencyKeysByExternalId(ctx, row.TenantId, row.ExternalId); err != nil {
o.l.Error().Err(err).Str("dagExternalId", row.ExternalId.String()).Msg("failed to delete idempotency key for dag")
}
}

tenantIdToPayloads[row.TenantId] = append(tenantIdToPayloads[row.TenantId], tasktypes.NotifyFinalizedPayload{
ExternalId: row.ExternalId,
Status: row.ReadableStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func (o *OLAPControllerImpl) notifyTasksUpdated(ctx context.Context, rows []v1.U
continue
}

if !row.IsDAGTask {
if err := o.repo.Idempotency().DeleteIdempotencyKeysByExternalId(ctx, row.TenantId, row.ExternalId); err != nil {
o.l.Error().Err(err).Str("taskExternalId", row.ExternalId.String()).Msg("failed to delete idempotency key for task")
}
}

tenantIdToPayloads[row.TenantId] = append(tenantIdToPayloads[row.TenantId], tasktypes.NotifyFinalizedPayload{
ExternalId: row.ExternalId,
Status: row.ReadableStatus,
Expand Down
9 changes: 8 additions & 1 deletion internal/services/controllers/task/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,14 @@ func (tc *TasksControllerImpl) handleProcessInternalEvents(ctx context.Context,

// handleProcessEventTrigger is responsible for inserting tasks into the database based on event triggers.
func (tc *TasksControllerImpl) handleProcessTaskTrigger(ctx context.Context, tenantId uuid.UUID, payloads [][]byte) error {
return tc.tw.TriggerFromWorkflowNames(ctx, tenantId, msgqueue.JSONConvert[v1.WorkflowNameTriggerOpts](payloads))
err := tc.tw.TriggerFromWorkflowNames(ctx, tenantId, msgqueue.JSONConvert[v1.WorkflowNameTriggerOpts](payloads))

if errors.Is(err, v1.ErrIdempotencyKeyAlreadyClaimed) {
tc.l.Debug().Err(err).Msg("skipping workflow trigger with duplicate idempotency key")
return nil
}

return err
}

// processUserEventMatches looks for user event matches
Expand Down
Loading