Skip to content

Commit 9bb809c

Browse files
Copilotvcastellm
andauthored
Add API endpoints to pause/unpause new job submissions (#1872)
* Initial plan * Add pause/unpause functionality for new job submissions Co-authored-by: vcastellm <47026+vcastellm@users.noreply.github.com> * Update OpenAPI spec with pause/unpause endpoints Co-authored-by: vcastellm <47026+vcastellm@users.noreply.github.com> * Fix test port conflict - use port 8097 instead of 8096 Co-authored-by: vcastellm <47026+vcastellm@users.noreply.github.com> * Fix test port collision - use port 8102 instead of 8097 Co-authored-by: vcastellm <47026+vcastellm@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: vcastellm <47026+vcastellm@users.noreply.github.com>
1 parent 9afaef7 commit 9bb809c

File tree

4 files changed

+176
-0
lines changed

4 files changed

+176
-0
lines changed

dkron/agent.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ type Agent struct {
128128
logger *logrus.Entry
129129

130130
tracer trace.Tracer
131+
132+
// pauseNewJobs controls whether new jobs can be created or updated
133+
pauseNewJobs bool
134+
pauseMu sync.RWMutex
131135
}
132136

133137
// ProcessorFactory is a function type that creates a new instance
@@ -965,3 +969,26 @@ func (a *Agent) startReporter() {
965969
}
966970
}()
967971
}
972+
973+
// PauseNewJobs pauses new job submissions
974+
func (a *Agent) PauseNewJobs() {
975+
a.pauseMu.Lock()
976+
defer a.pauseMu.Unlock()
977+
a.pauseNewJobs = true
978+
a.logger.Info("agent: New job submissions paused")
979+
}
980+
981+
// UnpauseNewJobs resumes new job submissions
982+
func (a *Agent) UnpauseNewJobs() {
983+
a.pauseMu.Lock()
984+
defer a.pauseMu.Unlock()
985+
a.pauseNewJobs = false
986+
a.logger.Info("agent: New job submissions resumed")
987+
}
988+
989+
// IsNewJobsPaused returns whether new job submissions are paused
990+
func (a *Agent) IsNewJobsPaused() bool {
991+
a.pauseMu.RLock()
992+
defer a.pauseMu.RUnlock()
993+
return a.pauseNewJobs
994+
}

