Closed · 34 commits
- `db77636` Accept Task List (snadrus, Dec 29, 2025)
- `6d4eb39` oops unlimited headroom isn't zero (snadrus, Jan 6, 2026)
- `045b933` lint (snadrus, Jan 6, 2026)
- `7c85129` oops (snadrus, Jan 7, 2026)
- `4d6b43d` reduce workload to the accepted IDs (snadrus, Jan 8, 2026)
- `76c35e6` not every task has storage (snadrus, Jan 8, 2026)
- `edeeb35` limit maxAcceptable to storage partial (snadrus, Jan 8, 2026)
- `561faed` order in tasks (snadrus, Jan 9, 2026)
- `5e526f9` cleanup (snadrus, Jan 12, 2026)
- `2159dcd` fix sql (snadrus, Jan 12, 2026)
- `e49feff` nil fix (snadrus, Jan 12, 2026)
- `85efdc1` fun with yb-not-pg type strictness (snadrus, Jan 12, 2026)
- `b15a598` working1 (snadrus, Jan 14, 2026)
- `6086287` working 2 (snadrus, Jan 14, 2026)
- `dfc74ad` Merge branch 'main' into harmonytasknotify (snadrus, Jan 14, 2026)
- `b176ff0` ffi too (snadrus, Jan 14, 2026)
- `3d612c1` Merge branch 'main' into harmonytasklist (snadrus, Jan 14, 2026)
- `405867e` ffi update and harmonytask fn (snadrus, Jan 14, 2026)
- `0b402ab` Merge branch 'harmonytasklist' into harmonytasknotify (snadrus, Jan 14, 2026)
- `2030d6a` notes (snadrus, Jan 15, 2026)
- `abc81be` Merge branch 'main' into harmonytasknotify (snadrus, Jan 19, 2026)
- `414d4ee` poll duration, task scheduling (snadrus, Jan 20, 2026)
- `2613bb0` scheduler comment (snadrus, Jan 20, 2026)
- `9ea6fe8` progress (snadrus, Jan 21, 2026)
- `9850dbc` getting there (snadrus, Jan 22, 2026)
- `f7c8a5b` chipping away (snadrus, Jan 23, 2026)
- `784f8d2` doc (snadrus, Feb 10, 2026)
- `345d9db` notes (snadrus, Feb 12, 2026)
- `3075c0e` Merge branch 'main' into harmonytasknotify (snadrus, Feb 12, 2026)
- `7dd7983` need-testing (snadrus, Feb 16, 2026)
- `8bf13d2` tests and bug fixes (snadrus, Feb 18, 2026)
- `0a236f4` fixes (snadrus, Feb 18, 2026)
- `c360fb7` Merge branch 'main' into harmonytasknotify (snadrus, Feb 18, 2026)
- `65d2003` lint (snadrus, Feb 18, 2026)
6 changes: 6 additions & 0 deletions cmd/curio/rpc/rpc.go
@@ -64,6 +64,7 @@ func CurioHandler(
remote http.HandlerFunc,
a api.Curio,
prometheusSD http.Handler,
dependencies *deps.Deps,
permissioned bool) http.Handler {
mux := mux.NewRouter()
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
@@ -81,6 +82,9 @@

mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
if dependencies.PeerHTTP != nil {
mux.Handle("/peer/v1", dependencies.PeerHTTP) // Peer-to-peer HTTP POST communication
}
mux.PathPrefix("/remote").HandlerFunc(remote)
mux.Handle("/debug/metrics", metrics.Exporter())
mux.Handle("/debug/service-discovery", prometheusSD)
@@ -474,13 +478,15 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c
return payload.Allow, nil
}
}

// Serve the RPC.
srv := &http.Server{
Handler: CurioHandler(
authVerify,
remoteHandler,
&CurioAPI{dependencies, dependencies.Si, shutdownChan},
prometheusServiceDiscovery(ctx, dependencies),
dependencies,
permissioned),
ReadHeaderTimeout: time.Minute * 3,
BaseContext: func(listener net.Listener) context.Context {
2 changes: 1 addition & 1 deletion cmd/curio/tasks/tasks.go
@@ -353,7 +353,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
"miner_addresses", miners,
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))

ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr)
ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr, dependencies.PeerHTTP)
if err != nil {
return nil, err
}
6 changes: 6 additions & 0 deletions deps/deps.go
@@ -39,6 +39,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/lib/cachedreader"
"github.com/filecoin-project/curio/lib/curiochain"
harmonypeerhttp "github.com/filecoin-project/curio/lib/harmony_peer_http"
"github.com/filecoin-project/curio/lib/multictladdr"
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/lib/pieceprovider"
@@ -177,6 +178,7 @@ type Deps struct {
ServeChunker *chunker.ServeChunker
EthClient *lazy.Lazy[*ethclient.Client]
Sender *message.Sender
PeerHTTP *harmonypeerhttp.PeerHTTP
}

