Skip to content

Commit d79f24b

Browse files
authored
pool: fix job payload cleanup during shutdown (#52)
* pool: fix job payload cleanup during shutdown Fixes a bug where job payloads weren't being properly cleaned up during node shutdown, causing subsequent DispatchJob calls to fail with ErrJobExists even though the jobs were no longer running. Changes: - Add cleanup of job payloads when removing workers - Run periodic cleanup of orphaned job payloads - Fix potential race condition in tickers if created after the node is shutdown * Upgrade to Go 1.24
1 parent 2598358 commit d79f24b

File tree

7 files changed

+257
-80
lines changed

7 files changed

+257
-80
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
strategy:
1414
fail-fast: true
1515
matrix:
16-
go: ['1.23']
16+
go: ['1.24']
1717
os: ['ubuntu-latest']
1818
runs-on: ${{ matrix.os }}
1919

go.mod

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,35 @@
11
module goa.design/pulse
22

3-
go 1.22.7
4-
5-
toolchain go1.23.1
3+
go 1.24.0
64

75
require (
86
github.com/gorilla/websocket v1.5.3
97
github.com/oklog/ulid/v2 v2.1.0
10-
github.com/redis/go-redis/v9 v9.7.0
8+
github.com/redis/go-redis/v9 v9.7.1
119
github.com/stretchr/testify v1.10.0
1210
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.59.0
1311
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0
1412
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.34.0
1513
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0
16-
goa.design/clue v1.0.7
17-
goa.design/goa/v3 v3.19.1
14+
goa.design/clue v1.1.0
15+
goa.design/goa/v3 v3.20.0
1816
goa.design/model v1.9.8
19-
google.golang.org/grpc v1.69.4
17+
google.golang.org/grpc v1.71.0
2018
)
2119

2220
require (
23-
github.com/aws/smithy-go v1.22.1 // indirect
21+
github.com/aws/smithy-go v1.22.3 // indirect
2422
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
2523
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2624
github.com/davecgh/go-spew v1.1.1 // indirect
2725
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
2826
github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598 // indirect
2927
github.com/felixge/httpsnoop v1.0.4 // indirect
30-
github.com/go-chi/chi/v5 v5.2.0 // indirect
28+
github.com/go-chi/chi/v5 v5.2.1 // indirect
3129
github.com/go-logr/logr v1.4.2 // indirect
3230
github.com/go-logr/stdr v1.2.2 // indirect
3331
github.com/google/uuid v1.6.0 // indirect
34-
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect
32+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
3533
github.com/manveru/faker v0.0.0-20171103152722-9fbc68a78c4d // indirect
3634
github.com/pmezard/go-difflib v1.0.0 // indirect
3735
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
@@ -44,16 +42,16 @@ require (
4442
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
4543
go.opentelemetry.io/otel/trace v1.34.0 // indirect
4644
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
47-
golang.org/x/mod v0.22.0 // indirect
48-
golang.org/x/net v0.34.0 // indirect
49-
golang.org/x/sync v0.10.0 // indirect
50-
golang.org/x/sys v0.29.0 // indirect
51-
golang.org/x/term v0.28.0 // indirect
52-
golang.org/x/text v0.21.0 // indirect
53-
golang.org/x/tools v0.29.0 // indirect
54-
google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f // indirect
55-
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
56-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
57-
google.golang.org/protobuf v1.36.3 // indirect
45+
golang.org/x/mod v0.23.0 // indirect
46+
golang.org/x/net v0.36.0 // indirect
47+
golang.org/x/sync v0.11.0 // indirect
48+
golang.org/x/sys v0.30.0 // indirect
49+
golang.org/x/term v0.29.0 // indirect
50+
golang.org/x/text v0.22.0 // indirect
51+
golang.org/x/tools v0.30.0 // indirect
52+
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb // indirect
53+
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
54+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
55+
google.golang.org/protobuf v1.36.5 // indirect
5856
gopkg.in/yaml.v3 v3.0.1 // indirect
5957
)

go.sum

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro=
2-
github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
1+
github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k=
2+
github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
33
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
44
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
55
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
@@ -16,23 +16,23 @@ github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598 h1:MGKhKyiYrvMDZs
1616
github.com/dimfeld/httppath v0.0.0-20170720192232-ee938bf73598/go.mod h1:0FpDmbrt36utu8jEmeU05dPC9AB5tsLYVVi+ZHfyuwI=
1717
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
1818
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
19-
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
20-
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
19+
github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8=
20+
github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
2121
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
2222
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
2323
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
2424
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
2525
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
2626
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
2727
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
28-
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
29-
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
28+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
29+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
3030
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
3131
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
3232
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
3333
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
34-
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
35-
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
34+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo=
35+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI=
3636
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
3737
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
3838
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -48,8 +48,8 @@ github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNs
4848
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
4949
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
5050
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
51-
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
52-
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
51+
github.com/redis/go-redis/v9 v9.7.1 h1:4LhKRCIduqXqtvCUlaq9c8bdHOkICjDMrr1+Zb3osAc=
52+
github.com/redis/go-redis/v9 v9.7.1/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
5353
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
5454
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
5555
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
@@ -72,10 +72,10 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 h1:tgJ0u
7272
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0/go.mod h1:U7HYyW0zt/a9x5J1Kjs+r1f/d4ZHnYFclhYY2+YbeoE=
7373
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 h1:BEj3SPM81McUZHYjRS5pEgNgnmzGJ5tRpU5krWnV8Bs=
7474
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0/go.mod h1:9cKLGBDzI/F3NoHLQGm4ZrYdIHsvGt6ej6hUowxY0J4=
75-
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.31.0 h1:HZgBIps9wH0RDrwjrmNa3DVbNRW60HEhdzqZFyAp3fI=
76-
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.31.0/go.mod h1:RDRhvt6TDG0eIXmonAx5bd9IcwpqCkziwkOClzWKwAQ=
77-
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.31.0 h1:UGZ1QwZWY67Z6BmckTU+9Rxn04m2bD3gD6Mk0OIOCPk=
78-
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.31.0/go.mod h1:fcwWuDuaObkkChiDlhEpSq9+X1C0omv+s5mBtToAQ64=
75+
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.32.0 h1:SZmDnHcgp3zwlPBS2JX2urGYe/jBKEIT6ZedHRUyCz8=
76+
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.32.0/go.mod h1:fdWW0HtZJ7+jNpTKUR0GpMEDP69nR8YBJQxNiVCE3jk=
77+
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0 h1:jBpDk4HAUsrnVO1FsfCfCOTEc/MkInJmvfCHYLFiT80=
78+
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.34.0/go.mod h1:H9LUIM1daaeZaz91vZcfeM0fejXPmgCYE8ZhzqfJuiU=
7979
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
8080
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
8181
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
@@ -88,36 +88,36 @@ go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU
8888
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
8989
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
9090
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
91-
goa.design/clue v1.0.7 h1:Z0qhUTvMMo2C7bxn9X7Wt4DXahGMdYuIg7pr3F+iLOs=
92-
goa.design/clue v1.0.7/go.mod h1:z9vhVyNCV02Aggr20KilzR/QQigD/wuz+0uGvWr4MYk=
93-
goa.design/goa/v3 v3.19.1 h1:jpV3LEy7YANzPMwm++Lu17RoThRJgXrPxdEM0A1nlOE=
94-
goa.design/goa/v3 v3.19.1/go.mod h1:astNE9ube0YCxqq7DQkt1MtLxB/b3kRPEFkEZovcO2I=
91+
goa.design/clue v1.1.0 h1:HAgmiLSDB++SX4Shv8bYLewRmD8MtztSLiUHFSoo0Qg=
92+
goa.design/clue v1.1.0/go.mod h1:wlamhUMR5f2EIxVK2O/Fp0QwBK9VzijEsLi0VPxRPUE=
93+
goa.design/goa/v3 v3.20.0 h1:mYYNqCBg9SSxe2jxvPJFOPmJqqKkSAUSU84jpczky3s=
94+
goa.design/goa/v3 v3.20.0/go.mod h1:g8sT4ioTaRt8BZKwZ1YOQe7UgWqkZMx+q6NWgQfzLUU=
9595
goa.design/model v1.9.8 h1:SGf+q+hYO1rh/Jvq7T0ZbfBcANzi3Lc3RHKWBDZCWCE=
9696
goa.design/model v1.9.8/go.mod h1:RqPSTbZV49gD3+IBsT9/zf+EPxt4zuDPuT/6r857H3w=
97-
golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4=
98-
golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
99-
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
100-
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
101-
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
102-
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
103-
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
104-
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
105-
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
106-
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
107-
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
108-
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
109-
golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE=
110-
golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588=
111-
google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f h1:387Y+JbxF52bmesc8kq1NyYIp33dnxCw6eiA7JMsTmw=
112-
google.golang.org/genproto v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:0joYwWwLQh18AOj8zMYeZLjzuqcYTU3/nC5JdCvC3JI=
113-
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA=
114-
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o=
115-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI=
116-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
117-
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
118-
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
119-
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
120-
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
97+
golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM=
98+
golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
99+
golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
100+
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
101+
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
102+
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
103+
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
104+
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
105+
golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU=
106+
golang.org/x/term v0.29.0/go.mod h1:6bl4lRlvVuDgSf3179VpIxBF0o10JUpXWOnI7nErv7s=
107+
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
108+
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
109+
golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY=
110+
golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY=
111+
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb h1:ITgPrl429bc6+2ZraNSzMDk3I95nmQln2fuPstKwFDE=
112+
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:sAo5UzpjUwgFBCzupwhcLcxHVDK7vG5IqI30YnwX2eE=
113+
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb h1:p31xT4yrYrSM/G4Sn2+TNUkVhFCbG9y8itM2S6Th950=
114+
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:jbe3Bkdp+Dh2IrslsFCklNhweNTBgSYanP1UXhJDhKg=
115+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4=
116+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I=
117+
google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg=
118+
google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
119+
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
120+
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
121121
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
122122
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
123123
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

pool/node.go

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,6 @@ const (
9191
evDispatchReturn string = "d"
9292
)
9393

94-
const (
95-
maxRequeuingRetries = 3 // Maximum number of times to retry requeuing jobs
96-
)
97-
9894
// pendingEventTTL is the TTL for pending events.
9995
var pendingEventTTL = 2 * time.Minute
10096

@@ -506,7 +502,7 @@ func (node *Node) Shutdown(ctx context.Context) error {
506502
}
507503

508504
// Signal all nodes to shutdown.
509-
if _, err := node.nodeShutdownMap.SetAndWait(ctx, "shutdown", node.ID); err != nil {
505+
if _, err := node.nodeShutdownMap.Set(ctx, "shutdown", node.ID); err != nil {
510506
node.logger.Error(fmt.Errorf("Shutdown: failed to set shutdown status in shutdown map: %w", err))
511507
}
512508
<-node.closed // Wait for this node to be closed
@@ -603,7 +599,7 @@ func (node *Node) stopAllJobs(ctx context.Context) {
603599
pulse.Go(node.logger, func() {
604600
defer wg.Done()
605601
for _, job := range worker.Jobs() {
606-
if err := worker.stopJob(ctx, job.Key, false); err != nil {
602+
if err := worker.stopJob(ctx, job.Key); err != nil {
607603
node.logger.Error(fmt.Errorf("Close: failed to stop job %q for worker %q: %w", job.Key, worker.ID, err))
608604
}
609605
total.Add(1)
@@ -990,6 +986,7 @@ func (node *Node) cleanupWorker(ctx context.Context, workerID string) {
990986
payload, ok := node.JobPayload(key)
991987
if !ok {
992988
node.logger.Error(fmt.Errorf("requeueWorkerJobs: failed to get job payload"), "job", key, "worker", workerID)
989+
requeued++ // We will never be able to requeue this job
993990
continue
994991
}
995992
job := &Job{Key: key, Payload: []byte(payload), CreatedAt: time.Now(), NodeID: node.ID}
@@ -1017,12 +1014,21 @@ func (node *Node) processInactiveJobs(ctx context.Context) {
10171014
ticker := time.NewTicker(node.ackGracePeriod) // Run at ackGracePeriod frequency since pending jobs expire after 2*ackGracePeriod
10181015
defer ticker.Stop()
10191016

1017+
payloadCleanupTicker, err := node.NewTicker(ctx, "jobPayloadCleanup", node.workerTTL)
1018+
if err != nil {
1019+
node.logger.Error(fmt.Errorf("processInactiveJobs: failed to create payload cleanup ticker: %w", err))
1020+
return
1021+
}
1022+
defer payloadCleanupTicker.Stop()
1023+
10201024
for {
10211025
select {
10221026
case <-node.stop:
10231027
return
10241028
case <-ticker.C:
10251029
node.cleanupStalePendingJobs(ctx)
1030+
case <-payloadCleanupTicker.C:
1031+
node.cleanupOrphanedJobPayloads(ctx)
10261032
}
10271033
}
10281034
}
@@ -1044,6 +1050,29 @@ func (node *Node) cleanupStalePendingJobs(ctx context.Context) {
10441050
}
10451051
}
10461052

1053+
// cleanupOrphanedJobPayloads checks for and removes entries in the job payload map
1054+
// that don't have a corresponding entry in the job map.
1055+
func (node *Node) cleanupOrphanedJobPayloads(ctx context.Context) {
1056+
// Get all existing job keys from the job map
1057+
existingJobs := make(map[string]struct{})
1058+
for _, jobs := range node.jobMap.Map() {
1059+
for _, key := range strings.Split(jobs, ",") {
1060+
existingJobs[key] = struct{}{}
1061+
}
1062+
}
1063+
1064+
// Check each payload entry
1065+
for key := range node.jobPayloadMap.Map() {
1066+
if _, exists := existingJobs[key]; !exists {
1067+
if _, err := node.jobPayloadMap.Delete(ctx, key); err != nil {
1068+
node.logger.Error(fmt.Errorf("cleanupOrphanedJobPayloads: failed to delete orphaned payload for job %q: %w", key, err))
1069+
continue
1070+
}
1071+
node.logger.Info("cleanupOrphanedJobPayloads: removed orphaned payload", "key", key)
1072+
}
1073+
}
1074+
}
1075+
10471076
// acquireCleanupLock tries to acquire the cleanup lock for a worker.
10481077
// It returns true if the lock was acquired, false if another node holds the lock.
10491078
// It will clear any stale or invalid locks it finds.
@@ -1197,6 +1226,12 @@ func (node *Node) removeWorkerFromMaps(ctx context.Context, id string) {
11971226
if _, err := node.workerCleanupMap.Delete(ctx, id); err != nil {
11981227
node.logger.Error(fmt.Errorf("removeWorkerFromMaps: failed to remove cleanup timestamp: %w", err), "worker", id)
11991228
}
1229+
jobKeys, _ := node.jobMap.GetValues(id)
1230+
for _, key := range jobKeys {
1231+
if _, err := node.jobPayloadMap.Delete(ctx, key); err != nil {
1232+
node.logger.Error(fmt.Errorf("removeWorkerFromMaps: failed to remove job %s from payload map: %w", key, err))
1233+
}
1234+
}
12001235
if _, err := node.jobMap.Delete(ctx, id); err != nil {
12011236
node.logger.Error(fmt.Errorf("removeWorkerFromMaps: failed to remove worker %s from jobs map: %w", id, err))
12021237
}

0 commit comments

Comments
 (0)