dkron/api.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ func (h *HTTPTransport) APIRoutes(r *gin.RouterGroup, middleware ...gin.HandlerF
109109

110110
v1.GET("/busy", h.busyHandler)
111111

112+
v1.GET("/pause", h.pauseStatusHandler)
113+
v1.POST("/pause", h.pauseHandler)
114+
v1.POST("/unpause", h.unpauseHandler)
115+
112116
v1.POST("/jobs", h.jobCreateOrUpdateHandler)
113117
v1.PATCH("/jobs", h.jobCreateOrUpdateHandler)
114118
// Place fallback routes last
@@ -217,6 +221,13 @@ func (h *HTTPTransport) jobGetHandler(c *gin.Context) {
217221
}
218222

219223
func (h *HTTPTransport) jobCreateOrUpdateHandler(c *gin.Context) {
224+
// Check if new job submissions are paused
225+
if h.agent.IsNewJobsPaused() {
226+
c.AbortWithStatus(http.StatusServiceUnavailable)
227+
_, _ = c.Writer.WriteString("New job submissions are currently paused")
228+
return
229+
}
230+
220231
// Init the Job object with defaults
221232
job := Job{
222233
Concurrency: ConcurrencyAllow,
@@ -508,3 +519,18 @@ func (h *HTTPTransport) busyHandler(c *gin.Context) {
508519
c.Header("X-Total-Count", strconv.Itoa(len(executions)))
509520
renderJSON(c, http.StatusOK, executions)
510521
}
522+
523+
func (h *HTTPTransport) pauseHandler(c *gin.Context) {
524+
h.agent.PauseNewJobs()
525+
renderJSON(c, http.StatusOK, gin.H{"paused": true})
526+
}
527+
528+
func (h *HTTPTransport) unpauseHandler(c *gin.Context) {
529+
h.agent.UnpauseNewJobs()
530+
renderJSON(c, http.StatusOK, gin.H{"paused": false})
531+
}
532+
533+
func (h *HTTPTransport) pauseStatusHandler(c *gin.Context) {
534+
paused := h.agent.IsNewJobsPaused()
535+
renderJSON(c, http.StatusOK, gin.H{"paused": paused})
536+
}

dkron/api_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,3 +540,71 @@ func TestAPILeaderEndpointsNoRaftNoPanic(t *testing.T) {
540540
"leader endpoint should return valid status code")
541541
}
542542
}
543+
544+
func TestAPIPauseUnpause(t *testing.T) {
545+
port := "8102"
546+
baseURL := fmt.Sprintf("http://localhost:%s/v1", port)
547+
dir, a := setupAPITest(t, port)
548+
defer os.RemoveAll(dir)
549+
defer a.Stop() // nolint: errcheck
550+
551+
// Check initial pause status (should be false)
552+
resp, err := http.Get(baseURL + "/pause")
553+
require.NoError(t, err)
554+
body, _ := ioutil.ReadAll(resp.Body)
555+
resp.Body.Close()
556+
assert.Equal(t, http.StatusOK, resp.StatusCode)
557+
assert.Contains(t, string(body), `"paused":false`)
558+
559+
// Pause new job submissions
560+
resp, err = http.Post(baseURL+"/pause", "application/json", nil)
561+
require.NoError(t, err)
562+
body, _ = ioutil.ReadAll(resp.Body)
563+
resp.Body.Close()
564+
assert.Equal(t, http.StatusOK, resp.StatusCode)
565+
assert.Contains(t, string(body), `"paused":true`)
566+
567+
// Verify pause status
568+
resp, err = http.Get(baseURL + "/pause")
569+
require.NoError(t, err)
570+
body, _ = ioutil.ReadAll(resp.Body)
571+
resp.Body.Close()
572+
assert.Equal(t, http.StatusOK, resp.StatusCode)
573+
assert.Contains(t, string(body), `"paused":true`)
574+
575+
// Try to create a job while paused (should fail)
576+
jsonStr := []byte(`{
577+
"name": "test_job_paused",
578+
"schedule": "@every 1m",
579+
"executor": "shell",
580+
"executor_config": {"command": "date"}
581+
}`)
582+
resp, err = http.Post(baseURL+"/jobs", "application/json", bytes.NewBuffer(jsonStr))
583+
require.NoError(t, err)
584+
body, _ = ioutil.ReadAll(resp.Body)
585+
resp.Body.Close()
586+
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
587+
assert.Contains(t, string(body), "paused")
588+
589+
// Unpause new job submissions
590+
resp, err = http.Post(baseURL+"/unpause", "application/json", nil)
591+
require.NoError(t, err)
592+
body, _ = ioutil.ReadAll(resp.Body)
593+
resp.Body.Close()
594+
assert.Equal(t, http.StatusOK, resp.StatusCode)
595+
assert.Contains(t, string(body), `"paused":false`)
596+
597+
// Verify unpause status
598+
resp, err = http.Get(baseURL + "/pause")
599+
require.NoError(t, err)
600+
body, _ = ioutil.ReadAll(resp.Body)
601+
resp.Body.Close()
602+
assert.Equal(t, http.StatusOK, resp.StatusCode)
603+
assert.Contains(t, string(body), `"paused":false`)
604+
605+
// Try to create a job after unpause (should succeed)
606+
resp, err = http.Post(baseURL+"/jobs", "application/json", bytes.NewBuffer(jsonStr))
607+
require.NoError(t, err)
608+
resp.Body.Close()
609+
assert.Equal(t, http.StatusCreated, resp.StatusCode)
610+
}

website/static/openapi/openapi.yaml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,61 @@ paths:
434434
items:
435435
$ref: '#/components/schemas/execution'
436436

437+
/pause:
438+
get:
439+
tags:
440+
- default
441+
description: |
442+
Returns the current pause status for new job submissions.
443+
operationId: pauseStatus
444+
responses:
445+
"200":
446+
description: Successful response
447+
content:
448+
application/json:
449+
schema:
450+
type: object
451+
properties:
452+
paused:
453+
type: boolean
454+
description: Whether new job submissions are currently paused
455+
post:
456+
tags:
457+
- default
458+
description: |
459+
Pauses new job submissions. Existing jobs continue to run, but no new jobs can be created or updated.
460+
operationId: pause
461+
responses:
462+
"200":
463+
description: Successful response
464+
content:
465+
application/json:
466+
schema:
467+
type: object
468+
properties:
469+
paused:
470+
type: boolean
471+
description: Whether new job submissions are currently paused
472+
473+
/unpause:
474+
post:
475+
tags:
476+
- default
477+
description: |
478+
Resumes new job submissions after being paused.
479+
operationId: unpause
480+
responses:
481+
"200":
482+
description: Successful response
483+
content:
484+
application/json:
485+
schema:
486+
type: object
487+
properties:
488+
paused:
489+
type: boolean
490+
description: Whether new job submissions are currently paused
491+
437492
/acl/policies/{name}:
438493
get:
439494
tags:

0 commit comments

Comments
 (0)