Skip to content

Commit 1240875

Browse files
committed
feat: add cronjob ssh command
1 parent 7ea04c0 commit 1240875

8 files changed

Lines changed: 243 additions & 22 deletions

File tree

cmd/soft/serve/server.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,16 @@ func NewServer(ctx context.Context) (*Server, error) {
5959
// Add cron jobs.
6060
sched := cron.NewScheduler(ctx)
6161
for n, j := range jobs.List() {
62-
spec := j.Runner.Spec(ctx)
62+
jobCfg, err := j.Runner.Config(ctx)
63+
if err != nil {
64+
return nil, fmt.Errorf("parse cronjob [%s] config: %w", err)
65+
}
66+
spec := jobCfg.Spec()
6367
if spec == "" {
6468
continue
6569
}
66-
id, err := sched.AddFunc(spec, j.Runner.Func(ctx))
70+
71+
id, err := sched.AddFunc(spec, j.Runner.Func(ctx, jobCfg))
6772
if err != nil {
6873
logger.Warn("error adding cron job", "job", n, "err", err)
6974
}

pkg/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ func (c *Config) Environ() []string {
221221
fmt.Sprintf("SOFT_SERVE_LFS_ENABLED=%t", c.LFS.Enabled),
222222
fmt.Sprintf("SOFT_SERVE_LFS_SSH_ENABLED=%t", c.LFS.SSHEnabled),
223223
fmt.Sprintf("SOFT_SERVE_JOBS_MIRROR_PULL=%s", c.Jobs.MirrorPull),
224+
fmt.Sprintf("SOFT_SERVE_JOBS_GIT_GC=%s", c.Jobs.GitGC),
224225
}...)
225226

226227
return envs

pkg/config/file.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ lfs:
143143
# Cron job configuration
144144
jobs:
145145
mirror_pull: "{{ .Jobs.MirrorPull }}"
146+
git_gc: "{{ .Jobs.GitGC }}"
146147
147148
# Additional admin keys.
148149
#initial_admin_keys:

pkg/jobs/garbage_collector.go

Lines changed: 73 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,64 @@
11
package jobs
22

33
import (
4+
"bytes"
45
"context"
6+
"fmt"
57
"runtime"
68

79
"charm.land/log/v2"
810
"github.com/charmbracelet/soft-serve/git"
911
"github.com/charmbracelet/soft-serve/pkg/backend"
1012
"github.com/charmbracelet/soft-serve/pkg/config"
1113
"github.com/charmbracelet/soft-serve/pkg/sync"
14+
"github.com/spf13/pflag"
1215
)
1316

1417
func init() {
1518
Register("git-gc", gitGC{})
1619
}
1720

18-
type gitGC struct{}
21+
type (
22+
gitGC struct{}
23+
gitGCConfig struct {
24+
baseRunnerConfig
1925

20-
func (g gitGC) Spec(ctx context.Context) string {
21-
cfg := config.FromContext(ctx)
22-
if cfg.Jobs.GitGC != "" {
23-
return cfg.Jobs.GitGC
26+
RepoConfig map[string]string
27+
Aggressive bool
2428
}
25-
return ""
29+
)
30+
31+
// Description return the description of garbage collector job task and implements Runner.
32+
func (m gitGC) Description() string {
33+
return "clean up the garbage in repositories"
2634
}
2735

28-
func (g gitGC) Func(ctx context.Context) func() {
36+
// Config returns the garbage collector job task configuration and implements Runner.
37+
func (m gitGC) Config(ctx context.Context) (RunnerConfig, error) {
38+
cfg := gitGCConfig{
39+
baseRunnerConfig: baseRunnerConfig{CronSpec: ""},
40+
Aggressive: false,
41+
RepoConfig: make(map[string]string),
42+
}
43+
44+
if spec := config.FromContext(ctx).Jobs.GitGC; spec != "" {
45+
cfg.CronSpec = spec
46+
}
47+
48+
return &cfg, nil
49+
}
50+
51+
// Func runs the garbage collector job task and implements Runner.
52+
func (g gitGC) Func(ctx context.Context, cronCfg RunnerConfig) func() {
2953
b := backend.FromContext(ctx)
3054
logger := log.FromContext(ctx).WithPrefix("jobs.gitgc")
55+
jobcfg := cronCfg.(*gitGCConfig)
56+
3157
return func() {
3258
repos, err := b.Repositories(ctx)
3359
if err != nil {
3460
logger.Error("error getting repositories", "err", err)
61+
fmt.Fprintf(jobcfg.Error(), "error getting repositories: %v\n", err)
3562
return
3663
}
3764

@@ -44,20 +71,56 @@ func (g gitGC) Func(ctx context.Context) func() {
4471
r, err := repo.Open()
4572
if err != nil {
4673
logger.Error("error opening repository", "repo", repo.Name(), "err", err)
74+
fmt.Fprintf(jobcfg.Error(), "[%s] error opening repository: %v\n", repo.Name(), err)
4775
continue
4876
}
4977

5078
name := repo.Name()
5179
wq.Add(name, func() {
52-
cmd := git.NewCommand("gc").WithContext(ctx)
80+
// buffer and write to stdout/stderr in one go,
81+
// avoiding output confusion through parallel writing.
82+
var (
83+
stdout = bytes.NewBuffer(nil)
84+
stderr = bytes.NewBuffer(nil)
85+
)
86+
defer func() {
87+
jobcfg.Output().Write(stdout.Bytes())
88+
jobcfg.Error().Write(stderr.Bytes())
89+
}()
90+
91+
var cmdArgs []string = nil
92+
for key, val := range jobcfg.RepoConfig {
93+
cmdArgs = append(cmdArgs, "-c", key+"="+val)
94+
}
95+
96+
cmdArgs = append(cmdArgs, "gc")
97+
98+
if jobcfg.Aggressive {
99+
cmdArgs = append(cmdArgs, "--aggressive")
100+
}
101+
102+
// `git gc` would not output anything if no tty
103+
cmd := git.NewCommand(cmdArgs...).WithContext(ctx)
53104
if _, err := cmd.RunInDir(r.Path); err != nil {
54105
logger.Error("error running git remote update", "repo", name, "err", err)
106+
fmt.Fprintf(stderr, "[%s] git gc failed: %v\n", name, err)
107+
} else {
108+
fmt.Fprintf(stdout, "[%s] git gc succeed\n", name)
55109
}
56-
57110
})
58111

59112
// TODO: clean up lfs
60113
}
61-
114+
wq.Run()
62115
}
63116
}
117+
118+
// FlagSet returns the flag set that can modify configuration values and implements RunnerConfig
119+
func (cfg *gitGCConfig) FlagSet() *pflag.FlagSet {
120+
flags := pflag.NewFlagSet("git-gc", pflag.ContinueOnError)
121+
flags.StringToStringVarP(&cfg.RepoConfig, "config", "c", cfg.RepoConfig, "Override values from git repository configuration files")
122+
123+
flags.BoolVar(&cfg.Aggressive, "aggressive", cfg.Aggressive, "Optimize the repository more aggressively, see git-gc(1) for more details")
124+
125+
return flags
126+
}

pkg/jobs/jobs.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package jobs
22

33
import (
44
"context"
5+
"io"
56
"sync"
7+
8+
"github.com/spf13/pflag"
69
)
710

811
// Job is a job that can be registered with the scheduler.
@@ -13,8 +16,56 @@ type Job struct {
1316

1417
// Runner is a job runner.
1518
type Runner interface {
16-
Spec(context.Context) string
17-
Func(context.Context) func()
19+
Description() string
20+
Config(context.Context) (RunnerConfig, error)
21+
22+
Func(context.Context, RunnerConfig) func()
23+
}
24+
25+
// JobConfig is the configuration for Runner, passed to Runner.Func
26+
type RunnerConfig interface {
27+
FlagSet() *pflag.FlagSet
28+
Spec() string
29+
30+
SetOut(out io.Writer)
31+
Output() io.Writer
32+
SetErr(err io.Writer)
33+
Error() io.Writer
34+
}
35+
36+
// baseRunnerConfig implements the common part of job tasks' RunnerConfig
37+
type baseRunnerConfig struct {
38+
CronSpec string `yaml:"spec"`
39+
40+
output io.Writer
41+
error io.Writer
42+
}
43+
44+
// SetOut sets the stdout of cron job
45+
func (cfg *baseRunnerConfig) SetOut(out io.Writer) { cfg.output = out }
46+
47+
// Output return the stdout of cron job
48+
func (cfg *baseRunnerConfig) Output() io.Writer {
49+
if cfg.output == nil {
50+
return io.Discard
51+
}
52+
return cfg.output
53+
}
54+
55+
// SetErr sets the stderr of cron job
56+
func (cfg *baseRunnerConfig) SetErr(err io.Writer) { cfg.error = err }
57+
58+
// Error return the stderr of cron job
59+
func (cfg *baseRunnerConfig) Error() io.Writer {
60+
if cfg.error == nil {
61+
return io.Discard
62+
}
63+
return cfg.error
64+
}
65+
66+
// Spec derives the spec for built-in job scheduler and implements RunnerConfig.
67+
func (cfg *baseRunnerConfig) Spec() string {
68+
return cfg.CronSpec
1869
}
1970

2071
var (

pkg/jobs/mirror.go

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package jobs
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
67
"path/filepath"
@@ -15,30 +16,47 @@ import (
1516
"github.com/charmbracelet/soft-serve/pkg/lfs"
1617
"github.com/charmbracelet/soft-serve/pkg/store"
1718
"github.com/charmbracelet/soft-serve/pkg/sync"
19+
"github.com/spf13/pflag"
1820
)
1921

2022
func init() {
2123
Register("mirror-pull", mirrorPull{})
2224
}
2325

24-
type mirrorPull struct{}
26+
type (
27+
mirrorPull struct{}
28+
mirrorPullConfig struct {
29+
baseRunnerConfig
30+
RepoConfig map[string]string
31+
}
32+
)
2533

26-
// Spec derives the spec used for pull mirrors and implements Runner.
27-
func (m mirrorPull) Spec(ctx context.Context) string {
28-
cfg := config.FromContext(ctx)
29-
if cfg.Jobs.MirrorPull != "" {
30-
return cfg.Jobs.MirrorPull
34+
// Description return the description of (pull) mirror job task and implements Runner.
35+
func (m mirrorPull) Description() string {
36+
return "fetch upstream for mirror repositories"
37+
}
38+
39+
// Config returns the (pull) mirror cronjob configuration and implements Runner.
40+
func (m mirrorPull) Config(ctx context.Context) (RunnerConfig, error) {
41+
cfg := mirrorPullConfig{
42+
baseRunnerConfig: baseRunnerConfig{CronSpec: "@every 10m"},
43+
RepoConfig: make(map[string]string),
44+
}
45+
if spec := config.FromContext(ctx).Jobs.MirrorPull; spec != "" {
46+
cfg.CronSpec = spec
3147
}
32-
return "@every 10m"
48+
49+
return &cfg, nil
3350
}
3451

3552
// Func runs the (pull) mirror job task and implements Runner.
36-
func (m mirrorPull) Func(ctx context.Context) func() {
53+
func (m mirrorPull) Func(ctx context.Context, cronCfg RunnerConfig) func() {
3754
cfg := config.FromContext(ctx)
3855
logger := log.FromContext(ctx).WithPrefix("jobs.mirror")
3956
b := backend.FromContext(ctx)
4057
dbx := db.FromContext(ctx)
4158
datastore := store.FromContext(ctx)
59+
jobcfg := cronCfg.(*mirrorPullConfig)
4260
return func() {
4361
repos, err := b.Repositories(ctx)
4462
if err != nil {
@@ -64,13 +82,31 @@ func (m mirrorPull) Func(ctx context.Context) func() {
6482
wq.Add(name, func() {
6583
repo := repo
6684

85+
// buffer and write to stdout/stderr in one go,
86+
// avoiding output confusion through parallel writing.
87+
var (
88+
stdout = bytes.NewBuffer(nil)
89+
stderr = bytes.NewBuffer(nil)
90+
)
91+
defer func() {
92+
jobcfg.Output().Write(stdout.Bytes())
93+
jobcfg.Error().Write(stderr.Bytes())
94+
}()
95+
6796
cmds := []string{
6897
"fetch --prune", // fetch prune before updating remote
6998
"remote update --prune", // update remote and prune remote refs
7099
}
71100

101+
gitFlags := []string{}
102+
for key, val := range jobcfg.RepoConfig {
103+
gitFlags = append(gitFlags, "-c", key+"="+val)
104+
}
105+
72106
for _, c := range cmds {
73107
args := strings.Split(c, " ")
108+
args = append(gitFlags, args...)
109+
74110
cmd := git.NewCommand(args...).WithContext(ctx)
75111
cmd.AddEnvs(
76112
fmt.Sprintf(`GIT_SSH_COMMAND=ssh -o UserKnownHostsFile="%s" -o StrictHostKeyChecking=no -i "%s"`,
@@ -80,6 +116,7 @@ func (m mirrorPull) Func(ctx context.Context) func() {
80116
)
81117

82118
if _, err := cmd.RunInDir(r.Path); err != nil {
119+
fmt.Fprintf(stderr, "[%s]: error running git remote update: %v\n", name, err)
83120
logger.Error("error running git remote update", "repo", name, "err", err)
84121
}
85122
}
@@ -88,6 +125,7 @@ func (m mirrorPull) Func(ctx context.Context) func() {
88125
rcfg, err := r.Config()
89126
if err != nil {
90127
logger.Error("error getting git config", "repo", name, "err", err)
128+
fmt.Fprintf(stderr, "[%s]: lfs pull: error getting git config: %v", name, err)
91129
return
92130
}
93131

@@ -101,24 +139,38 @@ func (m mirrorPull) Func(ctx context.Context) func() {
101139
ep, err := lfs.NewEndpoint(lfsEndpoint)
102140
if err != nil {
103141
logger.Error("error creating LFS endpoint", "repo", name, "err", err)
142+
fmt.Fprintf(stderr, "[%s]: lfs pull: creating LFS endpoint: %v", name, err)
104143
return
105144
}
106145

107146
client := lfs.NewClient(ep)
108147
if client == nil {
148+
fmt.Fprintf(stderr,
149+
"[%s]: lfs pull: failed to create lfs client: unsupported endpoint %s",
150+
name, lfsEndpoint)
109151
logger.Errorf("failed to create lfs client: unsupported endpoint %s", lfsEndpoint)
110152
return
111153
}
112154

113155
if err := backend.StoreRepoMissingLFSObjects(ctx, repo, dbx, datastore, client); err != nil {
156+
fmt.Fprintf(stderr, "[%s]: lfs pull: failed to store missing lfs objects: %v", name, err)
114157
logger.Error("failed to store missing lfs objects", "err", err, "path", r.Path)
115158
return
116159
}
117160
}
161+
fmt.Fprintf(stdout, "[%s]: mirror pull succeed\n", name)
118162
})
119163
}
120164
}
121165

122166
wq.Run()
123167
}
124168
}
169+
170+
// FlagSet returns the flag set that can modify configuration values and implements RunnerConfig
171+
func (cfg *mirrorPullConfig) FlagSet() *pflag.FlagSet {
172+
flags := pflag.NewFlagSet("mirror-pull", pflag.ContinueOnError)
173+
flags.StringToStringVarP(&cfg.RepoConfig, "config", "c", cfg.RepoConfig, "Override values from git repository configuration files")
174+
175+
return flags
176+
}

0 commit comments

Comments
 (0)