const (
@@ -296,6 +298,10 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context,
}
}

if deps.PeerHTTP == nil {
deps.PeerHTTP = harmonypeerhttp.New(deps.ListenAddr)
}

if deps.Alert == nil {
deps.Alert = alertmanager.NewAlertNow(deps.DB, deps.ListenAddr)
}
165 changes: 165 additions & 0 deletions harmony/harmonytask/PR_REVIEW.md
@@ -0,0 +1,165 @@
# PR: harmonytasknotify — Event-driven scheduler with peer-to-peer task notification

## Summary

Replaces the polling-only task scheduler with an event-driven scheduler that
uses peer-to-peer RPC to notify other nodes about new tasks, reservations,
and work starts. Removes the `Follows` mechanism in favor of explicit
`AddTaskByName` calls. Adds integration tests for the scheduler and peering.

### Key changes

- **New scheduler** (`scheduler.go`): single-threaded event loop reading from
`schedulerChannel`. Events arrive from local `AddTaskByName`, peering
messages, task completions, and retries. A `bundler` coalesces rapid-fire
events before attempting work. DB polling remains as a fallback on a
configurable interval.
- **Peering layer** (`peering.go`): on startup, each node connects to all
known `harmony_machines` peers, performs an identity + capability handshake,
then exchanges binary task messages (`new`, `reserve`, `started`).
- **PeerHTTP transport** (`lib/harmony_peer_http/`): HTTP POST based peer
communication mounted at `/peer/v1`. Each message is a single POST with
`X-Peer-ID` header.
- **`AddTaskByName`** moved from per-handler `AddTask` to a single method on
`TaskEngine`. It inserts the DB row, emits a `schedulerSourceAdded` event,
and the scheduler broadcasts `TellOthers`.
- **`Follows` removed**: all task-chaining now goes through `AddTaskByName`.
The `Follows` field, `followWorkInDB` method, and `FOLLOW_FREQUENCY` const
are gone. All `Follows: nil` annotations removed from task implementations.
- **`considerWork` now accepts `[]task` + `eventEmitter`** instead of
`[]TaskID`. It emits `TaskStarted`, `TaskCompleted`, and retry events back
to the scheduler.
- **`recordCompletion` uses `context.Background()`** so it always completes
even during graceful shutdown.
- **CanAccept caching**: when the DB claim accepts fewer tasks than
`CanAccept` approved, the remainder is cached for the next poll cycle.
- **`test_support.go` reduced**: only exports `PipeNetwork`/`PipeNode`
(in-memory `PeerConnectorInterface`). All scheduler-internal test hooks
removed.
- **Integration tests** (`itests/harmonytask/`): scheduler tests
(routing, concurrency, retry, shared tasks, max concurrency) and peering
end-to-end tests, all using real DB and full `TaskEngine` objects.
- **Self-peering fix**: `startPeering` skips connecting to self.

---

## Workflow trace

### 1. Local add → local execution → success ✓

AddTaskByName
→ DB INSERT (owner_id=NULL)
→ schedulerChannel ← schedulerSourceAdded
Scheduler
→ add to availableTasks, bundleCollector(10ms), TellOthers(newTask)
bundleSleep fires
→ waterfall → pollerTryAllWork → considerWork
→ SQL claim (SET owner_id, SKIP LOCKED) → goroutine
Goroutine
→ EmitTaskStarted → scheduler removes from availableTasks, TellOthers(started)
→ Do() → success
→ recordCompletion: DELETE task, INSERT history
→ EmitTaskCompleted → scheduler waterfall (free capacity)

### 2. Local add → peer picks up ✓

Node A: AddTaskByName("X") → schedulerSourceAdded
Scheduler A: add to local, TellOthers(newTask) → wire message to Node B
Node B: handlePeerMessage → schedulerSourcePeerNewTask
Scheduler B: add to availableTasks, bundleCollector
bundleSleep → waterfall → considerWork → SQL claim
Both race; SKIP LOCKED ensures single winner.

### 3. Task failure → retry ✓

Goroutine: Do() returns (false, err)
Deferred:
recordCompletion: UPDATE owner_id=NULL, retries++ (or DELETE if max)
EmitTaskCompleted → scheduler waterfall
EmitTaskNew (schedulerSourceAdded, Retries=old_count)
Scheduler:
add to availableTasks(UpdateTime=now, Retries=old)
bundleCollector → 10ms → waterfall
pollerTryAllWork filters by RetryWait(old_count) vs time.Since(now)
→ task not ready yet (only 10ms elapsed) → skipped
Next DB poll:
taskSourceDb.GetTasks reads fresh retries from DB
If RetryWait elapsed → considerWork → claim → run

