Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version compatibility solution #21157

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/bootstrap/custom_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *service) UpgradeOneTenant(ctx context.Context, tenantID int32) error {
if err := v.HandleTenantUpgrade(ctx, tenantID, txn); err != nil {
return err
}
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, txn); err != nil {
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, v.Metadata().VersionOffset, txn); err != nil {
return err
}
from = v.Metadata().Version
Expand Down
2 changes: 1 addition & 1 deletion pkg/bootstrap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (s *service) execBootstrap(ctx context.Context) error {
if err := initPreprocessSQL(ctx, txn, s.GetFinalVersion(), s.GetFinalVersionOffset()); err != nil {
return err
}
if err := frontend.InitSysTenant(ctx, txn, s.GetFinalVersion()); err != nil {
if err := frontend.InitSysTenant(ctx, txn, s.GetFinalVersion(), s.GetFinalVersionOffset()); err != nil {
return err
}
if err := sysview.InitSchema(ctx, txn); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bootstrap/service_upgrade_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *service) MaybeUpgradeTenant(
if err := v.HandleTenantUpgrade(ctx, tenantID, txn); err != nil {
return err
}
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, txn); err != nil {
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, v.Metadata().VersionOffset, txn); err != nil {
return err
}
from = v.Metadata().Version
Expand Down Expand Up @@ -213,7 +213,7 @@ func (s *service) asyncUpgradeTenantTask(ctx context.Context) {
return err
}

if err = versions.UpgradeTenantVersion(id, h.Metadata().Version, txn); err != nil {
if err = versions.UpgradeTenantVersion(id, h.Metadata().Version, h.Metadata().VersionOffset, txn); err != nil {
s.logger.Error("failed to update upgrade tenant create version",
zap.Int32("tenant", id),
zap.String("upgrade", upgrade.String()),
Expand Down
5 changes: 2 additions & 3 deletions pkg/bootstrap/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,20 @@ import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_0_0"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_0_1"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_0_2"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_1_0"
)

// initUpgrade all versions need create a upgrade handle in pkg/bootstrap/versions
// package. And register the upgrade logic into handles.
func (s *service) initUpgrade() {
s.handles = append(s.handles, v1_2_0.Handler)
// TODO: When v1.2.0 release, open the commented code as follows, Enable v1.2.1 upgrade package
s.handles = append(s.handles, v1_2_1.Handler)
// TODO: When v1.2.1 release, open the commented code as follows, Enable v1.2.2 upgrade package
s.handles = append(s.handles, v1_2_2.Handler)
s.handles = append(s.handles, v1_2_3.Handler)
s.handles = append(s.handles, v2_0_0.Handler)
s.handles = append(s.handles, v2_0_1.Handler)
// TODO: When v2.0.1 release, open the commented code as follows, Enable v2.0.2 upgrade package
s.handles = append(s.handles, v2_0_2.Handler)
s.handles = append(s.handles, v2_1_0.Handler)
}

func (s *service) getFinalVersionHandle() VersionHandle {
Expand Down
4 changes: 3 additions & 1 deletion pkg/bootstrap/versions/upgrade_tenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,11 @@ func GetTenantCreateVersionForUpdate(
func UpgradeTenantVersion(
tenantID int32,
version string,
versionOffset uint32,
txn executor.TxnExecutor) error {
sql := fmt.Sprintf("update mo_account set create_version = '%s' where account_id = %d",
sql := fmt.Sprintf("update mo_account set create_version = '%s', version_offset = %d where account_id = %d",
version,
versionOffset,
tenantID)
res, err := txn.Exec(sql, executor.StatementOption{})
if err != nil {
Expand Down
39 changes: 39 additions & 0 deletions pkg/bootstrap/versions/v2_1_0/cluster_upgrade_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2_1_0

import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

var clusterUpgEntries = []versions.UpgradeEntry{
upg_mo_account_version_offset,
}

var upg_mo_account_version_offset = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MOAccountTable,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_account add column version_offset int unsigned default 0 after create_version",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MOAccountTable, "version_offset")
if err != nil {
return false, err
}
return colInfo.IsExits, nil
},
}
24 changes: 24 additions & 0 deletions pkg/bootstrap/versions/v2_1_0/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2_1_0

import (
"github.com/matrixorigin/matrixone/pkg/common/log"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
)

func getLogger(sid string) *log.MOLogger {
return runtime.ServiceRuntime(sid).Logger()
}
21 changes: 21 additions & 0 deletions pkg/bootstrap/versions/v2_1_0/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2_1_0

import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
)

var tenantUpgEntries = []versions.UpgradeEntry{}
105 changes: 105 additions & 0 deletions pkg/bootstrap/versions/v2_1_0/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2_1_0

import (
"context"
"time"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

var (
Handler = &versionHandle{
metadata: versions.Version{
Version: "2.1.0",
MinUpgradeVersion: "2.0.2",
UpgradeCluster: versions.Yes,
UpgradeTenant: versions.Yes,
VersionOffset: uint32(len(clusterUpgEntries) + len(tenantUpgEntries)),
},
}
)

type versionHandle struct {
metadata versions.Version
}

func (v *versionHandle) Metadata() versions.Version {
return v.metadata
}

func (v *versionHandle) Prepare(
ctx context.Context,
txn executor.TxnExecutor,
final bool) error {
txn.Use(catalog.MO_CATALOG)
return nil
}

func (v *versionHandle) HandleTenantUpgrade(
ctx context.Context,
tenantID int32,
txn executor.TxnExecutor) error {

for _, upgEntry := range tenantUpgEntries {
start := time.Now()

err := upgEntry.Upgrade(txn, uint32(tenantID))
if err != nil {
getLogger(txn.Txn().TxnOptions().CN).Error("tenant upgrade entry execute error", zap.Error(err), zap.Int32("tenantId", tenantID), zap.String("version", v.Metadata().Version), zap.String("upgrade entry", upgEntry.String()))
return err
}

duration := time.Since(start)
getLogger(txn.Txn().TxnOptions().CN).Info("tenant upgrade entry complete",
zap.String("upgrade entry", upgEntry.String()),
zap.Int64("time cost(ms)", duration.Milliseconds()),
zap.Int32("tenantId", tenantID),
zap.String("toVersion", v.Metadata().Version))
}

return nil
}

func (v *versionHandle) HandleClusterUpgrade(
ctx context.Context,
txn executor.TxnExecutor) error {
for _, upgEntry := range clusterUpgEntries {
start := time.Now()

err := upgEntry.Upgrade(txn, catalog.System_Account)
if err != nil {
getLogger(txn.Txn().TxnOptions().CN).Error("cluster upgrade entry execute error", zap.Error(err), zap.String("version", v.Metadata().Version), zap.String("upgrade entry", upgEntry.String()))
return err
}

duration := time.Since(start)
getLogger(txn.Txn().TxnOptions().CN).Info("cluster upgrade entry complete",
zap.String("upgrade entry", upgEntry.String()),
zap.Int64("time cost(ms)", duration.Milliseconds()),
zap.String("toVersion", v.Metadata().Version))
}
return nil
}

func (v *versionHandle) HandleCreateFrameworkDeps(txn executor.TxnExecutor) error {
return moerr.NewInternalErrorNoCtxf("Only v1.2.0 can initialize upgrade framework, current version is:%s", Handler.metadata.Version)
}
4 changes: 4 additions & 0 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ func (s *service) GetFinalVersion() string {
return s.bootstrapService.GetFinalVersion()
}

func (s *service) GetFinalVersionOffset() int32 {
return s.bootstrapService.GetFinalVersionOffset()
}

func (s *service) stopFrontend() error {
defer logutil.LogClose(s.logger, "cnservice/frontend")()

Expand Down
13 changes: 8 additions & 5 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,14 +1107,16 @@ var (
status,
created_time,
comments,
create_version) values (%d,"%s","%s","%s","%s","%s","%s");`
create_version,
version_offset) values (%d,"%s","%s","%s","%s","%s","%s",%d);`
initMoAccountWithoutIDFormat = `insert into mo_catalog.mo_account(
account_name,
admin_name,
status,
created_time,
comments,
create_version) values ("%s","%s","%s","%s","%s","%s");`
create_version,
version_offset) values ("%s","%s","%s","%s","%s","%s",%d);`
initMoRoleFormat = `insert into mo_catalog.mo_role(
role_id,
role_name,
Expand Down Expand Up @@ -7313,6 +7315,7 @@ func InitGeneralTenant(ctx context.Context, bh BackgroundExec, ses *Session, ca
defer span.End()
tenant := ses.GetTenantInfo()
finalVersion := ses.rm.baseService.GetFinalVersion()
finalVersionOffset := ses.rm.baseService.GetFinalVersionOffset()

if !(tenant.IsSysTenant() && tenant.IsMoAdminRole()) {
return moerr.NewInternalErrorf(ctx, "tenant %s user %s role %s do not have the privilege to create the new account", tenant.GetTenant(), tenant.GetUser(), tenant.GetDefaultRole())
Expand Down Expand Up @@ -7390,7 +7393,7 @@ func InitGeneralTenant(ctx context.Context, bh BackgroundExec, ses *Session, ca
}
return rtnErr
} else {
newTenant, newTenantCtx, rtnErr = createTablesInMoCatalogOfGeneralTenant(ctx, bh, finalVersion, ca)
newTenant, newTenantCtx, rtnErr = createTablesInMoCatalogOfGeneralTenant(ctx, bh, finalVersion, finalVersionOffset, ca)
if rtnErr != nil {
return rtnErr
}
Expand Down Expand Up @@ -7472,7 +7475,7 @@ func InitGeneralTenant(ctx context.Context, bh BackgroundExec, ses *Session, ca
}

// createTablesInMoCatalogOfGeneralTenant creates catalog tables in the database mo_catalog.
func createTablesInMoCatalogOfGeneralTenant(ctx context.Context, bh BackgroundExec, finalVersion string, ca *createAccount) (*TenantInfo, context.Context, error) {
func createTablesInMoCatalogOfGeneralTenant(ctx context.Context, bh BackgroundExec, finalVersion string, finalVersionOffset int32, ca *createAccount) (*TenantInfo, context.Context, error) {
var err error
var initMoAccount string
var erArray []ExecResult
Expand Down Expand Up @@ -7510,7 +7513,7 @@ func createTablesInMoCatalogOfGeneralTenant(ctx context.Context, bh BackgroundEx
}
}

initMoAccount = fmt.Sprintf(initMoAccountWithoutIDFormat, ca.Name, ca.AdminName, status, types.CurrentTimestamp().String2(time.UTC, 0), comment, finalVersion)
initMoAccount = fmt.Sprintf(initMoAccountWithoutIDFormat, ca.Name, ca.AdminName, status, types.CurrentTimestamp().String2(time.UTC, 0), comment, finalVersion, finalVersionOffset)
//execute the insert
err = bh.Exec(ctx, initMoAccount)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/frontend/authenticate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7490,6 +7490,10 @@ func (m *MockBaseService) GetFinalVersion() string {
return "1.2.0"
}

func (m *MockBaseService) GetFinalVersionOffset() int32 {
return 8
}

func (s *MockBaseService) UpgradeTenant(ctx context.Context, tenantName string, retryCount uint32, isALLAccount bool) error {
//TODO implement me
panic("implement me")
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/predefined.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ var (
comments varchar(256),
version bigint unsigned auto_increment,
suspended_time timestamp default NULL,
create_version varchar(50) default '1.2.0'
create_version varchar(50) default '1.2.0',
version_offset int unsigned default 0
)`

MoCatalogMoRoleDDL = `create table mo_catalog.mo_role (
Expand Down
2 changes: 2 additions & 0 deletions pkg/frontend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type BaseService interface {
CheckTenantUpgrade(ctx context.Context, tenantID int64) error
// GetFinalVersion Get mo final version, which is based on the current code
GetFinalVersion() string
// GetFinalVersionOffset Get mo final version offset, which is based on the current code
GetFinalVersionOffset() int32
// UpgradeTenant used to upgrade tenant
UpgradeTenant(ctx context.Context, tenantName string, retryCount uint32, isALLAccount bool) error
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/frontend/system_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

// InitSysTenant initializes the tenant SYS before any tenants and accepting any requests
// during the system is booting.
func InitSysTenant(ctx context.Context, txn executor.TxnExecutor, finalVersion string) (err error) {
func InitSysTenant(ctx context.Context, txn executor.TxnExecutor, finalVersion string, finalVersonOffset int32) (err error) {
txn.Use(catalog.MO_CATALOG)
res, err := txn.Exec(createDbInformationSchemaSql, executor.StatementOption{})
if err != nil {
Expand All @@ -45,15 +45,15 @@ func InitSysTenant(ctx context.Context, txn executor.TxnExecutor, finalVersion s
}

if !exists {
if err = createTablesInMoCatalog(ctx, txn, finalVersion); err != nil {
if err = createTablesInMoCatalog(ctx, txn, finalVersion, finalVersonOffset); err != nil {
return err
}
}
return err
}

// createTablesInMoCatalog creates catalog tables in the database mo_catalog.
func createTablesInMoCatalog(ctx context.Context, txn executor.TxnExecutor, finalVersion string) error {
func createTablesInMoCatalog(ctx context.Context, txn executor.TxnExecutor, finalVersion string, finalVersonOffset int32) error {
var initMoAccount string
var initDataSqls []string
var err error
Expand All @@ -69,7 +69,7 @@ func createTablesInMoCatalog(ctx context.Context, txn executor.TxnExecutor, fina

//initialize the default data of tables for the tenant
//step 1: add new tenant entry to the mo_account
initMoAccount = fmt.Sprintf(initMoAccountFormat, sysAccountID, sysAccountName, rootName, sysAccountStatus, types.CurrentTimestamp().String2(time.UTC, 0), sysAccountComments, finalVersion)
initMoAccount = fmt.Sprintf(initMoAccountFormat, sysAccountID, sysAccountName, rootName, sysAccountStatus, types.CurrentTimestamp().String2(time.UTC, 0), sysAccountComments, finalVersion, finalVersonOffset)
addSqlIntoSet(initMoAccount)

//step 2:add new role entries to the mo_role
Expand Down
Loading
Loading