Skip to content
This repository was archived by the owner on May 3, 2022. It is now read-only.

Commit ff75c31

Browse files
author
Oleg Sidorov
committed
Chart repo index fetcher is moved to a background job
Due to a massive overhead that happened as a consequence of a heavy chart repo index request pattern (introduced in 0.5.0), application and installation controller latencies spiked up. This change is aiming to get rid of ad-hoc repo index fetch approach and move this heavy operation to a background job. From now on, every repo instance start polling chart repo index every 10 seconds. Once the data is successfully fetched, it is preserved as a repo attribute unmarshalled. The first fetch is blocking: assuming Shipper starts cold, there is no previous cache we can rely upon (it starts in a new container). On top of it, index data is never cached on the disk as there is no use for it any longer: in-memory only. If repo fails to fetch repo index, it behaves quite naively: simply spins next iteration with the same delay. Signed-off-by: Oleg Sidorov <[email protected]>
1 parent bf0eef9 commit ff75c31

File tree

5 files changed

+80
-70
lines changed

5 files changed

+80
-70
lines changed

pkg/chart/repo/catalog.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ func (c *Catalog) CreateRepoIfNotExist(repoURL string) (*Repo, error) {
7272
fmt.Errorf("failed to create cache: %v", err),
7373
)
7474
}
75-
repo = NewRepo(repoURL, cache, c.fetcher)
75+
repo, err = NewRepo(repoURL, cache, c.fetcher)
76+
if err != nil {
77+
return nil, err
78+
}
7679
c.repos[name] = repo
7780
}
7881

