Skip to content

Commit 2fdc47a

Browse files
grutt and mrkaye97 authored
feat: multiple slot types (#2927)
* feat: adds support for multiple slot types, primarily motivated by durable slots --------- Co-authored-by: mrkaye97 <mrkaye97@gmail.com>
1 parent eefbcdc commit 2fdc47a

File tree

191 files changed

+10296
-2411
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

191 files changed

+10296
-2411
lines changed

.github/workflows/release.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ on:
99
name: Release
1010
jobs:
1111
load:
12-
runs-on: ubicloud-standard-4
12+
runs-on: ubicloud-standard-8
1313
timeout-minutes: 30
1414
strategy:
1515
matrix:

.github/workflows/test.yml

Lines changed: 211 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,7 @@ jobs:
5454
run: docker compose down
5555

5656
unit:
57-
runs-on: ubicloud-standard-4
57+
runs-on: ubicloud-standard-8
5858
steps:
5959
- uses: actions/checkout@v6
6060
- name: Setup Go
@@ -72,7 +72,7 @@ jobs:
7272
run: go test $(go list ./... | grep -v "quickstart") -v -failfast
7373

7474
integration:
75-
runs-on: ubicloud-standard-4
75+
runs-on: ubicloud-standard-8
7676
env:
7777
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet?sslmode=disable
7878

@@ -117,7 +117,7 @@ jobs:
117117
run: docker compose down
118118

119119
e2e:
120-
runs-on: ubicloud-standard-4
120+
runs-on: ubicloud-standard-8
121121
timeout-minutes: 30
122122
env:
123123
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet?sslmode=disable
@@ -206,7 +206,7 @@ jobs:
206206
run: docker compose down
207207

208208
e2e-pgmq:
209-
runs-on: ubicloud-standard-4
209+
runs-on: ubicloud-standard-8
210210
timeout-minutes: 30
211211
env:
212212
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet?sslmode=disable
@@ -297,7 +297,7 @@ jobs:
297297
run: docker compose down
298298

299299
load:
300-
runs-on: ubicloud-standard-4
300+
runs-on: ubicloud-standard-8
301301
timeout-minutes: 30
302302
strategy:
303303
matrix:
@@ -339,8 +339,213 @@ jobs:
339339
TESTING_MATRIX_PG_VERSION: ${{ matrix.pg-version }}
340340
TESTING_MATRIX_OPTIMISTIC_SCHEDULING: ${{ matrix.optimistic-scheduling }}
341341

342+
load-online-migrate:
343+
runs-on: ubicloud-standard-8
344+
timeout-minutes: 30
345+
env:
346+
DATABASE_URL: postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet?sslmode=disable
347+
348+
steps:
349+
- uses: actions/checkout@v6
350+
with:
351+
fetch-depth: 0
352+
fetch-tags: true
353+
354+
- name: Setup Go
355+
uses: actions/setup-go@v6
356+
with:
357+
go-version: "1.25"
358+
359+
- name: Compose
360+
run: docker compose up -d
361+
362+
- name: Determine latest stable release tag
363+
run: |
364+
LATEST_TAG=$(git tag --sort=-v:refname | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+$' | head -1)
365+
if [ -z "$LATEST_TAG" ]; then
366+
echo "ERROR: No stable release tag found"
367+
exit 1
368+
fi
369+
echo "Latest stable tag: $LATEST_TAG"
370+
echo "LATEST_TAG=$LATEST_TAG" >> $GITHUB_ENV
371+
372+
- name: Pull old release images
373+
run: |
374+
docker pull ghcr.io/hatchet-dev/hatchet/hatchet-migrate:${{ env.LATEST_TAG }}
375+
docker pull ghcr.io/hatchet-dev/hatchet/hatchet-admin:${{ env.LATEST_TAG }}
376+
docker pull ghcr.io/hatchet-dev/hatchet/hatchet-engine:${{ env.LATEST_TAG }}
377+
docker pull ghcr.io/hatchet-dev/hatchet/hatchet-loadtest:${{ env.LATEST_TAG }}
378+
379+
- name: Run old migrations
380+
run: |
381+
docker run --rm --network host \
382+
-e DATABASE_URL="${{ env.DATABASE_URL }}" \
383+
ghcr.io/hatchet-dev/hatchet/hatchet-migrate:${{ env.LATEST_TAG }}
384+
385+
- name: Setup config and seed database
386+
run: |
387+
mkdir -p generated
388+
docker run --rm --network host \
389+
-v ${{ github.workspace }}/generated:/hatchet/generated \
390+
-e DATABASE_URL="${{ env.DATABASE_URL }}" \
391+
-e SERVER_GRPC_PORT=7077 \
392+
-e SERVER_GRPC_BROADCAST_ADDRESS=localhost:7077 \
393+
-e SERVER_GRPC_INSECURE=true \
394+
-e SERVER_AUTH_COOKIE_DOMAIN=localhost \
395+
-e SERVER_AUTH_COOKIE_INSECURE=true \
396+
ghcr.io/hatchet-dev/hatchet/hatchet-admin:${{ env.LATEST_TAG }} \
397+
/hatchet/hatchet-admin quickstart --skip certs --generated-config-dir /hatchet/generated
398+
399+
- name: Generate API token
400+
run: |
401+
TOKEN=$(docker run --rm --network host \
402+
-v ${{ github.workspace }}/generated:/hatchet/generated \
403+
-e DATABASE_URL="${{ env.DATABASE_URL }}" \
404+
-e SERVER_GRPC_PORT=7077 \
405+
-e SERVER_GRPC_BROADCAST_ADDRESS=localhost:7077 \
406+
-e SERVER_GRPC_INSECURE=true \
407+
-e SERVER_AUTH_COOKIE_DOMAIN=localhost \
408+
-e SERVER_AUTH_COOKIE_INSECURE=true \
409+
ghcr.io/hatchet-dev/hatchet/hatchet-admin:${{ env.LATEST_TAG }} \
410+
/hatchet/hatchet-admin token create --config /hatchet/generated)
411+
echo "HATCHET_CLIENT_TOKEN=$TOKEN" >> $GITHUB_ENV
412+
413+
- name: Start old engine
414+
run: |
415+
docker run -d --name hatchet-engine --network host \
416+
-v ${{ github.workspace }}/generated:/hatchet/generated \
417+
-e DATABASE_URL="${{ env.DATABASE_URL }}" \
418+
-e SERVER_GRPC_PORT=7077 \
419+
-e SERVER_GRPC_BROADCAST_ADDRESS=localhost:7077 \
420+
-e SERVER_GRPC_INSECURE=true \
421+
-e SERVER_AUTH_COOKIE_DOMAIN=localhost \
422+
-e SERVER_AUTH_COOKIE_INSECURE=true \
423+
-e SERVER_MSGQUEUE_KIND=postgres \
424+
-e SERVER_LOGGER_LEVEL=warn \
425+
-e SERVER_LOGGER_FORMAT=console \
426+
-e DATABASE_LOGGER_LEVEL=warn \
427+
-e DATABASE_LOGGER_FORMAT=console \
428+
ghcr.io/hatchet-dev/hatchet/hatchet-engine:${{ env.LATEST_TAG }} \
429+
/hatchet/hatchet-engine --config /hatchet/generated
430+
echo "Waiting 30s for engine to start..."
431+
sleep 30
432+
433+
- name: Start old load test
434+
run: |
435+
docker run -d --name hatchet-loadtest --network host \
436+
-e HATCHET_CLIENT_TOKEN="${{ env.HATCHET_CLIENT_TOKEN }}" \
437+
-e HATCHET_CLIENT_TLS_STRATEGY=none \
438+
-e HATCHET_CLIENT_HOST_PORT=localhost:7077 \
439+
ghcr.io/hatchet-dev/hatchet/hatchet-loadtest:${{ env.LATEST_TAG }} \
440+
/hatchet/hatchet-load-test loadtest -e 10 -d 240s -w 60s -s 100
441+
442+
- name: Wait then apply new migrations
443+
run: |
444+
echo "Waiting 30s for load test to get started..."
445+
sleep 30
446+
echo "Applying new migrations from current branch..."
447+
go run ./cmd/hatchet-migrate
448+
echo "New migrations applied successfully"
449+
450+
- name: Wait for load test to complete
451+
run: |
452+
echo "Waiting for load test container to finish..."
453+
docker wait hatchet-loadtest
454+
EXIT_CODE=$(docker inspect hatchet-loadtest --format='{{.State.ExitCode}}')
455+
echo "Load test exited with code: $EXIT_CODE"
456+
if [ "$EXIT_CODE" != "0" ]; then
457+
echo "=== Load test logs ==="
458+
docker logs hatchet-loadtest
459+
echo "=== Engine logs ==="
460+
docker logs hatchet-engine
461+
exit 1
462+
fi
463+
echo "Load test passed"
464+
465+
- name: Teardown
466+
if: always()
467+
run: |
468+
docker rm -f hatchet-loadtest hatchet-engine 2>/dev/null || true
469+
docker compose down
470+
471+
load-deadlock:
472+
runs-on: ubicloud-standard-8
473+
timeout-minutes: 30
474+
strategy:
475+
matrix:
476+
migrate-strategy: ["latest"]
477+
rabbitmq-enabled: ["true"]
478+
pg-version: ["17-alpine"]
479+
optimistic-scheduling: ["true", "false"]
480+
481+
steps:
482+
- uses: actions/checkout@v6
483+
484+
- name: Install Task
485+
uses: arduino/setup-task@v2
486+
with:
487+
repo-token: ${{ secrets.GITHUB_TOKEN }}
488+
489+
- name: Setup Go
490+
uses: actions/setup-go@v6
491+
with:
492+
go-version: "1.25"
493+
494+
- name: Setup pnpm
495+
uses: pnpm/action-setup@v4
496+
with:
497+
version: 10.16.1
498+
run_install: false
499+
500+
- name: Go deps
501+
run: go mod download
502+
503+
- name: Add go-deadlock dependency
504+
run: go get github.com/sasha-s/go-deadlock@v0.3.6
505+
506+
- name: Patch sync imports to use go-deadlock (sed)
507+
shell: bash
508+
run: |
509+
set -euo pipefail
510+
511+
# Replace ONLY the stdlib "sync" import with an alias that preserves `sync.X` call sites.
512+
# - `import "sync"` -> `import sync "github.com/sasha-s/go-deadlock"`
513+
# - within import blocks: `"sync"` -> `sync "github.com/sasha-s/go-deadlock"`
514+
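# NOTE: the shell collapses `-i''` to plain `-i` (in-place, no backup). This works with GNU sed (Linux CI runners) but is NOT portable to BSD/macOS sed, which requires a separate suffix argument (`-i ''`).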
515+
find . -name '*.go' -not -path './vendor/*' -print0 | xargs -0 sed -i'' -E \
516+
-e 's/^([[:space:]]*)import[[:space:]]+"sync"[[:space:]]*$/\1import sync "github.com\/sasha-s\/go-deadlock"/' \
517+
-e 's/^([[:space:]]*)"sync"[[:space:]]*$/\1sync "github.com\/sasha-s\/go-deadlock"/'
518+
519+
# Keep formatting/import grouping consistent after rewriting.
520+
find . -name '*.go' -not -path './vendor/*' -print0 | xargs -0 gofmt -w
521+
522+
# Evidence in CI logs that rewriting happened (or not).
523+
echo "Changed Go files (after patch):"
524+
git diff --name-only -- '*.go' || true
525+
526+
echo ""
527+
echo "Contents of pkg/scheduling/v1/scheduler.go after patch:"
528+
echo "----"
529+
cat pkg/scheduling/v1/scheduler.go
530+
echo "----"
531+
532+
- name: Test (deadlock-instrumented)
533+
run: |
534+
# Disable gzip compression for load tests to reduce CPU overhead
535+
# Compression adds overhead without benefit for 0kb payloads
536+
HATCHET_CLIENT_DISABLE_GZIP_COMPRESSION=true go test -tags load ./... -p 5 -v -race -failfast -timeout 20m
537+
env:
538+
# This job adds go-deadlock + -race overhead; relax perf threshold to avoid flakes.
539+
HATCHET_LOADTEST_AVERAGE_DURATION_THRESHOLD: 1s
540+
# Give the engine a bit more time to come up under instrumentation.
541+
HATCHET_LOADTEST_STARTUP_SLEEP: 30s
542+
TESTING_MATRIX_MIGRATE: ${{ matrix.migrate-strategy }}
543+
TESTING_MATRIX_RABBITMQ_ENABLED: ${{ matrix.rabbitmq-enabled }}
544+
TESTING_MATRIX_PG_VERSION: ${{ matrix.pg-version }}
545+
TESTING_MATRIX_OPTIMISTIC_SCHEDULING: ${{ matrix.optimistic-scheduling }}
546+
342547
rampup:
343-
runs-on: ubicloud-standard-4
548+
runs-on: ubicloud-standard-8
344549
timeout-minutes: 30
345550
strategy:
346551
matrix:

api-contracts/dispatcher/dispatcher.proto

Lines changed: 16 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,11 @@ service Dispatcher {
3333
rpc ReleaseSlot(ReleaseSlotRequest) returns (ReleaseSlotResponse) {}
3434

3535
rpc UpsertWorkerLabels(UpsertWorkerLabelsRequest) returns (UpsertWorkerLabelsResponse) {}
36+
37+
// GetVersion returns the dispatcher protocol version as a simple integer.
38+
// SDKs use this to determine feature support (e.g. slot_config registration).
39+
// Old engines that do not implement this RPC will return UNIMPLEMENTED.
40+
rpc GetVersion(GetVersionRequest) returns (GetVersionResponse) {}
3641
}
3742

3843
message WorkerLabels {
@@ -67,7 +72,8 @@ message WorkerRegisterRequest {
6772
// (optional) the services for this worker
6873
repeated string services = 3;
6974

70-
// (optional) the number of slots this worker can handle
75+
// (optional) the number of default slots this worker can handle
76+
// deprecated: use slot_config instead
7177
optional int32 slots = 4;
7278

7379
// (optional) worker labels (i.e. state or other metadata)
@@ -79,6 +85,9 @@ message WorkerRegisterRequest {
7985
// (optional) information regarding the runtime environment of the worker
8086
optional RuntimeInfo runtime_info = 7;
8187

88+
// (optional) slot config for this worker (slot_type -> units)
89+
map<string, int32> slot_config = 9;
90+
8291
}
8392

8493
message WorkerRegisterResponse {
@@ -403,3 +412,9 @@ message ReleaseSlotRequest {
403412
}
404413

405414
message ReleaseSlotResponse {}
415+
416+
message GetVersionRequest {}
417+
418+
message GetVersionResponse {
419+
string version = 1;
420+
}

api-contracts/openapi/components/schemas/worker.yaml

Lines changed: 18 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,19 @@ WorkerType:
7676
- MANAGED
7777
- WEBHOOK
7878

79+
WorkerSlotConfig:
80+
type: object
81+
description: Slot availability and limits for a slot type.
82+
properties:
83+
available:
84+
type: integer
85+
description: The number of available units for this slot type.
86+
limit:
87+
type: integer
88+
description: The maximum number of units for this slot type.
89+
required:
90+
- limit
91+
7992
RegisteredWorkflow:
8093
type: object
8194
properties:
@@ -136,12 +149,11 @@ Worker:
136149
- ACTIVE
137150
- INACTIVE
138151
- PAUSED
139-
maxRuns:
140-
type: integer
141-
description: The maximum number of runs this worker can execute concurrently.
142-
availableRuns:
143-
type: integer
144-
description: The number of runs this worker can execute concurrently.
152+
slotConfig:
153+
type: object
154+
description: Slot availability and limits for this worker (slot_type -> { available, limit }).
155+
additionalProperties:
156+
$ref: "#/WorkerSlotConfig"
145157
dispatcherId:
146158
type: string
147159
description: "the id of the assigned dispatcher, in UUID format"

api-contracts/openapi/components/schemas/workflow.yaml

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -281,6 +281,14 @@ Step:
281281
timeout:
282282
type: string
283283
description: The timeout of the step.
284+
isDurable:
285+
type: boolean
286+
description: Whether the step is durable.
287+
slotRequests:
288+
type: object
289+
description: Slot requests for the step (slot_type -> units).
290+
additionalProperties:
291+
type: integer
284292
children:
285293
type: array
286294
items:

api-contracts/v1/workflows.proto

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -168,6 +168,8 @@ message CreateTaskOpts {
168168
repeated Concurrency concurrency = 11; // (optional) the task concurrency options
169169
optional TaskConditions conditions = 12; // (optional) the task conditions for creating the task
170170
optional string schedule_timeout = 13; // (optional) the timeout for the schedule
171+
bool is_durable = 14; // (optional) whether the task is durable
172+
map<string, int32> slot_requests = 15; // (optional) slot requests (slot_type -> units)
171173
}
172174

173175
message CreateTaskRateLimit {

api/v1/server/handlers/v1/filters/create.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,12 +5,13 @@ import (
55
"fmt"
66

77
"github.com/google/uuid"
8+
"github.com/labstack/echo/v4"
9+
810
"github.com/hatchet-dev/hatchet/api/v1/server/oas/apierrors"
911
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
1012
"github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers/v1"
1113
v1 "github.com/hatchet-dev/hatchet/pkg/repository"
1214
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
13-
"github.com/labstack/echo/v4"
1415
)
1516

1617
func (t *V1FiltersService) V1FilterCreate(ctx echo.Context, request gen.V1FilterCreateRequestObject) (gen.V1FilterCreateResponseObject, error) {

0 commit comments

Comments
 (0)