Skip to content
This repository has been archived by the owner on Jul 16, 2021. It is now read-only.

Commit

Permalink
[chart-repo] Store repositories checksum in cache to avoid reprocessi…
Browse files Browse the repository at this point in the history
…ng (#637)

* Store repositories checksum in cache to avoid reprocessing

Signed-off-by: Andres Martinez Gotor <[email protected]>

* Update log message

Signed-off-by: Andres Martinez Gotor <[email protected]>
  • Loading branch information
Andres Martinez Gotor authored Jul 15, 2019
1 parent 010bd09 commit ae97a0d
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 7 deletions.
6 changes: 6 additions & 0 deletions cmd/chart-repo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,9 @@ type chartFiles struct {
Repo repo
Digest string
}

// repoCheck is the record kept per chart repository so that a sync can tell
// whether the repository index changed since it was last processed. It is the
// type repoAlreadyProcessed decodes into when querying the "repos" collection.
type repoCheck struct {
// ID is the document key; lookups and upserts use the repository name here.
ID string `bson:"_id"`
// LastUpdate is the timestamp written by updateLastCheck.
LastUpdate time.Time `bson:"last_update"`
// Checksum is the stored checksum that an incoming checksum is compared to.
Checksum string `bson:"checksum"`
}
62 changes: 55 additions & 7 deletions cmd/chart-repo/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"archive/tar"
"bytes"
"compress/gzip"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"errors"
Expand All @@ -45,6 +46,7 @@ import (

const (
chartCollection = "charts"
repositoryCollection = "repos"
chartFilesCollection = "files"
defaultTimeoutSeconds = 10
additionalCAFile = "/usr/local/share/ca-certificates/ca.crt"
Expand All @@ -62,7 +64,7 @@ type httpClient interface {

var netClient httpClient = &http.Client{}

func parseRepoUrl(repoURL string) (*url.URL, error) {
func parseRepoURL(repoURL string) (*url.URL, error) {
repoURL = strings.TrimSpace(repoURL)
return url.ParseRequestURI(repoURL)
}
Expand All @@ -85,14 +87,30 @@ func init() {
// imported into the database as fast as possible. E.g. we want all icons for
// charts before fetching readmes for each chart and version pair.
func syncRepo(dbSession datastore.Session, repoName, repoURL string, authorizationHeader string) error {
url, err := parseRepoUrl(repoURL)
url, err := parseRepoURL(repoURL)
if err != nil {
log.WithFields(log.Fields{"url": repoURL}).WithError(err).Error("failed to parse URL")
return err
}

r := repo{Name: repoName, URL: url.String(), AuthorizationHeader: authorizationHeader}
index, err := fetchRepoIndex(r)
repoBytes, err := fetchRepoIndex(r)
if err != nil {
return err
}

repoChecksum, err := getSha256(repoBytes)
if err != nil {
return err
}

// Check if the repo has been already processed
if repoAlreadyProcessed(dbSession, repoName, repoChecksum) {
log.WithFields(log.Fields{"url": repoURL}).Info("Skipping repository since there are no updates")
return nil
}

index, err := parseRepoIndex(repoBytes)
if err != nil {
return err
}
Expand Down Expand Up @@ -148,9 +166,39 @@ func syncRepo(dbSession datastore.Session, repoName, repoURL string, authorizati
// Wait for the worker pools to finish processing
wg.Wait()

// Update cache in the database
if err = updateLastCheck(dbSession, repoName, repoChecksum, time.Now()); err != nil {
return err
}
log.WithFields(log.Fields{"url": repoURL}).Info("Stored repository update in cache")

return nil
}

// getSha256 returns the SHA-256 digest of src as a lowercase hexadecimal
// string (64 characters).
//
// The error result is kept so existing callers keep compiling; hashing an
// in-memory byte slice cannot fail, so the returned error is always nil.
func getSha256(src []byte) (string, error) {
	// Sum256 hashes the whole slice in one call, which removes the
	// reader/io.Copy plumbing and its unreachable error branch.
	sum := sha256.Sum256(src)
	return fmt.Sprintf("%x", sum[:]), nil
}

// repoAlreadyProcessed reports whether the record stored for repoName in the
// repository collection carries the given checksum. Any lookup error
// (including a missing record) is treated as "not processed".
func repoAlreadyProcessed(dbSession datastore.Session, repoName string, checksum string) bool {
	db, closer := dbSession.DB()
	defer closer()

	var stored repoCheck
	byName := bson.M{"_id": repoName}
	if err := db.C(repositoryCollection).Find(byName).One(&stored); err != nil {
		return false
	}
	return stored.Checksum == checksum
}

// updateLastCheck upserts the record keyed by repoName in the repository
// collection, setting its "last_update" field to now and its "checksum" field
// to checksum. It returns the error from the upsert, if any.
func updateLastCheck(dbSession datastore.Session, repoName string, checksum string, now time.Time) error {
	db, closer := dbSession.DB()
	defer closer()

	update := bson.M{
		"$set": bson.M{
			"last_update": now,
			"checksum":    checksum,
		},
	}
	_, err := db.C(repositoryCollection).UpsertId(repoName, update)
	return err
}

func deleteRepo(dbSession datastore.Session, repoName string) error {
db, closer := dbSession.DB()
defer closer()
Expand All @@ -167,8 +215,8 @@ func deleteRepo(dbSession datastore.Session, repoName string) error {
return err
}

func fetchRepoIndex(r repo) (*helmrepo.IndexFile, error) {
indexURL, err := parseRepoUrl(r.URL)
func fetchRepoIndex(r repo) ([]byte, error) {
indexURL, err := parseRepoURL(r.URL)
if err != nil {
log.WithFields(log.Fields{"url": r.URL}).WithError(err).Error("failed to parse URL")
return nil, err
Expand Down Expand Up @@ -203,7 +251,7 @@ func fetchRepoIndex(r repo) (*helmrepo.IndexFile, error) {
if err != nil {
return nil, err
}
return parseRepoIndex(body)
return body, nil
}

func parseRepoIndex(body []byte) (*helmrepo.IndexFile, error) {
Expand Down Expand Up @@ -433,7 +481,7 @@ func extractFilesFromTarball(filenames []string, tarf *tar.Reader) (map[string]s

func chartTarballURL(r repo, cv chartVersion) string {
source := cv.URLs[0]
if _, err := parseRepoUrl(source); err != nil {
if _, err := parseRepoURL(source); err != nil {
// If the chart URL is not absolute, join with repo URL. It's fine if the
// URL we build here is invalid as we can catch this error when actually
// making the request
Expand Down
51 changes: 51 additions & 0 deletions cmd/chart-repo/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"path"
"strings"
"testing"
"time"

"github.com/arschles/assert"
"github.com/disintegration/imaging"
Expand Down Expand Up @@ -609,7 +610,57 @@ func (h *emptyChartRepoHTTPClient) Do(req *http.Request) (*http.Response, error)
// Test_emptyChartRepo checks that syncRepo returns an error when the HTTP
// client is the emptyChartRepoHTTPClient stub.
func Test_emptyChartRepo(t *testing.T) {
	netClient = &emptyChartRepoHTTPClient{}

	// The checksum lookup runs before the index is parsed, so the mocked
	// session must accept the "One" call.
	var dbMock mock.Mock
	dbMock.On("One", &repoCheck{}).Return(nil)
	session := mockstore.NewMockSession(&dbMock)

	syncErr := syncRepo(session, "testRepo", "https://my.examplerepo.com", "")
	assert.ExistsErr(t, syncErr, "Failed Request")
}

// Test_getSha256 pins the hex digest produced for a fixed input.
func Test_getSha256(t *testing.T) {
	const wantDigest = "2e99758548972a8e8822ad47fa1017ff72f06f3ff6a016851f45c398732bc50c"

	input := []byte("this is a test")
	gotDigest, err := getSha256(input)

	assert.Equal(t, err, nil, "Unable to get sha")
	assert.Equal(t, gotDigest, wantDigest, "Unable to get sha")
}

// Test_repoAlreadyProcessed runs repoAlreadyProcessed against a mocked session
// whose "One" call fills in a prepared repoCheck, and compares the result with
// the expectation for each case.
func Test_repoAlreadyProcessed(t *testing.T) {
	cases := []struct {
		name            string
		checksum        string
		mockedLastCheck repoCheck
		processed       bool
	}{
		{
			name:            "not processed yet",
			checksum:        "bar",
			mockedLastCheck: repoCheck{},
			processed:       false,
		},
		{
			name:            "already processed",
			checksum:        "bar",
			mockedLastCheck: repoCheck{Checksum: "bar"},
			processed:       true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var dbMock mock.Mock
			// Copy the prepared record into the destination handed to One.
			fill := func(args mock.Arguments) {
				dest := args.Get(0).(*repoCheck)
				*dest = tc.mockedLastCheck
			}
			dbMock.On("One", &repoCheck{}).Run(fill).Return(nil)

			session := mockstore.NewMockSession(&dbMock)
			got := repoAlreadyProcessed(session, "", tc.checksum)
			if got != tc.processed {
				t.Errorf("Expected alreadyProcessed to be %v got %v", tc.processed, got)
			}
		})
	}
}

func Test_updateLastCheck(t *testing.T) {
m := mock.Mock{}
repoName := "foo"
checksum := "bar"
now := time.Now()
m.On("UpsertId", repoName, bson.M{"$set": bson.M{"last_update": now, "checksum": checksum}}).Return(nil)
dbSession := mockstore.NewMockSession(&m)
err := updateLastCheck(dbSession, repoName, checksum, now)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if len(m.Calls) != 1 {
t.Errorf("Expected one call got %d", len(m.Calls))
}
}

0 comments on commit ae97a0d

Please sign in to comment.