Skip to content

[WIP] Perform StorageInit before dataplane image update #142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions controllers/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ type VersionUpgradeJobSpec struct {
HConf string `json:"hadoopConf,omitempty"`
PreUpgrade bool `json:"preUpgrade,omitempty"`
PostUpgrade bool `json:"postUpgrade,omitempty"`
StorageInit bool `json:"storageInit,omitempty"`
}

func newUpgradeJobSpec(master *v1alpha1.CDAPMaster, name string, labels map[string]string, startTimeMs int64, cconf, hconf string) *VersionUpgradeJobSpec {
Expand Down Expand Up @@ -607,3 +608,8 @@ func (s *VersionUpgradeJobSpec) SetPostUpgrade(isPostUpgrade bool) *VersionUpgra
s.PostUpgrade = isPostUpgrade
return s
}

// SetStorageInit records on the spec whether the job should run the
// storage-init step and returns the same spec so that calls can be chained.
func (s *VersionUpgradeJobSpec) SetStorageInit(isStorageInit bool) *VersionUpgradeJobSpec {
	s.StorageInit = isStorageInit
	return s
}
58 changes: 58 additions & 0 deletions controllers/version_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,35 @@ func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, ob
}
}

if !isConditionTrue(master, updateStatus.StorageInitSucceeded) {
log.Printf("Version update: storage-init job not completed")
storageInitJobName := getStorageInitJobName(master.Status.UpgradeStartTimeMillis)
storageInitJobSpec := buildStorageInitJobSpec(storageInitJobName, master, labels)
job := findJob(storageInitJobName)
if job == nil {
obj, err := createJob(storageInitJobSpec)
if err != nil {
return nil, err
}
log.Printf("Version update: creating storage-init job")
return []reconciler.Object{*obj}, nil
} else if job.Status.Succeeded > 0 {
setCondition(master, updateStatus.StorageInitSucceeded)
log.Printf("Version update: storage-init job succeeded")
// Return an empty list so that the storage-init job object is deleted
return []reconciler.Object{}, nil
} else if job.Status.Failed > imageVersionUpgradeJobMaxRetryCount {
setCondition(master, updateStatus.StorageInitFailed)
setCondition(master, updateStatus.UpgradeFailed)
clearCondition(master, updateStatus.Inprogress)
log.Printf("Version update: storage-init job failed, exceeded max retries.")
return []reconciler.Object{}, nil
} else {
log.Printf("Version update: storage-init job inprogress.")
return []reconciler.Object{*buildObject(job)}, nil
}
}

// Then, actually update the image version
if !isConditionTrue(master, updateStatus.VersionUpdated) {
// If it's a patch revision, skip the pre and post upgrade jobs. Mark the update as succeeded.
Expand Down Expand Up @@ -281,6 +310,10 @@ type VersionUpdateStatus struct {
Inprogress status.Condition
VersionUpdated status.Condition

// States for storage init
StorageInitSucceeded status.Condition
StorageInitFailed status.Condition

// States specific to upgrade
PreUpgradeSucceeded status.Condition
PreUpgradeFailed status.Condition
Expand All @@ -306,6 +339,18 @@ func (s *VersionUpdateStatus) init() {
Message: "Version to be used has been updated ",
}

// States for storage init
s.StorageInitSucceeded = status.Condition{
Type: "VersionStorageInitJobSucceeded",
Reason: "Start",
Message: "Version storage-init job is succeeded",
}
s.StorageInitFailed = status.Condition{
Type: "VersionStorageInitJobFailed",
Reason: "Start",
Message: "Version storage-init job is failed",
}

// States for upgrade
s.PreUpgradeSucceeded = status.Condition{
Type: "VersionPreUpgradeJobSucceeded",
Expand Down Expand Up @@ -527,6 +572,11 @@ func getPostUpgradeJobName(startTimeMs int64) string {
return fmt.Sprintf("post-upgrade-job-%d", startTimeMs/1000)
}

// getStorageInitJobName returns the name suffix for the storage-init job,
// built from the given start time in milliseconds reduced to whole seconds.
// The result is only the suffix of the actual k8s object name, which is
// prepended with a const string + CR name.
func getStorageInitJobName(startTimeMs int64) string {
	startTimeSec := startTimeMs / 1000
	return fmt.Sprintf("storage-init-job-%d", startTimeSec)
}

// Return pre-upgrade job spec
func buildPreUpgradeJobSpec(jobName string, master *v1alpha1.CDAPMaster, labels map[string]string) *VersionUpgradeJobSpec {
startTimeMs := master.Status.UpgradeStartTimeMillis
Expand All @@ -545,6 +595,14 @@ func buildPostUpgradeJobSpec(jobName string, master *v1alpha1.CDAPMaster, labels
return newUpgradeJobSpec(master, name, labels, startTimeMs, cconf, hconf).SetPostUpgrade(true)
}

// buildStorageInitJobSpec returns the job spec for the storage-init job.
// The object name and the cconf/hconf config map names are each resolved
// through getObjName for the given master; the start time is read from
// master.Status.UpgradeStartTimeMillis. The resulting spec has its
// StorageInit flag enabled via SetStorageInit(true).
func buildStorageInitJobSpec(jobName string, master *v1alpha1.CDAPMaster, labels map[string]string) *VersionUpgradeJobSpec {
	upgradeStartMs := master.Status.UpgradeStartTimeMillis
	cconfName := getObjName(master, configMapCConf)
	hconfName := getObjName(master, configMapHConf)
	objName := getObjName(master, jobName)

	spec := newUpgradeJobSpec(master, objName, labels, upgradeStartMs, cconfName, hconfName)
	return spec.SetStorageInit(true)
}

// Given an upgrade job spec, return a reconciler object as expected state
func buildUpgradeJobObject(spec *VersionUpgradeJobSpec) (*reconciler.Object, error) {
obj, err := k8s.ObjectFromFile(templateDir+templateUpgradeJob, spec, &batchv1.JobList{})
Expand Down
4 changes: 4 additions & 0 deletions templates/upgrade-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ spec:
{{if .PreUpgrade}}
- name: pre-upgrade
args: ["io.cdap.cdap.master.upgrade.UpgradeJobMain", "{{.HostName}}", "11015"]
{{end}}
{{if .StorageInit}}
- name: storage-init
args: [ "io.cdap.cdap.master.environment.k8s.StorageMain"]
{{end}}
image: {{.Image}}
volumeMounts:
Expand Down