Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions examples/go/child-workflows/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func Child(client *hatchet.Client) *hatchet.StandaloneTask {
)
}


func main() {
client, err := hatchet.NewClient()
if err != nil {
Expand Down Expand Up @@ -127,7 +126,6 @@ func main() {
return err
}


_ = childResult

n := 5
Expand Down
1 change: 0 additions & 1 deletion examples/go/migration-guides/mergent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func ProcessImageMergent(req MergentRequest) (*MergentResponse, error) {
}, nil
}


// > After (Hatchet)
type ImageProcessInput struct {
ImageURL string `json:"image_url"`
Expand Down
3 changes: 0 additions & 3 deletions examples/go/on-event/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func Lower(client *hatchet.Client) *hatchet.StandaloneTask {
)
}


// > Accessing the filter payload
func accessFilterPayload(ctx hatchet.Context, input EventInput) (*LowerTaskOutput, error) {
fmt.Println(ctx.FilterPayload())
Expand All @@ -49,7 +48,6 @@ func accessFilterPayload(ctx hatchet.Context, input EventInput) (*LowerTaskOutpu
}, nil
}


// > Declare with filter
func LowerWithFilter(client *hatchet.Client) *hatchet.StandaloneTask {
return client.NewStandaloneTask(
Expand All @@ -66,7 +64,6 @@ func LowerWithFilter(client *hatchet.Client) *hatchet.StandaloneTask {
)
}


func Upper(client *hatchet.Client) *hatchet.StandaloneTask {
return client.NewStandaloneTask(
"upper", func(ctx hatchet.Context, input EventInput) (*UpperTaskOutput, error) {
Expand Down
2 changes: 0 additions & 2 deletions examples/go/sticky-workers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func StickyDag(client *hatchet.Client) *hatchet.Workflow {
return stickyDag
}


type ChildInput struct {
N int `json:"n"`
}
Expand Down Expand Up @@ -91,4 +90,3 @@ func Sticky(client *hatchet.Client) *hatchet.StandaloneTask {

return sticky
}

1 change: 0 additions & 1 deletion examples/go/streaming/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,3 @@ func main() {

fmt.Println("\nStreaming completed!")
}

1 change: 0 additions & 1 deletion examples/go/streaming/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,3 @@ func main() {
log.Println("Failed to start server:", err)
}
}

1 change: 0 additions & 1 deletion examples/go/streaming/shared/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func StreamTask(ctx hatchet.Context, input StreamTaskInput) (*StreamTaskOutput,
}, nil
}


func StreamingWorkflow(client *hatchet.Client) *hatchet.StandaloneTask {
return client.NewStandaloneTask("stream-example", StreamTask)
}
7 changes: 7 additions & 0 deletions pkg/repository/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package repository

import (
"context"
"errors"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"

"github.com/hatchet-dev/hatchet/pkg/repository/sqlchelpers"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"
)
Expand Down Expand Up @@ -65,6 +68,10 @@ func (t *tickerRepository) IsTenantAlertActive(ctx context.Context, tenantId uui
res, err := t.queries.IsTenantAlertActive(ctx, t.pool, tenantId)

if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return false, time.Time{}, nil
}

return false, time.Now(), err
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/repository/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ func (r *sharedRepository) makeTriggerDecisions(ctx context.Context, filters []*
shouldTrigger, err := r.processWorkflowExpression(ctx, filter.Expression, opt, filter.Payload)

if err != nil {
r.l.Error().
// This is notified to the user via an olap event.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is? I'm pretty sure we write this to the db but don't show it on the frontend or anything, at least right now

r.l.Warn().
Err(err).
Str("expression", filter.Expression).
Msg("Failed to evaluate workflow expression")
Expand All @@ -201,6 +202,8 @@ func (r *sharedRepository) makeTriggerDecisions(ctx context.Context, filters []*
Source: sqlcv1.V1CelEvaluationFailureSourceFILTER,
ErrorMessage: err.Error(),
})

continue
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afict we were inserting decision twice here for the same filterId

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, this looks like a bug to me 👍 thanks

}
Comment on lines 186 to 207
Copy link

Copilot AI Feb 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change (adding continue on expression evaluation error) alters runtime behavior (it prevents the subsequent decisions = append(...shouldTrigger...) from running in the error case). The PR description/type currently claims a refactor with no behavior change; it should be updated to reflect that this is a functional bug fix.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont think its a functional bug... i think we probably dedupe this somewhere higher, but i did not trace this


decisions = append(decisions, TriggerDecision{
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduling/v1/queuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1

import (
"context"
"errors"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -160,6 +161,12 @@ func (q *Queuer) loopQueue(ctx context.Context) {
qis, err := q.refillQueue(ctx)

if err != nil {
// right now, the context is only cancelled on shut down which is not a problem for the queuer.
if errors.Is(ctx.Err(), context.Canceled) {
span.End()
continue
}

span.RecordError(err)
span.End()
q.l.Error().Err(err).Msg("error refilling queue")
Expand Down
Loading