diff --git a/examples/go/child-workflows/main.go b/examples/go/child-workflows/main.go index 4c6f900827..7fc17927f2 100644 --- a/examples/go/child-workflows/main.go +++ b/examples/go/child-workflows/main.go @@ -75,7 +75,6 @@ func Child(client *hatchet.Client) *hatchet.StandaloneTask { ) } - func main() { client, err := hatchet.NewClient() if err != nil { @@ -127,7 +126,6 @@ func main() { return err } - _ = childResult n := 5 diff --git a/examples/go/migration-guides/mergent.go b/examples/go/migration-guides/mergent.go index 777d988c25..54bef30777 100644 --- a/examples/go/migration-guides/mergent.go +++ b/examples/go/migration-guides/mergent.go @@ -45,7 +45,6 @@ func ProcessImageMergent(req MergentRequest) (*MergentResponse, error) { }, nil } - // > After (Hatchet) type ImageProcessInput struct { ImageURL string `json:"image_url"` diff --git a/examples/go/on-event/main.go b/examples/go/on-event/main.go index e4605b1d52..73ade3c216 100644 --- a/examples/go/on-event/main.go +++ b/examples/go/on-event/main.go @@ -40,7 +40,6 @@ func Lower(client *hatchet.Client) *hatchet.StandaloneTask { ) } - // > Accessing the filter payload func accessFilterPayload(ctx hatchet.Context, input EventInput) (*LowerTaskOutput, error) { fmt.Println(ctx.FilterPayload()) @@ -49,7 +48,6 @@ func accessFilterPayload(ctx hatchet.Context, input EventInput) (*LowerTaskOutpu }, nil } - // > Declare with filter func LowerWithFilter(client *hatchet.Client) *hatchet.StandaloneTask { return client.NewStandaloneTask( @@ -66,7 +64,6 @@ func LowerWithFilter(client *hatchet.Client) *hatchet.StandaloneTask { ) } - func Upper(client *hatchet.Client) *hatchet.StandaloneTask { return client.NewStandaloneTask( "upper", func(ctx hatchet.Context, input EventInput) (*UpperTaskOutput, error) { diff --git a/examples/go/sticky-workers/main.go b/examples/go/sticky-workers/main.go index def4bfe80a..f34306d704 100644 --- a/examples/go/sticky-workers/main.go +++ b/examples/go/sticky-workers/main.go @@ -48,7 +48,6 @@ func StickyDag(client *hatchet.Client) *hatchet.Workflow { return stickyDag } - type ChildInput struct { N int `json:"n"` } @@ -91,4 +90,3 @@ func Sticky(client *hatchet.Client) *hatchet.StandaloneTask { return sticky } - diff --git a/examples/go/streaming/consumer/main.go b/examples/go/streaming/consumer/main.go index 59d120e44e..358a10c1d3 100644 --- a/examples/go/streaming/consumer/main.go +++ b/examples/go/streaming/consumer/main.go @@ -34,4 +34,3 @@ func main() { fmt.Println("\nStreaming completed!") } - diff --git a/examples/go/streaming/server/main.go b/examples/go/streaming/server/main.go index 4438a5f19f..c527918e4b 100644 --- a/examples/go/streaming/server/main.go +++ b/examples/go/streaming/server/main.go @@ -54,4 +54,3 @@ func main() { log.Println("Failed to start server:", err) } } - diff --git a/examples/go/streaming/shared/task.go b/examples/go/streaming/shared/task.go index 8b6070a473..adce03d0e4 100644 --- a/examples/go/streaming/shared/task.go +++ b/examples/go/streaming/shared/task.go @@ -46,7 +46,6 @@ func StreamTask(ctx hatchet.Context, input StreamTaskInput) (*StreamTaskOutput, }, nil } - func StreamingWorkflow(client *hatchet.Client) *hatchet.StandaloneTask { return client.NewStandaloneTask("stream-example", StreamTask) } diff --git a/pkg/repository/ticker.go b/pkg/repository/ticker.go index 643941ab1a..60f8193eab 100644 --- a/pkg/repository/ticker.go +++ b/pkg/repository/ticker.go @@ -2,9 +2,12 @@ package repository import ( "context" + "errors" "time" "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/hatchet-dev/hatchet/pkg/repository/sqlchelpers" "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" ) @@ -65,6 +68,10 @@ func (t *tickerRepository) IsTenantAlertActive(ctx context.Context, tenantId uui res, err := t.queries.IsTenantAlertActive(ctx, t.pool, tenantId) if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return false, time.Time{}, nil + } + return false, time.Now(), err } diff --git a/pkg/repository/trigger.go b/pkg/repository/trigger.go index d6df4cafcb..096c3de318 100644 --- a/pkg/repository/trigger.go +++ b/pkg/repository/trigger.go @@ -184,7 +184,8 @@ func (r *sharedRepository) makeTriggerDecisions(ctx context.Context, filters []* shouldTrigger, err := r.processWorkflowExpression(ctx, filter.Expression, opt, filter.Payload) if err != nil { - r.l.Error(). + // This is notified to the user via an olap event. + r.l.Warn(). Err(err). Str("expression", filter.Expression). Msg("Failed to evaluate workflow expression") @@ -201,6 +202,8 @@ func (r *sharedRepository) makeTriggerDecisions(ctx context.Context, filters []* Source: sqlcv1.V1CelEvaluationFailureSourceFILTER, ErrorMessage: err.Error(), }) + + continue } decisions = append(decisions, TriggerDecision{ diff --git a/pkg/scheduling/v1/queuer.go b/pkg/scheduling/v1/queuer.go index ff33aab3c1..9648842c2a 100644 --- a/pkg/scheduling/v1/queuer.go +++ b/pkg/scheduling/v1/queuer.go @@ -2,6 +2,7 @@ package v1 import ( "context" + "errors" "sort" "sync" "time" @@ -160,6 +161,12 @@ func (q *Queuer) loopQueue(ctx context.Context) { qis, err := q.refillQueue(ctx) if err != nil { + // right now, the context is only cancelled on shut down which is not a problem for the queuer. + if errors.Is(ctx.Err(), context.Canceled) { + span.End() + continue + } + span.RecordError(err) span.End() q.l.Error().Err(err).Msg("error refilling queue")