Skip to content

Commit beabb42

Browse files
committed
Refactor: Pipeline module (#417)
1 parent af8b5c3 commit beabb42

5 files changed

Lines changed: 21 additions & 97 deletions

File tree

cmd/qp/fork.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func rebuildCache(originName string) error {
5353
wg.Add(1)
5454
go func(targetDriver driver.Driver) {
5555
defer wg.Done()
56-
pipeline := phase.NewPipeline(targetDriver, cfg, false, cacheBasePath)
56+
pipeline := phase.NewPipeline(targetDriver, cfg, cacheBasePath)
5757

5858
if storage.IsLockFileExists(pipeline.CachePath) {
5959
return

cmd/qp/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func mainWithConfig(configProvider config.ConfigProvider) error {
4747

4848
var pipelines []*phase.Pipeline
4949
for _, driver := range origins.AvailableDrivers() {
50-
p := phase.NewPipeline(driver, cfg, isInteractive, cacheBaseDir)
50+
p := phase.NewPipeline(driver, cfg, cacheBaseDir)
5151
pipelines = append(pipelines, p)
5252
}
5353

internal/pipeline/phase/phase.go

Lines changed: 6 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,15 @@
11
package phase
22

33
import (
4-
"fmt"
54
"qp/internal/config"
6-
out "qp/internal/display"
7-
"qp/internal/pipeline/meta"
85
"qp/internal/pkgdata"
96
"sync"
107
)
118

12-
type (
13-
ProgressReporter = meta.ProgressReporter
14-
ProgressMessage = meta.ProgressMessage
15-
PkgInfo = pkgdata.PkgInfo
16-
)
17-
189
type Step func(
1910
cfg *config.Config,
20-
packages []*PkgInfo,
21-
progressReporter meta.ProgressReporter,
22-
) ([]*PkgInfo, error)
11+
pkgs []*pkgdata.PkgInfo,
12+
) ([]*pkgdata.PkgInfo, error)
2313

2414
type PipelinePhase struct {
2515
name string
@@ -28,75 +18,17 @@ type PipelinePhase struct {
2818
}
2919

3020
func NewPhase(name string, step Step, wg *sync.WaitGroup) PipelinePhase {
31-
return PipelinePhase{
32-
name,
33-
step,
34-
wg,
35-
}
21+
return PipelinePhase{name, step, wg}
3622
}
3723

3824
func (phase PipelinePhase) Run(
3925
cfg *config.Config,
40-
packages []*PkgInfo,
41-
isInteractive bool,
42-
) ([]*PkgInfo, error) {
43-
progressChan := phase.startProgress(isInteractive)
44-
outputPackages, err := phase.step(
45-
cfg,
46-
packages,
47-
phase.reportProgress(progressChan),
48-
)
49-
phase.stopProgress(progressChan)
50-
26+
pkgs []*pkgdata.PkgInfo,
27+
) ([]*pkgdata.PkgInfo, error) {
28+
outputPackages, err := phase.step(cfg, pkgs)
5129
if err != nil {
5230
return nil, err
5331
}
5432

5533
return outputPackages, nil
5634
}
57-
58-
func (phase PipelinePhase) reportProgress(progressChan chan ProgressMessage) ProgressReporter {
59-
if progressChan == nil {
60-
return ProgressReporter(func(_ int, _ int, _ string) {})
61-
}
62-
63-
return ProgressReporter(func(current int, total int, phaseName string) {
64-
progressChan <- ProgressMessage{
65-
Phase: phaseName,
66-
Progress: (current * 100) / total,
67-
Description: fmt.Sprintf(("%s is in progress..."), phase.name),
68-
}
69-
})
70-
}
71-
72-
func (phase PipelinePhase) startProgress(isInteractive bool) chan ProgressMessage {
73-
if !isInteractive {
74-
return nil
75-
}
76-
77-
progressChan := make(chan ProgressMessage)
78-
phase.wg.Add(1)
79-
80-
go func() {
81-
defer phase.wg.Done()
82-
phase.displayProgress(progressChan)
83-
}()
84-
85-
return progressChan
86-
}
87-
88-
func (phase PipelinePhase) stopProgress(progressChan chan ProgressMessage) {
89-
if progressChan != nil {
90-
close(progressChan)
91-
phase.wg.Wait()
92-
out.ClearProgress()
93-
}
94-
}
95-
96-
func (phase PipelinePhase) displayProgress(progressChan chan ProgressMessage) {
97-
for msg := range progressChan {
98-
out.PrintProgress(msg.Phase, msg.Progress, msg.Description)
99-
}
100-
101-
out.ClearProgress()
102-
}

internal/pipeline/phase/pipeline.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,25 @@ import (
1010
)
1111

1212
type Pipeline struct {
13-
Origin driver.Driver
14-
Config *config.Config
15-
Pkgs []*pkgdata.PkgInfo
16-
IsInteractive bool
17-
UsedCache bool
18-
CachePath string
19-
ModTime int64
13+
Origin driver.Driver
14+
Config *config.Config
15+
Pkgs []*pkgdata.PkgInfo
16+
UsedCache bool
17+
CachePath string
18+
ModTime int64
2019
}
2120

2221
func NewPipeline(
2322
origin driver.Driver,
2423
cfg *config.Config,
25-
isInteractive bool,
2624
baseCacheDir string,
2725
) *Pipeline {
2826
cachePath := filepath.Join(baseCacheDir, origin.Name())
2927

3028
return &Pipeline{
31-
Origin: origin,
32-
Config: cfg,
33-
IsInteractive: isInteractive,
34-
CachePath: cachePath,
29+
Origin: origin,
30+
Config: cfg,
31+
CachePath: cachePath,
3532
}
3633
}
3734

@@ -45,11 +42,11 @@ func (p *Pipeline) Run() ([]*pkgdata.PkgInfo, error) {
4542
NewPhase("Save cache", p.saveCacheStep, &wg),
4643
}
4744

48-
pkgs := []*PkgInfo{}
45+
pkgs := []*pkgdata.PkgInfo{}
4946
var err error
5047

5148
for _, ph := range phases {
52-
pkgs, err = ph.Run(p.Config, pkgs, p.IsInteractive)
49+
pkgs, err = ph.Run(p.Config, pkgs)
5350
if err != nil {
5451
return nil, fmt.Errorf("[%s] %w", ph.name, err)
5552
}
@@ -66,11 +63,11 @@ func (p *Pipeline) RunCacheOnly() ([]*pkgdata.PkgInfo, error) {
6663
NewPhase("Save cache", p.saveCacheStep, &wg),
6764
}
6865

69-
pkgs := []*PkgInfo{}
66+
pkgs := []*pkgdata.PkgInfo{}
7067
var err error
7168

7269
for _, ph := range phases {
73-
pkgs, err = ph.Run(p.Config, pkgs, p.IsInteractive)
70+
pkgs, err = ph.Run(p.Config, pkgs)
7471
if err != nil {
7572
return nil, fmt.Errorf("[%s] %w", ph.name, err)
7673
}

internal/pipeline/phase/steps.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"qp/internal/config"
66
out "qp/internal/display"
7-
"qp/internal/pipeline/meta"
87
"qp/internal/pkgdata"
98
"qp/internal/storage"
109
"time"
@@ -13,7 +12,6 @@ import (
1312
func (p *Pipeline) loadCacheStep(
1413
cfg *config.Config,
1514
_ []*pkgdata.PkgInfo,
16-
_ meta.ProgressReporter,
1715
) ([]*pkgdata.PkgInfo, error) {
1816
now := time.Now().Unix()
1917
if cfg.RegenCache {
@@ -50,7 +48,6 @@ func (p *Pipeline) loadCacheStep(
5048
func (p *Pipeline) fetchStep(
5149
_ *config.Config,
5250
pkgs []*pkgdata.PkgInfo,
53-
_ meta.ProgressReporter,
5451
) ([]*pkgdata.PkgInfo, error) {
5552
if p.UsedCache {
5653
return pkgs, nil
@@ -70,7 +67,6 @@ func (p *Pipeline) fetchStep(
7067
func (p *Pipeline) resolveStep(
7168
_ *config.Config,
7269
pkgs []*pkgdata.PkgInfo,
73-
reportProgress meta.ProgressReporter,
7470
) ([]*pkgdata.PkgInfo, error) {
7571
if p.UsedCache {
7672
return pkgs, nil
@@ -87,7 +83,6 @@ func (p *Pipeline) resolveStep(
8783
func (p *Pipeline) saveCacheStep(
8884
cfg *config.Config,
8985
pkgs []*pkgdata.PkgInfo,
90-
_ meta.ProgressReporter,
9186
) ([]*pkgdata.PkgInfo, error) {
9287
if cfg.NoCache || p.UsedCache {
9388
return pkgs, nil

0 commit comments

Comments
 (0)