diff --git a/controllers/spec.go b/controllers/spec.go index c8107f7a..796bb098 100644 --- a/controllers/spec.go +++ b/controllers/spec.go @@ -577,6 +577,7 @@ type VersionUpgradeJobSpec struct { HConf string `json:"hadoopConf,omitempty"` PreUpgrade bool `json:"preUpgrade,omitempty"` PostUpgrade bool `json:"postUpgrade,omitempty"` + StorageInit bool `json:"storageInit,omitempty"` } func newUpgradeJobSpec(master *v1alpha1.CDAPMaster, name string, labels map[string]string, startTimeMs int64, cconf, hconf string) *VersionUpgradeJobSpec { @@ -607,3 +608,8 @@ func (s *VersionUpgradeJobSpec) SetPostUpgrade(isPostUpgrade bool) *VersionUpgra s.PostUpgrade = isPostUpgrade return s } + +func (s *VersionUpgradeJobSpec) SetStorageInit(isStorageInit bool) *VersionUpgradeJobSpec { + s.StorageInit = isStorageInit + return s +} diff --git a/controllers/version_update.go b/controllers/version_update.go index bb255d64..cc012d1e 100644 --- a/controllers/version_update.go +++ b/controllers/version_update.go @@ -194,6 +194,35 @@ func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, ob } } + if !isConditionTrue(master, updateStatus.StorageInitSucceeded) { + log.Printf("Version update: storage-init job not completed") + storageInitJobName := getStorageInitJobName(master.Status.UpgradeStartTimeMillis) + storageInitJobSpec := buildStorageInitJobSpec(storageInitJobName, master, labels) + job := findJob(storageInitJobName) + if job == nil { + obj, err := createJob(storageInitJobSpec) + if err != nil { + return nil, err + } + log.Printf("Version update: creating storage-init job") + return []reconciler.Object{*obj}, nil + } else if job.Status.Succeeded > 0 { + setCondition(master, updateStatus.StorageInitSucceeded) + log.Printf("Version update: storage-init job succeeded") + // Return empty to delete storageInit jobObj + return []reconciler.Object{}, nil + } else if job.Status.Failed > imageVersionUpgradeJobMaxRetryCount { + setCondition(master, updateStatus.StorageInitFailed) + setCondition(master, updateStatus.UpgradeFailed) + clearCondition(master, updateStatus.Inprogress) + log.Printf("Version update: storage-init job failed, exceeded max retries.") + return []reconciler.Object{}, nil + } else { + log.Printf("Version update: storage-init job inprogress.") + return []reconciler.Object{*buildObject(job)}, nil + } + } + // Then, actually update the image version if !isConditionTrue(master, updateStatus.VersionUpdated) { // If it's a patch revision, skip the pre and post upgrade jobs. Mark the update as succeeded. @@ -281,6 +310,10 @@ type VersionUpdateStatus struct { Inprogress status.Condition VersionUpdated status.Condition + // States for storage init + StorageInitSucceeded status.Condition + StorageInitFailed status.Condition + // states specifically upgrade PreUpgradeSucceeded status.Condition PreUpgradeFailed status.Condition @@ -306,6 +339,18 @@ func (s *VersionUpdateStatus) init() { Message: "Version to be used has been updated ", } + // States for storage init + s.StorageInitSucceeded = status.Condition{ + Type: "VersionStorageInitJobSucceeded", + Reason: "Start", + Message: "Version storage-init job is succeeded", + } + s.StorageInitFailed = status.Condition{ + Type: "VersionStorageInitJobFailed", + Reason: "Start", + Message: "Version storage-init job is failed", + } + // States for upgrade s.PreUpgradeSucceeded = status.Condition{ Type: "VersionPreUpgradeJobSucceeded", @@ -527,6 +572,11 @@ func getPostUpgradeJobName(startTimeMs int64) string { return fmt.Sprintf("post-upgrade-job-%d", startTimeMs/1000) } +// The returned name is just the suffix of actual k8s object name, as we prepend it with const string + CR name +func getStorageInitJobName(startTimeMs int64) string { + return fmt.Sprintf("storage-init-job-%d", startTimeMs/1000) +} + // Return pre-upgrade job spec func buildPreUpgradeJobSpec(jobName string, master *v1alpha1.CDAPMaster, labels map[string]string) *VersionUpgradeJobSpec { startTimeMs := master.Status.UpgradeStartTimeMillis @@ -545,6 +595,14 @@ func buildPostUpgradeJobSpec(jobName string, master *v1alpha1.CDAPMaster, labels return newUpgradeJobSpec(master, name, labels, startTimeMs, cconf, hconf).SetPostUpgrade(true) } +func buildStorageInitJobSpec(jobName string, master *v1alpha1.CDAPMaster, labels map[string]string) *VersionUpgradeJobSpec { + startTimeMs := master.Status.UpgradeStartTimeMillis + cconf := getObjName(master, configMapCConf) + hconf := getObjName(master, configMapHConf) + name := getObjName(master, jobName) + return newUpgradeJobSpec(master, name, labels, startTimeMs, cconf, hconf).SetStorageInit(true) +} + // Given an upgrade job spec, return a reconciler object as expected state func buildUpgradeJobObject(spec *VersionUpgradeJobSpec) (*reconciler.Object, error) { obj, err := k8s.ObjectFromFile(templateDir+templateUpgradeJob, spec, &batchv1.JobList{}) diff --git a/templates/upgrade-job.yaml b/templates/upgrade-job.yaml index d0e7404b..9b851199 100644 --- a/templates/upgrade-job.yaml +++ b/templates/upgrade-job.yaml @@ -46,6 +46,10 @@ spec: {{if .PreUpgrade}} - name: pre-upgrade args: ["io.cdap.cdap.master.upgrade.UpgradeJobMain", "{{.HostName}}", "11015"] + {{end}} + {{if .StorageInit}} + - name: storage-init + args: [ "io.cdap.cdap.master.environment.k8s.StorageMain"] {{end}} image: {{.Image}} volumeMounts: