Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/usage/docker-labels.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ You can configure your service using swarm-cronjob through Docker labels:
| Name | Default | Description |
|--------------------------------|---------|--------------------------------------------------------------------------------------------------------------------|
| `swarm.cronjob.enable` | | Set to true to enable the cronjob. **required** |
| `swarm.cronjob.schedule` | | [CRON expression format](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) to use. **required** |
| `swarm.cronjob.schedule` | | [CRON expression format](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) to use OR [Start Time Format in seconds](https://pkg.go.dev/github.com/go-co-op/gocron/v2#OneTimeJob). **required** |
| `swarm.cronjob.skip-running` | `false` | Do not start a job if the service is currently running. |
| `swarm.cronjob.replicas` | `1` | Number of replicas to set on schedule in `replicated` mode. |
| `swarm.cronjob.registry-auth` | `false` | Send registry authentication details to Swarm agents. |
| `swarm.cronjob.query-registry` | | Indicates whether the service update requires contacting a registry |
| `swarm.cronjob.run-once` | | Indicates whether the service will be scheduled to run only once |
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ require (
github.com/distribution/reference v0.6.0
github.com/docker/cli v27.4.1+incompatible
github.com/docker/docker v27.4.1+incompatible
github.com/go-co-op/gocron/v2 v2.16.0
github.com/google/uuid v1.6.0
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.1
github.com/rs/zerolog v1.33.0
golang.org/x/sys v0.28.0
)
Expand All @@ -32,10 +33,10 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jonboulle/clockwork v0.5.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
Expand All @@ -50,6 +51,7 @@ require (
github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5 // indirect
github.com/prometheus/common v0.0.0-20180110214958-89604d197083 // indirect
github.com/prometheus/procfs v0.0.0-20180125133057-cb4147076ac7 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fvbommel/sortorder v1.1.0 h1:fUmoe+HLsBTctBDoaBwpQo5N+nrCp8g/BjKb/6ZQmYw=
github.com/fvbommel/sortorder v1.1.0/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/go-co-op/gocron/v2 v2.16.0 h1:uqUF6WFZ4enRU45pWFNcn1xpDLc+jBOTKhPQI16Z1xs=
github.com/go-co-op/gocron/v2 v2.16.0/go.mod h1:opexeOFy5BplhsKdA7bzY9zeYih8I8/WNJ4arTIFPVc=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down Expand Up @@ -106,6 +108,8 @@ github.com/jinzhu/gorm v0.0.0-20170222002820-5409931a1bb8/go.mod h1:Vla75njaFJ8c
github.com/jinzhu/inflection v0.0.0-20170102125226-1c35d901db3d h1:jRQLvyVGL+iVtDElaEIDdKwpPqUIZJfzkNLV34htpEc=
github.com/jinzhu/inflection v0.0.0-20170102125226-1c35d901db3d/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I=
github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand Down
135 changes: 95 additions & 40 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,41 @@ import (

"github.com/crazy-max/swarm-cronjob/internal/docker"
"github.com/crazy-max/swarm-cronjob/internal/model"
"github.com/crazy-max/swarm-cronjob/internal/scheduler"
"github.com/crazy-max/swarm-cronjob/internal/worker"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/mitchellh/mapstructure"
cron "github.com/robfig/cron/v3"
"github.com/rs/zerolog/log"
)

// SwarmCronjob represents an active swarm-cronjob object
type SwarmCronjob struct {
docker docker.Client
cron *cron.Cron
jobs map[string]cron.EntryID
}
type (
Jobs map[string]scheduler.Uid

// New creates new swarm-cronjob instance
// SwarmCronjob represents an active swarm-cronjob object
SwarmCronjob struct {
docker docker.Client
scheduler *scheduler.Scheduler
jobs Jobs
runOnce map[string]bool
}
)

// New setup new swarm-cronjob instance
func New() (*SwarmCronjob, error) {
log.Debug().Msg("Creating Docker API client")
d, err := docker.NewEnvClient()

scheduler, err := scheduler.NewScheduler(true)
if err != nil {
return nil, err
}

return &SwarmCronjob{
docker: d,
cron: cron.New(cron.WithParser(cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor),
)),
jobs: make(map[string]cron.EntryID),
docker: d,
scheduler: scheduler,
jobs: make(Jobs),
runOnce: make(map[string]bool),
}, err
}

Expand All @@ -51,14 +60,14 @@ func (sc *SwarmCronjob) Run() error {

// Add services as cronjobs
for _, service := range services {
if _, err := sc.crudJob(service.Name); err != nil {
if _, err := sc.crudJob(service.Name, false); err != nil {
log.Error().Err(err).Msgf("Cannot manage job for service %s", service.Name)
}
}

// Start cron routine
log.Debug().Msg("Starting the cron scheduler")
sc.cron.Start()
sc.scheduler.Start()

// Listen Docker events
log.Debug().Msg("Listening docker events...")
Expand All @@ -69,35 +78,43 @@ func (sc *SwarmCronjob) Run() error {
Filters: filter,
})

var event model.ServiceEvent
var (
event model.ServiceEvent
deploy bool
)
for {
select {
case err := <-errs:
log.Fatal().Err(err).Msg("Event channel failed")
case msg := <-msgs:
err := mapstructure.Decode(msg.Actor.Attributes, &event)
if err != nil {
if err := mapstructure.Decode(msg.Actor.Attributes, &event); err != nil {
log.Warn().Msgf("Cannot decode event, %v", err)
continue
}

log.Debug().
Str("service", event.Service).
Str("newstate", event.UpdateState.New).
Str("oldstate", event.UpdateState.Old).
Msg("Event triggered")
processed, err := sc.crudJob(event.Service)

if event.UpdateState.New == "" && event.UpdateState.Old == "" {
deploy = true
}
processed, err := sc.crudJob(event.Service, deploy)
if err != nil {
log.Error().Str("service", event.Service).Err(err).Msg("Cannot manage job")
continue

} else if processed {
log.Debug().Msgf("Number of cronjob tasks: %d", len(sc.cron.Entries()))
log.Debug().Msgf("Number of cronjob tasks: %d", sc.scheduler.CountJobs())
}
}
}
}

// crudJob adds, updates or removes cron job service
func (sc *SwarmCronjob) crudJob(serviceName string) (bool, error) {
func (sc *SwarmCronjob) crudJob(serviceName string, deploy bool) (bool, error) {
// Find existing job
jobID, jobFound := sc.jobs[serviceName]

Expand Down Expand Up @@ -128,44 +145,54 @@ func (sc *SwarmCronjob) crudJob(serviceName string) (bool, error) {
for labelKey, labelValue := range service.Labels {
switch labelKey {
case "swarm.cronjob.enable":
wc.Job.Enable, err = strconv.ParseBool(labelValue)
if err != nil {
if wc.Job.Enable, err = strconv.ParseBool(labelValue); err != nil {
log.Error().Str("service", service.Name).Err(err).Msgf("Cannot parse %s value of label %s", labelValue, labelKey)
}

case "swarm.cronjob.schedule":
wc.Job.Schedule = labelValue

case "swarm.cronjob.skip-running":
wc.Job.SkipRunning, err = strconv.ParseBool(labelValue)
if err != nil {
if wc.Job.SkipRunning, err = strconv.ParseBool(labelValue); err != nil {
log.Error().Str("service", service.Name).Err(err).Msgf("Cannot parse %s value of label %s", labelValue, labelKey)
}

case "swarm.cronjob.replicas":
wc.Job.Replicas, err = strconv.ParseUint(labelValue, 10, 64)
if err != nil {
if wc.Job.Replicas, err = strconv.ParseUint(labelValue, 10, 64); err != nil {
log.Error().Str("service", service.Name).Err(err).Msgf("Cannot parse %s value of label %s", labelValue, labelKey)
} else if wc.Job.Replicas < 1 {
log.Error().Str("service", service.Name).Msgf("%s must be greater than or equal to one", labelKey)
}

case "swarm.cronjob.registry-auth":
wc.Job.RegistryAuth, err = strconv.ParseBool(labelValue)
if err != nil {
if wc.Job.RegistryAuth, err = strconv.ParseBool(labelValue); err != nil {
log.Error().Str("service", service.Name).Err(err).Msgf("Cannot parse %s value of label %s", labelValue, labelKey)
}

case "swarm.cronjob.query-registry":
queryRegistry, err := strconv.ParseBool(labelValue)
if err != nil {
log.Error().Str("service", service.Name).Err(err).Msgf("Cannot parse %s value of label %s", labelValue, labelKey)
}
wc.Job.QueryRegistry = &queryRegistry

case "swarm.cronjob.scaledown":
if labelValue == "true" {
log.Debug().Str("service", service.Name).Msg("Scale down detected. Skipping cronjob")
return false, nil
}

case "swarm.cronjob.run-once":
if labelValue == "true" {
sc.runOnce[service.Name] = true
log.Debug().Str("service", service.Name).Msgf("Enabled run once for the job %s", service.Name)
} else {
sc.runOnce[service.Name] = false
}
}
}

// Disabled or non-cron service
// Check if is disabled or is a non-cron service
if !wc.Job.Enable {
if jobFound {
log.Info().Str("service", service.Name).Msg("Disable cronjob")
Expand All @@ -178,29 +205,57 @@ func (sc *SwarmCronjob) crudJob(serviceName string) (bool, error) {

// Add/Update job
if jobFound {
// check if is to run job only once
if sc.runOnce[service.Name] {
// check if the defined service got an update
if deploy {
sc.removeJob(serviceName, jobID)
}
log.Info().Str("service", service.Name).Msgf("Job %s only scheduled to run once, skipping", wc.Job.Name)
return true, err
}

sc.removeJob(serviceName, jobID)
log.Debug().Str("service", service.Name).Msgf("Update cronjob with schedule %s", wc.Job.Schedule)
} else {
log.Info().Str("service", service.Name).Msgf("Add cronjob with schedule %s", wc.Job.Schedule)
if sc.runOnce[service.Name] {
log.Info().Str("service", service.Name).Msgf("Add one time job to be run after %s seconds", wc.Job.Schedule)
} else {
log.Info().Str("service", service.Name).Msgf("Add cronjob with schedule %s", wc.Job.Schedule)
}
}

jobID, err = sc.cron.AddJob(wc.Job.Schedule, wc)
if err != nil {
return false, err
var job scheduler.Job
// check if current service is configured to run only once
if sc.runOnce[service.Name] {
if job, err = sc.scheduler.OneTimeJob(wc.Job.Schedule, func() {
defer wc.Run()
log.Debug().Str("service", service.Name).Msgf("Triggered one time job %s ...", wc.Job.Name)
}); err != nil {
return false, err
}
} else {

// by default set service as a Cron Job
if job, err = sc.scheduler.CronJob(wc.Job.Schedule, func() {
wc.Run()
}); err != nil {
return false, err
}
}

sc.jobs[serviceName] = jobID
sc.jobs[serviceName] = job.ID()
return true, err
}

// Close closes swarm-cronjob
// Close stops swarm-cronjob jobs to be processed
func (sc *SwarmCronjob) Close() {
if sc.cron != nil {
sc.cron.Stop()
if sc.scheduler != nil {
sc.scheduler.Stop()
}
}

func (sc *SwarmCronjob) removeJob(serviceName string, id cron.EntryID) {
func (sc *SwarmCronjob) removeJob(serviceName string, id scheduler.Uid) {
defer sc.scheduler.RemoveJob(id)
delete(sc.jobs, serviceName)
sc.cron.Remove(id)
}
Loading