Skip to content

Commit 5a60017

Browse files
committed
fix: lint
1 parent 81e04ac commit 5a60017

File tree

14 files changed

+184
-38
lines changed

14 files changed

+184
-38
lines changed

api/v1/server/oas/transformers/v1/worker.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,13 @@ func ToWorkerSqlc(worker *sqlcv1.Worker, slotConfig map[string]gen.WorkerSlotCon
8888
CreatedAt: worker.CreatedAt.Time,
8989
UpdatedAt: worker.UpdatedAt.Time,
9090
},
91-
Name: worker.Name,
92-
Type: gen.WorkerType(worker.Type),
93-
Status: &status,
94-
DispatcherId: dispatcherId,
95-
SlotConfig: slotConfigInt,
96-
RuntimeInfo: ToWorkerRuntimeInfo(worker),
97-
WebhookId: worker.WebhookId,
91+
Name: worker.Name,
92+
Type: gen.WorkerType(worker.Type),
93+
Status: &status,
94+
DispatcherId: dispatcherId,
95+
SlotConfig: slotConfigInt,
96+
RuntimeInfo: ToWorkerRuntimeInfo(worker),
97+
WebhookId: worker.WebhookId,
9898
}
9999

100100
if !worker.LastHeartbeatAt.Time.IsZero() {

api/v1/server/oas/transformers/worker.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ func ToWorkerSqlc(worker *sqlcv1.Worker, slotConfig map[string]gen.WorkerSlotCon
7979
}
8080

8181
res := &gen.Worker{
82-
Metadata: *toAPIMetadata(worker.ID, worker.CreatedAt.Time, worker.UpdatedAt.Time),
83-
Name: worker.Name,
84-
Type: gen.WorkerType(worker.Type),
85-
Status: &status,
86-
DispatcherId: dispatcherId,
87-
SlotConfig: slotConfigInt,
88-
RuntimeInfo: ToWorkerRuntimeInfo(worker),
89-
WebhookId: worker.WebhookId,
82+
Metadata: *toAPIMetadata(worker.ID, worker.CreatedAt.Time, worker.UpdatedAt.Time),
83+
Name: worker.Name,
84+
Type: gen.WorkerType(worker.Type),
85+
Status: &status,
86+
DispatcherId: dispatcherId,
87+
SlotConfig: slotConfigInt,
88+
RuntimeInfo: ToWorkerRuntimeInfo(worker),
89+
WebhookId: worker.WebhookId,
9090
}
9191

9292
if !worker.LastHeartbeatAt.Time.IsZero() {

examples/go/child-workflows/main.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ func Child(client *hatchet.Client) *hatchet.StandaloneTask {
7575
)
7676
}
7777

78-
7978
func main() {
8079
client, err := hatchet.NewClient()
8180
if err != nil {
@@ -127,7 +126,6 @@ func main() {
127126
return err
128127
}
129128

130-
131129
_ = childResult
132130

133131
n := 5

examples/go/migration-guides/mergent.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ func ProcessImageMergent(req MergentRequest) (*MergentResponse, error) {
4545
}, nil
4646
}
4747

48-
4948
// > After (Hatchet)
5049
type ImageProcessInput struct {
5150
ImageURL string `json:"image_url"`

examples/go/on-event/main.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ func Lower(client *hatchet.Client) *hatchet.StandaloneTask {
4040
)
4141
}
4242

43-
4443
// > Accessing the filter payload
4544
func accessFilterPayload(ctx hatchet.Context, input EventInput) (*LowerTaskOutput, error) {
4645
fmt.Println(ctx.FilterPayload())
@@ -49,7 +48,6 @@ func accessFilterPayload(ctx hatchet.Context, input EventInput) (*LowerTaskOutpu
4948
}, nil
5049
}
5150

52-
5351
// > Declare with filter
5452
func LowerWithFilter(client *hatchet.Client) *hatchet.StandaloneTask {
5553
return client.NewStandaloneTask(
@@ -66,7 +64,6 @@ func LowerWithFilter(client *hatchet.Client) *hatchet.StandaloneTask {
6664
)
6765
}
6866

69-
7067
func Upper(client *hatchet.Client) *hatchet.StandaloneTask {
7168
return client.NewStandaloneTask(
7269
"upper", func(ctx hatchet.Context, input EventInput) (*UpperTaskOutput, error) {

examples/go/sticky-workers/main.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ func StickyDag(client *hatchet.Client) *hatchet.Workflow {
4848
return stickyDag
4949
}
5050

51-
5251
type ChildInput struct {
5352
N int `json:"n"`
5453
}
@@ -91,4 +90,3 @@ func Sticky(client *hatchet.Client) *hatchet.StandaloneTask {
9190

9291
return sticky
9392
}
94-

examples/go/streaming/consumer/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,3 @@ func main() {
3434

3535
fmt.Println("\nStreaming completed!")
3636
}
37-

examples/go/streaming/server/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,3 @@ func main() {
5454
log.Println("Failed to start server:", err)
5555
}
5656
}
57-

examples/go/streaming/shared/task.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ func StreamTask(ctx hatchet.Context, input StreamTaskInput) (*StreamTaskOutput,
4646
}, nil
4747
}
4848

49-
5049
func StreamingWorkflow(client *hatchet.Client) *hatchet.StandaloneTask {
5150
return client.NewStandaloneTask("stream-example", StreamTask)
5251
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# > Simple
2+
import argparse
3+
import asyncio
4+
import signal
5+
import threading
6+
import time
7+
import traceback
8+
from typing import Any
9+
10+
from datetime import datetime, timezone
11+
from pathlib import Path
12+
13+
from hatchet_sdk import Context, EmptyModel, Hatchet
14+
15+
hatchet = Hatchet(debug=True)
16+
17+
FAILURE_LOG = Path(__file__).parent / "failures.log"
18+
19+
# Track the current worker so we can clean up on Ctrl+C
20+
_current_worker = None
21+
_current_thread = None
22+
# poetry run python ./simple/worker_test.py --suffix new
23+
24+
25+
def log_failure(phase: str, error: Exception) -> None:
26+
"""Log a failure loudly to stderr and append to the failures log file."""
27+
timestamp = datetime.now(timezone.utc).isoformat()
28+
tb = traceback.format_exception(type(error), error, error.__traceback__)
29+
tb_str = "".join(tb)
30+
31+
msg = f"[{timestamp}] FAILURE during {phase}: {error}\n{tb_str}"
32+
33+
# Loud stderr output
34+
print(f"\n{'!' * 60}", flush=True)
35+
print(f"!!! FAILURE: {phase} !!!", flush=True)
36+
print(msg, flush=True)
37+
print(f"{'!' * 60}\n", flush=True)
38+
39+
# Append to log file
40+
with open(FAILURE_LOG, "a") as f:
41+
f.write(msg)
42+
f.write("-" * 60 + "\n")
43+
44+
45+
@hatchet.task()
46+
def simple(input: EmptyModel, ctx: Context) -> dict[str, str]:
47+
print("Executing simple task!")
48+
return {"result": "Hello, world!"}
49+
50+
51+
@hatchet.durable_task()
52+
def simple_durable(input: EmptyModel, ctx: Context) -> dict[str, str]:
53+
print("Executing durable task!")
54+
return {"result": "Hello from durable!"}
55+
56+
57+
def _force_stop_worker(worker: Any, thread: threading.Thread) -> None:
58+
"""Forcefully terminate the worker and its child processes."""
59+
worker.killing = True
60+
worker._terminate_processes()
61+
worker._close_queues()
62+
if worker.loop and worker.loop.is_running():
63+
worker.loop.call_soon_threadsafe(worker.loop.stop)
64+
thread.join(timeout=5)
65+
66+
67+
def start_worker(suffix: str = "") -> tuple[Any, threading.Thread]:
68+
"""Create and start a worker in a background thread."""
69+
name = f"test-worker-{suffix}" if suffix else "test-worker"
70+
worker = hatchet.worker(
71+
name,
72+
workflows=[simple, simple_durable],
73+
slots=10,
74+
)
75+
worker.handle_kill = False # Prevent sys.exit on shutdown
76+
77+
# Restore default signal handlers so Ctrl+C raises KeyboardInterrupt
78+
signal.signal(signal.SIGINT, signal.default_int_handler)
79+
signal.signal(signal.SIGTERM, signal.SIG_DFL)
80+
81+
thread = threading.Thread(target=worker.start, daemon=True)
82+
thread.start()
83+
84+
# Give the worker a moment to initialize
85+
time.sleep(2)
86+
print("Worker connected.")
87+
return worker, thread
88+
89+
90+
def stop_worker(worker: Any, thread: threading.Thread) -> None:
91+
"""Stop the worker gracefully."""
92+
try:
93+
if worker.loop and worker.loop.is_running():
94+
asyncio.run_coroutine_threadsafe(worker.exit_gracefully(), worker.loop)
95+
thread.join(timeout=10)
96+
if thread.is_alive():
97+
_force_stop_worker(worker, thread)
98+
print("Worker disconnected.")
99+
except Exception as e:
100+
log_failure("worker disconnect", e)
101+
102+
103+
def main() -> None:
104+
global _current_worker, _current_thread
105+
106+
parser = argparse.ArgumentParser()
107+
parser.add_argument(
108+
"--suffix",
109+
default="",
110+
help="Suffix to append to the worker name (e.g. 'old' or 'new')",
111+
)
112+
args = parser.parse_args()
113+
114+
try:
115+
while True:
116+
# --- Connect the worker ---
117+
print("\n=== Connecting worker ===")
118+
try:
119+
worker, thread = start_worker(args.suffix)
120+
_current_worker, _current_thread = worker, thread
121+
except Exception as e:
122+
log_failure("worker connect", e)
123+
time.sleep(5)
124+
continue
125+
126+
# --- Trigger tasks every 1 second for 5 seconds ---
127+
for tick in range(5):
128+
time.sleep(1)
129+
print(f"\n--- Triggering tasks (tick {tick + 1}/5) ---")
130+
try:
131+
ref = simple.run_no_wait()
132+
print(f"Task triggered: {ref}")
133+
except Exception as e:
134+
log_failure(f"task trigger (tick {tick + 1}/5)", e)
135+
try:
136+
ref = simple_durable.run_no_wait()
137+
print(f"Durable task triggered: {ref}")
138+
except Exception as e:
139+
log_failure(f"durable task trigger (tick {tick + 1}/5)", e)
140+
141+
# --- Disconnect the worker ---
142+
print("\n=== Disconnecting worker ===")
143+
stop_worker(worker, thread)
144+
_current_worker, _current_thread = None, None
145+
146+
except KeyboardInterrupt:
147+
print("\n\nCtrl+C received, shutting down...")
148+
if _current_worker and _current_thread:
149+
_force_stop_worker(_current_worker, _current_thread)
150+
print("Bye!")
151+
152+
153+
154+
if __name__ == "__main__":
155+
main()

0 commit comments

Comments
 (0)