pkg/chart/repo/catalog_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ func TestCreateRepoIfNotExist(t *testing.T) {
8282

8383
for _, testCase := range tests {
8484
t.Run(testCase.name, func(t *testing.T) {
85-
c := NewCatalog(testCase.factory, nil)
85+
c := NewCatalog(testCase.factory, func(_ string) ([]byte, error) {
86+
return []byte{}, nil
87+
})
8688
_, err := c.CreateRepoIfNotExist(testCase.url)
8789
if (err == nil && testCase.err != nil) ||
8890
(err != nil && testCase.err == nil) ||

pkg/chart/repo/repo.go

Lines changed: 46 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"path"
99
"path/filepath"
1010
"strings"
11-
"sync"
11+
"sync/atomic"
1212
"time"
1313

1414
// Importing this yaml package is a very crucial point:
@@ -20,6 +20,7 @@ import (
2020
yaml "github.com/ghodss/yaml"
2121
"github.com/golang/glog"
2222

23+
"k8s.io/apimachinery/pkg/util/wait"
2324
"k8s.io/helm/pkg/chartutil"
2425
"k8s.io/helm/pkg/proto/hapi/chart"
2526
"k8s.io/helm/pkg/repo"
@@ -29,7 +30,7 @@ import (
2930
)
3031

3132
const (
32-
RepoIndexTTL = 5 * time.Second
33+
RepoIndexRefreshPeriod = 10 * time.Second
3334
)
3435

3536
var (
@@ -38,28 +39,16 @@ var (
3839
)
3940

4041
type Repo struct {
41-
url string
42-
cache Cache
43-
fetcher RemoteFetcher
44-
mutex sync.Mutex
45-
indexFetched time.Time
42+
repoURL string
43+
indexURL string
44+
cache Cache
45+
fetcher RemoteFetcher
46+
index atomic.Value
47+
indexResolved chan struct{}
4648
}
4749

48-
func NewRepo(repoURL string, cache Cache, fetcher RemoteFetcher) *Repo {
49-
return &Repo{
50-
url: repoURL,
51-
cache: cache,
52-
fetcher: fetcher,
53-
}
54-
}
55-
56-
func (r *Repo) isIndexExpired() bool {
57-
return r.indexFetched.Add(RepoIndexTTL).Before(time.Now())
58-
}
59-
60-
// This method is not thread-safe and requires concurrency control by the caller
61-
func (r *Repo) refreshIndex() (*repo.IndexFile, error) {
62-
parsed, err := url.ParseRequestURI(r.url)
50+
func NewRepo(repoURL string, cache Cache, fetcher RemoteFetcher) (*Repo, error) {
51+
parsed, err := url.ParseRequestURI(repoURL)
6352
if err != nil {
6453
return nil, shippererrors.NewChartRepoIndexError(
6554
fmt.Errorf("failed to parse repo URL: %v", err),
@@ -68,29 +57,50 @@ func (r *Repo) refreshIndex() (*repo.IndexFile, error) {
6857
parsed.Path = path.Join(parsed.Path, "index.yaml")
6958
indexURL := parsed.String()
7059

71-
data, err := r.fetcher(indexURL)
60+
repo := &Repo{
61+
repoURL: repoURL,
62+
indexURL: indexURL,
63+
cache: cache,
64+
fetcher: fetcher,
65+
indexResolved: make(chan struct{}),
66+
}
67+
68+
// runs repo.refreshIndex forever
69+
go wait.Forever(func() {
70+
if err := repo.refreshIndex(); err != nil {
71+
glog.Errorf("failed to refresh repo %q index: %s", repo.repoURL, err)
72+
}
73+
}, RepoIndexRefreshPeriod)
74+
75+
return repo, nil
76+
}
77+
78+
func (r *Repo) refreshIndex() error {
79+
data, err := r.fetcher(r.indexURL)
7280
if err != nil {
73-
return nil, shippererrors.NewChartRepoIndexError(
74-
fmt.Errorf("failed to fetch %q: %v", indexURL, err),
81+
return shippererrors.NewChartRepoIndexError(
82+
fmt.Errorf("failed to fetch %q: %v", r.indexURL, err),
7583
)
7684
}
7785

7886
index, err := loadIndexData(data)
7987
if err != nil {
80-
return nil, shippererrors.NewChartRepoIndexError(
88+
return shippererrors.NewChartRepoIndexError(
8189
fmt.Errorf("failed to load index file: %v", err),
8290
)
8391
}
8492

85-
if err := r.cache.Store("index.yaml", data); err != nil {
86-
return nil, shippererrors.NewChartRepoIndexError(
87-
fmt.Errorf("failed to cache index.yaml: %v", err),
88-
)
89-
}
93+
r.index.Store(index)
9094

91-
r.indexFetched = time.Now()
95+
// close indexResolved once
96+
select {
97+
default:
98+
close(r.indexResolved)
99+
case <-r.indexResolved:
100+
// already closed
101+
}
92102

93-
return index, nil
103+
return nil
94104
}
95105

96106
func (r *Repo) ResolveVersion(chartspec *shipper.Chart) (*repo.ChartVersion, error) {
@@ -130,31 +140,10 @@ func (r *Repo) ResolveVersion(chartspec *shipper.Chart) (*repo.ChartVersion, err
130140
}
131141

132142
func (r *Repo) FetchChartVersions(chartspec *shipper.Chart) (repo.ChartVersions, error) {
133-
r.mutex.Lock()
134-
if r.isIndexExpired() {
135-
if _, err := r.refreshIndex(); err != nil {
136-
glog.Warningf("failed to refresh repo[%s] index: %s", chartspec.RepoURL, err)
137-
}
138-
}
139-
r.mutex.Unlock()
140143

141-
data, err := r.cache.Fetch("index.yaml")
142-
if err != nil {
143-
return nil, shippererrors.NewChartFetchFailureError(
144-
chartspec,
145-
err,
146-
)
147-
}
148-
149-
index, err := loadIndexData(data)
150-
if err != nil {
151-
return nil, shippererrors.NewChartFetchFailureError(
152-
chartspec,
153-
err,
154-
)
155-
}
144+
<-r.indexResolved
156145

157-
vs, ok := index.Entries[chartspec.Name]
146+
vs, ok := r.index.Load().(*repo.IndexFile).Entries[chartspec.Name]
158147
if !ok {
159148
return nil, repo.ErrNoChartName
160149
}
@@ -234,7 +223,7 @@ func (r *Repo) FetchRemote(cv *repo.ChartVersion) (*chart.Chart, error) {
234223

235224
// If the URL is relative (no scheme), prepend the chart repo's base URL
236225
if !chartURL.IsAbs() {
237-
repoURL, err := url.Parse(r.url)
226+
repoURL, err := url.Parse(r.repoURL)
238227
if err != nil {
239228
return nil, err
240229
}

pkg/chart/repo/repo_test.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -127,27 +127,34 @@ func TestRefreshIndex(t *testing.T) {
127127
for _, testCase := range tests {
128128
t.Run(testCase.name, func(t *testing.T) {
129129
var fetchedURL string
130+
var mutex sync.Mutex
130131

131132
cache := NewTestCache(testCase.name)
132133

133-
repo := NewRepo(
134+
repo, err := NewRepo(
134135
testCase.repoURL,
135136
cache,
136137
func(url string) ([]byte, error) {
138+
mutex.Lock()
139+
defer mutex.Unlock()
137140
fetchedURL = url
138141
return []byte(testCase.fetchBody), testCase.fetchErr
139142
},
140143
)
141144

142-
_, err := repo.refreshIndex()
145+
if err != nil {
146+
t.Fatalf("failed to initialize repo: %s", err)
147+
}
143148

144-
if !equivalent(err, testCase.expectedErr) {
149+
if err := repo.refreshIndex(); !equivalent(err, testCase.expectedErr) {
145150
t.Fatalf("Unexpected error: %q, want: %q", err, testCase.expectedErr)
146151
}
147152

153+
mutex.Lock()
148154
if fetchedURL != testCase.expectedFetchURL {
149155
t.Fatalf("Unexpected fetch URL: %q, want: %q", fetchedURL, testCase.expectedFetchURL)
150156
}
157+
mutex.Unlock()
151158
})
152159
}
153160
}
@@ -264,18 +271,21 @@ func TestResolveVersion(t *testing.T) {
264271

265272
for _, testCase := range tests {
266273
t.Run(testCase.name, func(t *testing.T) {
267-
repo := NewRepo(
274+
repo, err := NewRepo(
268275
"https://charts.example.com",
269276
cache,
270277
localFetch(t),
271278
)
272-
if _, err := repo.refreshIndex(); err != nil {
279+
if err != nil {
280+
t.Fatalf("failed to initialize repo: %s", err)
281+
}
282+
if err := repo.refreshIndex(); err != nil {
273283
t.Fatalf(err.Error())
274284
}
275285
chartspec := &shipper.Chart{
276286
Name: testCase.chartname,
277287
Version: testCase.verspec,
278-
RepoURL: repo.url,
288+
RepoURL: repo.repoURL,
279289
}
280290
gotcv, goterr := repo.ResolveVersion(chartspec)
281291

@@ -356,19 +366,22 @@ func TestFetch(t *testing.T) {
356366

357367
for _, testCase := range tests {
358368
t.Run(testCase.name, func(t *testing.T) {
359-
repo := NewRepo(
369+
repo, err := NewRepo(
360370
"https://chart.example.com",
361371
cache,
362372
localFetch(t),
363373
)
364-
if _, err := repo.refreshIndex(); err != nil {
374+
if err != nil {
375+
t.Fatalf("failed to initialize repo: %s", err)
376+
}
377+
if err := repo.refreshIndex(); err != nil {
365378
t.Fatalf(err.Error())
366379
}
367380

368381
chartspec := &shipper.Chart{
369382
Name: testCase.chartname,
370383
Version: testCase.chartver,
371-
RepoURL: repo.url,
384+
RepoURL: repo.repoURL,
372385
}
373386

374387
chart, err := repo.Fetch(chartspec)
@@ -394,7 +407,7 @@ func TestFetch(t *testing.T) {
394407
func TestConcurrentFetchChartVersionsRefreshesIndexOnce(t *testing.T) {
395408
var cnt int
396409
cache := NewTestCache("test-cache")
397-
repo := NewRepo(
410+
repo, err := NewRepo(
398411
"https://chart.example.com",
399412
cache,
400413
func(url string) ([]byte, error) {
@@ -405,6 +418,9 @@ func TestConcurrentFetchChartVersionsRefreshesIndexOnce(t *testing.T) {
405418
return []byte(IndexYamlResp), nil
406419
},
407420
)
421+
if err != nil {
422+
t.Fatalf("failed to initialize repo: %s", err)
423+
}
408424

409425
// Chart contents doesn't really matter
410426
chartspec := &shipper.Chart{

pkg/controller/release/release_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2123,7 +2123,7 @@ func workingOnIncumbentCapacity(percent int, wg *sync.WaitGroup, t *testing.T) {
21232123
func TestShouldNotProducePatches(t *testing.T) {
21242124
var wg sync.WaitGroup
21252125

2126-
for i := 0; i < 50; i++ {
2126+
for i := 0; i < 25; i++ {
21272127
wg.Add(1)
21282128
go workingOnContenderCapacity(i, &wg, t)
21292129

0 commit comments

Comments
 (0)