-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcheckjob.go
More file actions
134 lines (121 loc) · 3.39 KB
/
Copy pathcheckjob.go
File metadata and controls
134 lines (121 loc) · 3.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"context"
"fmt"
"time"
"github.com/jorben/rsync-object-storage/config"
"github.com/jorben/rsync-object-storage/helper"
"github.com/jorben/rsync-object-storage/log"
"io/fs"
"path/filepath"
)
// CheckJob is the scheduled reconciliation job: it periodically walks the
// local tree and queues every path that differs from the remote copy.
type CheckJob struct {
// InitialDelay is how long Run waits before the first Walk.
InitialDelay time.Duration
// Interval is the period between subsequent walks, in hours
// (Run multiplies it by time.Hour when building its ticker).
Interval int
// Enable turns the job on; when false, Run only waits for context cancellation.
Enable bool
// PutChan receives the local paths for which Walk found a difference.
PutChan chan string
// LocalPrefix is the root directory that Walk traverses.
LocalPrefix string
// Ignore is the list handed to helper.IsIgnore to exclude paths from the check.
Ignore []string
// Storage is queried by Walk (via IsSameV2) to decide whether a path differs.
Storage *Storage
}
// NewCheckJob builds a CheckJob from the sync configuration.
//
// c supplies the schedule (Sync.CheckJob.StartAt, Interval, Enable), the local
// root path and the ignore list; ch is the queue that receives changed paths;
// storage is the backend used to compare files.
//
// The first run is scheduled for today's StartAt wall-clock time ("15:04:05"),
// or for the same wall-clock time on the next calendar day when that moment has
// already passed. An unparsable StartAt is logged and replaced by midnight.
// An Interval below 1 is raised to 1 (hour) directly in c, so the caller's
// configuration reflects the effective value.
func NewCheckJob(c *config.SyncConfig, ch chan string, storage *Storage) *CheckJob {
	now := time.Now()
	year, month, day := now.Date()
	loc := now.Location()

	// Combine today's date with the configured time of day.
	targetTime, err := time.ParseInLocation("2006-01-02 15:04:05",
		fmt.Sprintf("%d-%02d-%02d %s", year, month, day, c.Sync.CheckJob.StartAt), loc)
	if err != nil {
		// Malformed StartAt: fall back to starting at midnight.
		log.Errorf("Parse StartAt err: %s, Reset start at 0:0:0", err.Error())
		targetTime = time.Date(year, month, day, 0, 0, 0, 0, loc)
	}

	// The job may run at most once per hour.
	if c.Sync.CheckJob.Interval < 1 {
		c.Sync.CheckJob.Interval = 1
	}

	// Today's slot is already gone: move to the same wall-clock time tomorrow.
	// AddDate is used instead of Add(24*time.Hour) so the start time stays on
	// the configured clock time across daylight-saving transitions.
	if now.After(targetTime) {
		targetTime = targetTime.AddDate(0, 0, 1)
	}

	return &CheckJob{
		InitialDelay: targetTime.Sub(now),
		Interval:     c.Sync.CheckJob.Interval,
		Enable:       c.Sync.CheckJob.Enable,
		Storage:      storage,
		LocalPrefix:  c.Local.Path,
		PutChan:      ch,
		Ignore:       c.Sync.Ignore,
	}
}
// Run is the entry point of the check job. It blocks until ctx is cancelled,
// which allows the caller to shut the job down gracefully.
//
// When the job is disabled, Run does no work and simply waits for
// cancellation. Otherwise it waits InitialDelay, performs the first Walk, and
// then repeats Walk every Interval hours.
func (c *CheckJob) Run(ctx context.Context) {
	if !c.Enable {
		log.Debug("The check job is disabled")
		// A disabled job still has to honour cancellation before returning.
		<-ctx.Done()
		return
	}

	firstRunAt := time.Now().Add(c.InitialDelay)
	log.Debugf("The check job will start at %s", firstRunAt.Format("2006-01-02 15:04:05"))

	// Wait for either the initial delay to elapse or a shutdown request.
	firstRun := time.After(c.InitialDelay)
	select {
	case <-firstRun:
		// First reconciliation pass.
		c.Walk(ctx)
	case <-ctx.Done():
		log.Debug("CheckJob received shutdown signal before first run, exiting...")
		return
	}

	// Periodic passes, driven by a ticker that starts after the first pass.
	period := time.Duration(c.Interval) * time.Hour
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			c.Walk(ctx)
		case <-ctx.Done():
			log.Debug("CheckJob received shutdown signal, exiting...")
			return
		}
	}
}
// Walk traverses the local tree rooted at LocalPrefix, compares each
// non-ignored entry with the remote side, and sends every path that differs
// to PutChan. The traversal stops early when ctx is cancelled.
func (c *CheckJob) Walk(ctx context.Context) {
	log.Info("Check job begin")

	visit := func(path string, d fs.DirEntry, err error) error {
		// Abort the traversal as soon as shutdown has been requested.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		if err != nil {
			log.Errorf("WalkDir err: %s, skipping %s", err.Error(), path)
			return filepath.SkipDir
		}

		switch {
		case d.IsDir() && helper.IsIgnore(path, c.Ignore):
			// Ignored directory: do not descend into it at all.
			return filepath.SkipDir
		case helper.IsIgnore(path, c.Ignore):
			// Ignored entry: nothing to compare.
			return nil
		}

		if c.Storage.IsSameV2(ctx, path, "") {
			return nil
		}

		// The entry differs: queue it, unless shutdown arrives while the
		// queue is blocked.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case c.PutChan <- path:
			log.Infof("Differences found %s", path)
			return nil
		}
	}

	walkErr := filepath.WalkDir(c.LocalPrefix, visit)
	if walkErr == nil || walkErr == context.Canceled {
		// Cancellation is a normal way to finish, not a failure.
		log.Info("Check job ends")
		return
	}
	log.Errorf("WalkDir err: %s", walkErr.Error())
}