Skip to content

Commit 43ec712

Browse files
committed
simplify runtime startup and watcher
1 parent d1cbe78 commit 43ec712

7 files changed

Lines changed: 358 additions & 69 deletions

File tree

internal/daemon/daemon.go

Lines changed: 63 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,14 @@ type mountFailure struct {
4545

4646
type repoRuntime struct {
4747
cfg model.RepoConfig
48+
ctx context.Context
49+
cancel context.CancelFunc
4850
snapshot *snapshot.Store
4951
overlay *overlay.Store
5052
hydrator *hydrator.Service
5153
resolver *fusefs.Resolver
5254
mfs fusefs.MountedFS
5355
state model.RepoRuntimeState
54-
stop chan struct{}
5556
}
5657

5758
type aheadBehind struct {
@@ -309,53 +310,68 @@ func (s *Service) Unmount(ctx context.Context, name string) error {
309310
// start a FUSE mount or any background goroutines, so it's safe to call from
310311
// short-lived CLI commands like add-repo.
311312
func (s *Service) prepareRepo(ctx context.Context, cfg model.RepoConfig) error {
312-
if err := os.MkdirAll(cfg.MountPath, 0o755); err != nil {
313-
return err
314-
}
315-
if err := s.git.CloneBlobless(ctx, cfg); err != nil {
316-
return err
317-
}
318-
snap, err := snapshot.New(ctx, cfg.MetaDBPath)
313+
snap, _, _, _, err := s.ensurePreparedRepo(ctx, cfg)
319314
if err != nil {
320315
return err
321316
}
322317
defer snap.Close()
323-
_, err = s.publishHeadSnapshot(ctx, cfg, snap)
324-
return err
318+
return nil
325319
}
326320

327-
// mountRepo opens all stores, starts the FUSE server, watcher, and refresh
328-
// loop. Called by the daemon's Start for each registered repo.
329-
func (s *Service) mountRepo(ctx context.Context, cfg model.RepoConfig) error {
321+
// ensurePreparedRepo makes sure the repo is cloned and has an initial snapshot.
322+
// The returned snapshot store remains open for callers that need to continue
323+
// into runtime startup.
324+
func (s *Service) ensurePreparedRepo(ctx context.Context, cfg model.RepoConfig) (*snapshot.Store, string, string, int64, error) {
330325
if err := os.MkdirAll(cfg.MountPath, 0o755); err != nil {
331-
return err
326+
return nil, "", "", 0, err
332327
}
333-
// Clone if not already present (idempotent)
334328
if err := s.git.CloneBlobless(ctx, cfg); err != nil {
335-
return err
329+
return nil, "", "", 0, err
336330
}
337331
headOID, headRef, err := s.git.ResolveHEAD(ctx, cfg)
338332
if err != nil {
339-
return err
333+
return nil, "", "", 0, err
340334
}
341335
snap, err := snapshot.New(ctx, cfg.MetaDBPath)
342336
if err != nil {
343-
return err
337+
return nil, "", "", 0, err
344338
}
345-
gen, err := snap.CurrentGeneration(ctx)
346-
if err != nil || gen == 0 {
347-
var bErr error
348-
gen, _, bErr = s.publishSnapshot(ctx, cfg, snap, headOID, headRef)
349-
if bErr != nil {
339+
storedOID, storedRef, gen, err := snap.ReadState(ctx)
340+
if err != nil || gen == 0 || storedOID != headOID || storedRef != headRef {
341+
gen, _, err = s.publishSnapshot(ctx, cfg, snap, headOID, headRef)
342+
if err != nil {
350343
snap.Close()
351-
return bErr
344+
return nil, "", "", 0, err
352345
}
353346
}
347+
return snap, headOID, headRef, gen, nil
348+
}
349+
350+
// mountRepo opens all stores, starts the FUSE server, watcher, and refresh
351+
// loop. Called by the daemon's Start for each registered repo.
352+
func (s *Service) mountRepo(ctx context.Context, cfg model.RepoConfig) error {
353+
snap, headOID, headRef, gen, err := s.ensurePreparedRepo(ctx, cfg)
354+
if err != nil {
355+
return err
356+
}
354357
ov, err := overlay.New(ctx, cfg)
355358
if err != nil {
356359
snap.Close()
357360
return err
358361
}
362+
baseLookup := func(path string) (model.BaseNode, bool) {
363+
return snap.GetNode(gen, path)
364+
}
365+
if err := ov.Reconcile(ctx, baseLookup); err != nil {
366+
ov.Close()
367+
snap.Close()
368+
return err
369+
}
370+
if err := s.git.ReadTreeHEAD(ctx, cfg); err != nil {
371+
ov.Close()
372+
snap.Close()
373+
return err
374+
}
359375
h := hydrator.New(s.git)
360376

361377
resolver := &fusefs.Resolver{Snapshot: snap, Overlay: ov}
@@ -378,18 +394,20 @@ func (s *Service) mountRepo(ctx context.Context, cfg model.RepoConfig) error {
378394
s.logger.Error("fuse mount failed, running without FUSE", "repo", cfg.Name, "error", err)
379395
mfs = nil
380396
}
397+
runtimeCtx, cancel := context.WithCancel(ctx)
381398

382399
rt := &repoRuntime{
383400
cfg: cfg,
401+
ctx: runtimeCtx,
402+
cancel: cancel,
384403
snapshot: snap,
385404
overlay: ov,
386405
hydrator: h,
387406
resolver: resolver,
388407
mfs: mfs,
389408
state: newRuntimeState(cfg.ID, headOID, headRef, gen),
390-
stop: make(chan struct{}),
391409
}
392-
s.startRuntime(ctx, rt)
410+
s.startRuntime(rt)
393411

394412
return nil
395413
}
@@ -402,8 +420,18 @@ func (s *Service) onHEADChanged(ctx context.Context, rt *repoRuntime) {
402420
}
403421
s.mu.Lock()
404422
prevOID := rt.state.CurrentHEADOID
423+
prevRef := rt.state.CurrentHEADRef
405424
s.mu.Unlock()
406425
if oid == prevOID {
426+
if ref == prevRef {
427+
return
428+
}
429+
if err := rt.snapshot.UpdateHEADRef(ctx, ref); err != nil {
430+
s.logger.Warn("snapshot head_ref update failed", "repo", rt.cfg.Name, "error", err)
431+
}
432+
s.mu.Lock()
433+
rt.state.CurrentHEADRef = ref
434+
s.mu.Unlock()
407435
return
408436
}
409437
gen, phase, err := s.publishSnapshot(ctx, rt.cfg, rt.snapshot, oid, ref)
@@ -444,10 +472,10 @@ func (s *Service) refreshLoop(rt *repoRuntime) {
444472
defer ticker.Stop()
445473
for {
446474
select {
447-
case <-rt.stop:
475+
case <-rt.ctx.Done():
448476
return
449477
case <-ticker.C:
450-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
478+
ctx, cancel := context.WithTimeout(rt.ctx, 30*time.Second)
451479
err := s.git.Fetch(ctx, rt.cfg)
452480
if err != nil {
453481
s.mu.Lock()
@@ -510,15 +538,6 @@ func (s *Service) readPersistedStatus(ctx context.Context, cfg model.RepoConfig)
510538
return st
511539
}
512540

513-
func (s *Service) publishHeadSnapshot(ctx context.Context, cfg model.RepoConfig, snap *snapshot.Store) (int64, error) {
514-
oid, ref, err := s.git.ResolveHEAD(ctx, cfg)
515-
if err != nil {
516-
return 0, err
517-
}
518-
gen, _, err := s.publishSnapshot(ctx, cfg, snap, oid, ref)
519-
return gen, err
520-
}
521-
522541
func (s *Service) publishSnapshot(ctx context.Context, cfg model.RepoConfig, snap *snapshot.Store, oid string, ref string) (int64, string, error) {
523542
nodes, err := s.git.BuildTreeIndex(ctx, cfg, oid)
524543
if err != nil {
@@ -547,23 +566,21 @@ func (s *Service) fetchState(ctx context.Context, cfg model.RepoConfig) (aheadBe
547566
return aheadBehind{ahead: ahead, behind: behind, diverged: diverged}, nil
548567
}
549568

550-
func (s *Service) startRuntime(ctx context.Context, rt *repoRuntime) {
569+
func (s *Service) startRuntime(rt *repoRuntime) {
551570
s.mu.Lock()
552571
s.running[rt.cfg.ID] = rt
553572
s.mu.Unlock()
554573

555574
go s.refreshLoop(rt)
556575

557576
w := watcher.New(500 * time.Millisecond)
558-
go w.Watch(ctx, rt.cfg.GitDir, func(sig watcher.Signal) {
559-
if sig.HEADChanged {
560-
s.onHEADChanged(ctx, rt)
561-
}
577+
go w.Watch(rt.ctx, rt.cfg.GitDir, func() {
578+
s.onHEADChanged(rt.ctx, rt)
562579
})
563580

564581
if rt.mfs != nil {
565582
go func() {
566-
_ = rt.mfs.Join(ctx)
583+
_ = rt.mfs.Join(rt.ctx)
567584
}()
568585
}
569586
}
@@ -654,7 +671,9 @@ func (s *Service) unmount(id model.RepoID) {
654671
}
655672

656673
func (s *Service) stopRuntime(rt *repoRuntime) {
657-
close(rt.stop)
674+
if rt.cancel != nil {
675+
rt.cancel()
676+
}
658677
if rt.mfs != nil {
659678
_ = rt.mfs.Unmount()
660679
}

internal/fusefs/merged.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,10 @@ func childName(parent, entryPath string) (string, bool) {
3434
return rel, true
3535
}
3636

37-
type SnapshotLookup interface {
38-
GetNode(generation int64, path string) (model.BaseNode, bool)
39-
ListChildren(generation int64, parentPath string) ([]model.BaseNode, error)
40-
}
41-
4237
type Resolver struct {
4338
generation atomic.Int64
4439
commitTime atomic.Int64 // unix seconds of HEAD commit
45-
Snapshot SnapshotLookup
40+
Snapshot model.SnapshotStore
4641
Overlay model.OverlayStore
4742
}
4843

internal/fusefs/merged_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ type fakeSnapshot struct {
1414
kids map[string][]model.BaseNode
1515
}
1616

17+
func (f *fakeSnapshot) PublishGeneration(_ context.Context, _ string, _ string, _ []model.BaseNode) (int64, error) {
18+
return 0, nil
19+
}
20+
1721
func (f *fakeSnapshot) GetNode(_ int64, path string) (model.BaseNode, bool) {
1822
n, ok := f.nodes[path]
1923
return n, ok

internal/overlay/store.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,6 @@ func New(ctx context.Context, cfg model.RepoConfig) (*Store, error) {
4646
if err := os.MkdirAll(upperDir, 0o755); err != nil {
4747
return nil, err
4848
}
49-
if err := os.MkdirAll(filepath.Join(cfg.OverlayDir, "whiteouts"), 0o755); err != nil {
50-
return nil, err
51-
}
5249
return &Store{db: db, repo: cfg, upperDir: upperDir}, nil
5350
}
5451

internal/snapshot/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ func (s *Store) ReadState(ctx context.Context) (headOID, headRef string, generat
120120
return headOID, headRef, gen, nil
121121
}
122122

123+
func (s *Store) UpdateHEADRef(ctx context.Context, ref string) error {
124+
_, err := s.db.ExecContext(ctx, `INSERT INTO repo_state(key,value) VALUES(?,?) ON CONFLICT(key) DO UPDATE SET value=excluded.value`, "head_ref", ref)
125+
return err
126+
}
127+
123128
func (s *Store) GetNode(generation int64, path string) (model.BaseNode, bool) {
124129
// Uses background context for backward compat; callers with a deadline
125130
// should use GetNodeCtx.

internal/watcher/watcher.go

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,46 +4,84 @@ import (
44
"context"
55
"os"
66
"path/filepath"
7+
"strings"
78
"time"
89
)
910

10-
type Signal struct {
11-
HEADChanged bool
12-
RefsChanged bool
13-
IndexChanged bool
14-
}
15-
1611
type Poller struct {
17-
interval time.Duration
18-
prev map[string]time.Time
12+
interval time.Duration
13+
prev map[string]time.Time
14+
firstSeenChange map[string]struct{}
1915
}
2016

2117
func New(interval time.Duration) *Poller {
2218
if interval <= 0 {
2319
interval = 500 * time.Millisecond
2420
}
25-
return &Poller{interval: interval, prev: map[string]time.Time{}}
21+
return &Poller{interval: interval, prev: map[string]time.Time{}, firstSeenChange: map[string]struct{}{}}
2622
}
2723

28-
func (p *Poller) Watch(ctx context.Context, gitDir string, fn func(Signal)) {
24+
func (p *Poller) Watch(ctx context.Context, gitDir string, fn func()) {
25+
headPath := filepath.Join(gitDir, "HEAD")
2926
t := time.NewTicker(p.interval)
3027
defer t.Stop()
3128
for {
3229
select {
3330
case <-ctx.Done():
3431
return
3532
case <-t.C:
36-
s := Signal{}
37-
s.HEADChanged = p.changed(filepath.Join(gitDir, "HEAD"))
38-
s.IndexChanged = p.changed(filepath.Join(gitDir, "index"))
39-
s.RefsChanged = p.changed(filepath.Join(gitDir, "refs", "heads")) || p.changed(filepath.Join(gitDir, "refs", "remotes"))
40-
if s.HEADChanged || s.RefsChanged || s.IndexChanged {
41-
fn(s)
33+
if p.headChanged(headPath) {
34+
fn()
4235
}
4336
}
4437
}
4538
}
4639

40+
func (p *Poller) headChanged(headPath string) bool {
41+
if p.changed(headPath) {
42+
// A HEAD switch selects a different ref path. Prime that ref immediately
43+
// so the next branch-tip advance is compared against the new baseline.
44+
if refPath, ok := p.headRefPath(headPath); ok {
45+
p.primeCurrentRef(refPath)
46+
}
47+
return true
48+
}
49+
refPath, ok := p.headRefPath(headPath)
50+
if !ok {
51+
return false
52+
}
53+
return p.currentRefChanged(refPath)
54+
}
55+
56+
func (p *Poller) primeCurrentRef(refPath string) {
57+
_ = p.currentRefChanged(refPath)
58+
}
59+
60+
func (p *Poller) currentRefChanged(refPath string) bool {
61+
if _, err := os.Stat(refPath); err != nil {
62+
p.firstSeenChange[refPath] = struct{}{}
63+
return false
64+
}
65+
return p.changed(refPath)
66+
}
67+
68+
func (p *Poller) headRefPath(headPath string) (string, bool) {
69+
data, err := os.ReadFile(headPath)
70+
if err != nil {
71+
return "", false
72+
}
73+
const prefix = "ref: "
74+
line := strings.TrimSpace(string(data))
75+
if !strings.HasPrefix(line, prefix) {
76+
return "", false
77+
}
78+
ref := strings.TrimSpace(strings.TrimPrefix(line, prefix))
79+
if ref == "" {
80+
return "", false
81+
}
82+
return filepath.Join(filepath.Dir(headPath), filepath.FromSlash(ref)), true
83+
}
84+
4785
func (p *Poller) changed(path string) bool {
4886
st, err := os.Stat(path)
4987
if err != nil {
@@ -53,7 +91,12 @@ func (p *Poller) changed(path string) bool {
5391
prev, ok := p.prev[path]
5492
p.prev[path] = mtime
5593
if !ok {
94+
if _, changeOnFirstSeen := p.firstSeenChange[path]; changeOnFirstSeen {
95+
delete(p.firstSeenChange, path)
96+
return true
97+
}
5698
return false
5799
}
100+
delete(p.firstSeenChange, path)
58101
return mtime.After(prev)
59102
}

0 commit comments

Comments
 (0)