Note: TellOthers broadcasts to peers with retries=0 (messageRenderTaskSend
omits retries). Peers may attempt the task immediately. This is acceptable:
the failure may be node-specific, and distributed retry is desirable.

### 4. Task failure → max failures → permanent drop ✓

recordCompletion: retries >= MaxFailures-1 → DELETE from harmony_task
EmitTaskNew fires → schedulerSourceAdded → added to local availableTasks
bundleSleep → waterfall → SQL claim → row doesn't exist → no work
Stale local entry cleaned up on next DB poll (taskSourceDb replaces map).
TellOthers to peers → same: they try, DB row gone, harmless.

### 5. DB poll fallback ✓

time.After(pollDuration) fires
waterfall(taskSourceDb) → SQL SELECT unowned tasks → replaces local cache
pollerTryAllWork → considerWork → claim → run
This is the safety net: all tasks are eventually discovered even without
peer notification.

### 6. Recover (startup resurrection) ✓

New() queries harmony_task WHERE owner_id=us
considerWork(WorkSourceRecover, tasks) → skips SQL claim (already ours)
Goroutines start, emit EmitTaskStarted
Scheduler not started yet → events buffer in channel (pre-sized)
startScheduler() → processes buffered events
schedulerSourceTaskStarted: delete from availableTasks (no-op, wasn't added)
TellOthers(started) → peers learn about it. Correct.

### 7. IAmBored → generate work ⚠️

pollerTryAllWork reaches IAmBored section (no existing work accepted)
IAmBored callback calls AddTaskByName
AddTaskByName does DB INSERT then: schedulerChannel <- event (BLOCKING)
BUT: pollerTryAllWork runs on the scheduler goroutine
The scheduler goroutine is the sole reader of schedulerChannel
If IAmBored generates >100 tasks (channel capacity), the 101st
AddTaskByName blocks forever → DEADLOCK.

In practice IAmBored typically adds 1-2 tasks, but CC sector creation
could theoretically add many. Worth protecting against.

### 8. Peer handshake (symmetric) ✓

Both sides of a connection call handlePeer simultaneously.
Both send "i:<addr>", both read the other's identity.
Both query DB for the other's task capabilities.
Both add peer to p.peers and p.m.
Both enter receive loop.

If A and B connect to each other simultaneously, two connections form.
TellOthers sends to both → receiver gets duplicate events.
Duplicates are harmless: map overwrites, delete no-ops, extra DB attempts
fail gracefully via SKIP LOCKED.

### 9. Bundler coalescing ✓

Rapid events: bundleCollector called N times for same taskType
First call creates timer + goroutine. Subsequent calls Reset(10ms).
After 10ms of quiet: timer fires, goroutine does delete(timers, key)
then sends on output. Scheduler processes waterfall.
Next burst: key absent → new timer + goroutine. Correct.

Edge case: goroutine blocked on output send while new event arrives.
delete() already ran, so new timer is created → two sends for same type.
Scheduler runs two waterfalls. Extra work but harmless.

---

## Open issue

**IAmBored deadlock** (Flow 7): `AddTaskByName` does a blocking channel
send on the scheduler goroutine. If `IAmBored` adds enough tasks to fill
the 100-slot channel buffer, the scheduler deadlocks. Consider using a
non-blocking send with overflow to a local slice, or limiting IAmBored to
one AddTask call per invocation.
16 changes: 16 additions & 0 deletions harmony/harmonytask/doc.go
@@ -7,6 +7,22 @@ work by hardware, parallelizability, reliability, or any other reason.
Workers will be Greedy: vacuuming up their favorite jobs from a list.
Once 1 task is accepted, harmonydb tries to get other task runner
machines to accept work (round robin) before trying again to accept.

The task system is designed to first take "any" work we are capable of doing,
up to the limit of our resources. Then, as work queues up, it reserves
resources for tasks it soft-claims, so high-priority tasks can run and are
not starved. This lets clusters respect the run order of tasks. Strict
priority ordering may starve lower-priority tasks, so within a priority
class we should prefer the oldest tasks first.

ex: prio: prio.P0 | prio.LessThan('x', 'y', 'z') | prio.PipelineOrder('a1', 'b2', 'c3')

As this requires tasks to start serially, we also do not use mutexes for
CanAccept() / tryWork().

Polling is too db-heavy, so nodes contact each other to share work, reserve,
and announce acceptance of work. The actual claim happens in the database.

*
Mental Model:
