Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version compatibility solution #21157

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion pkg/bootstrap/custom_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/schemaversion"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

Expand Down Expand Up @@ -141,6 +143,14 @@ func (s *service) UpgradeOneTenant(ctx context.Context, tenantID int32) error {
time.Sleep(time.Second)
}

//----------------------------------------------------------------------------------------------------------
clusterVersion, isFinal, err := versions.GetCurrentClusterVersion(s.GetFinalVersion(), txn)
if err != nil {
s.logger.Error("failed get current cluster version", zap.Error(err))
return err
}
//----------------------------------------------------------------------------------------------------------

// upgrade in current goroutine immediately
version, err = versions.GetTenantCreateVersionForUpdate(tenantID, txn)
if err != nil {
Expand All @@ -150,10 +160,29 @@ func (s *service) UpgradeOneTenant(ctx context.Context, tenantID int32) error {
for _, v := range s.handles {
if versions.Compare(v.Metadata().Version, from) >= 0 &&
v.Metadata().CanDirectUpgrade(from) {

//--------------------------------------------------------------------------------------------------
versionInfo := schemaversion.NewVersionInfo()
versionInfo.FinalVersionCompleted = false

versionInfo.Cluster.Version = clusterVersion
versionInfo.Cluster.IsFinalVersion = isFinal

handleOffset := versions.Compare(versionInfo.Cluster.Version, "2.1.0") >= 0
accVersion, accOffset, err := versions.GeAccountVersion(uint32(tenantID), handleOffset, txn)
if err != nil {
return err
}
versionInfo.Account.Version = accVersion
versionInfo.Account.VersionOffset = accOffset

txn.SetCtxValue(defines.VersionInfoKey{}, versionInfo)
//--------------------------------------------------------------------------------------------------

if err := v.HandleTenantUpgrade(ctx, tenantID, txn); err != nil {
return err
}
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, txn); err != nil {
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, v.Metadata().VersionOffset, handleOffset, txn); err != nil {
return err
}
from = v.Metadata().Version
Expand Down
2 changes: 1 addition & 1 deletion pkg/bootstrap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (s *service) execBootstrap(ctx context.Context) error {
if err := initPreprocessSQL(ctx, txn, s.GetFinalVersion(), s.GetFinalVersionOffset()); err != nil {
return err
}
if err := frontend.InitSysTenant(ctx, txn, s.GetFinalVersion()); err != nil {
if err := frontend.InitSysTenant(ctx, txn, s.GetFinalVersion(), s.GetFinalVersionOffset()); err != nil {
return err
}
if err := sysview.InitSchema(ctx, txn); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/bootstrap/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/lock"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/schemaversion"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/txn/clock"
"github.com/matrixorigin/matrixone/pkg/txn/rpc"
Expand Down Expand Up @@ -227,6 +228,14 @@ func (tTxnOp *testTxnOperator) SetFootPrints(id int, enter bool) {
panic("implement me")
}

func (tTxnOp *testTxnOperator) SetVersionInfo(versionInfo *schemaversion.VersionInfo) {
panic("implement me")
}

func (tTxnOp *testTxnOperator) GetVersionInfo() *schemaversion.VersionInfo {
panic("implement me")
}

func TestBootstrapAlreadyBootstrapped(t *testing.T) {
sid := ""
runtime.RunTest(
Expand Down
3 changes: 3 additions & 0 deletions pkg/bootstrap/service_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/log"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)

Expand Down Expand Up @@ -317,12 +318,14 @@ func (s *service) asyncUpgradeTask(ctx context.Context) {
return
case <-timer.C:
if s.upgrade.finalVersionCompleted.Load() {
runtime.ServiceRuntime(s.sid).SetGlobalVariables(runtime.ClusterIsFinalVersion, true)
return
}

completed, err := fn()
if err == nil && completed {
s.upgrade.finalVersionCompleted.Store(true)
runtime.ServiceRuntime(s.sid).SetGlobalVariables(runtime.ClusterIsFinalVersion, true)
return
}
timer.Reset(s.upgrade.checkUpgradeDuration)
Expand Down
74 changes: 72 additions & 2 deletions pkg/bootstrap/service_upgrade_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/schemaversion"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/util/executor"
)
Expand Down Expand Up @@ -98,6 +100,14 @@ func (s *service) MaybeUpgradeTenant(
time.Sleep(time.Second)
}

//----------------------------------------------------------------------------------------------------------
clusterVersion, isFinal, err := versions.GetCurrentClusterVersion(s.GetFinalVersion(), txn)
if err != nil {
s.logger.Error("failed get current cluster version", zap.Error(err))
return err
}
//----------------------------------------------------------------------------------------------------------

// upgrade in current goroutine immediately
version, err = versions.GetTenantCreateVersionForUpdate(tenantID, txn)
if err != nil {
Expand All @@ -107,10 +117,46 @@ func (s *service) MaybeUpgradeTenant(
for _, v := range s.handles {
if versions.Compare(v.Metadata().Version, from) > 0 &&
v.Metadata().CanDirectUpgrade(from) {

//--------------------------------------------------------------------------------------------------
versionInfo := schemaversion.NewVersionInfo()
versionInfo.FinalVersionCompleted = false
versionInfo.Cluster.Version = clusterVersion
versionInfo.Cluster.IsFinalVersion = isFinal

handleOffset := versions.Compare(versionInfo.Cluster.Version, "2.1.0") >= 0
accVersion, accOffset, err := versions.GeAccountVersion(uint32(tenantID), handleOffset, txn)
if err != nil {
return err
}
versionInfo.Account.Version = accVersion
versionInfo.Account.VersionOffset = accOffset

txn.SetCtxValue(defines.VersionInfoKey{}, versionInfo)

//var versionInfo defines.VersionInfo
//versionInfo.FinalVersion = s.GetFinalVersion()
//versionInfo.FinalVersionOffset = s.GetFinalVersionOffset()
//versionInfo.FinalVersionCompleted = false
//
//versionInfo.Cluster.Version = clusterVersion
//versionInfo.Cluster.IsFinalVersion = isFinal
//
//handleOffset := versions.Compare(versionInfo.Cluster.Version, "2.1.0") >= 0
//accVersion, accOffset, err := versions.GeAccountVersion(uint32(tenantID), handleOffset, txn)
//if err != nil {
// return err
//}
//versionInfo.Account.Version = accVersion
//versionInfo.Account.VersionOffset = accOffset
//
//txn.SetCtxValue(defines.VersionInfoKey{}, &versionInfo)
//--------------------------------------------------------------------------------------------------

if err := v.HandleTenantUpgrade(ctx, tenantID, txn); err != nil {
return err
}
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, txn); err != nil {
if err := versions.UpgradeTenantVersion(tenantID, v.Metadata().Version, v.Metadata().VersionOffset, handleOffset, txn); err != nil {
return err
}
from = v.Metadata().Version
Expand Down Expand Up @@ -183,6 +229,14 @@ func (s *service) asyncUpgradeTenantTask(ctx context.Context) {
return nil
}

clusterVersion, isFinal, err := versions.GetCurrentClusterVersion(s.GetFinalVersion(), txn)
if err != nil {
s.logger.Error("failed get current cluster version",
zap.String("upgrade", upgrade.String()),
zap.Error(err))
return err
}

hasUpgradeTenants = true
h := s.getVersionHandle(upgrade.ToVersion)
updated := int32(0)
Expand All @@ -204,6 +258,22 @@ func (s *service) asyncUpgradeTenantTask(ctx context.Context) {
zap.String("tenant-version", createVersion),
zap.String("upgrade", upgrade.String()))

//--------------------------------------------------------------------------------------------------
versionInfo := schemaversion.NewVersionInfo()
versionInfo.FinalVersionCompleted = false
versionInfo.Cluster.Version = clusterVersion
versionInfo.Cluster.IsFinalVersion = isFinal

handleOffset := versions.Compare(versionInfo.Cluster.Version, "2.1.0") >= 0
accVersion, accOffset, err := versions.GeAccountVersion(uint32(id), handleOffset, txn)
if err != nil {
return err
}
versionInfo.Account.Version = accVersion
versionInfo.Account.VersionOffset = accOffset

txn.SetCtxValue(defines.VersionInfoKey{}, versionInfo)
//--------------------------------------------------------------------------------------------------
if err = h.HandleTenantUpgrade(ctx, id, txn); err != nil {
s.logger.Error("failed to execute upgrade tenant",
zap.Int32("tenant", id),
Expand All @@ -213,7 +283,7 @@ func (s *service) asyncUpgradeTenantTask(ctx context.Context) {
return err
}

if err = versions.UpgradeTenantVersion(id, h.Metadata().Version, txn); err != nil {
if err = versions.UpgradeTenantVersion(id, h.Metadata().Version, h.Metadata().VersionOffset, handleOffset, txn); err != nil {
s.logger.Error("failed to update upgrade tenant create version",
zap.Int32("tenant", id),
zap.String("upgrade", upgrade.String()),
Expand Down
7 changes: 7 additions & 0 deletions pkg/bootstrap/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_0_1"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_0_2"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v2_1_0"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/schemaversion"
)

// initUpgrade all versions need create a upgrade handle in pkg/bootstrap/versions
Expand All @@ -40,6 +42,11 @@ func (s *service) initUpgrade() {
s.handles = append(s.handles, v2_0_2.Handler)
s.handles = append(s.handles, v2_1_0.Handler)

// Store the schema version information of the current binary file into the runtime global variable
runtime.ServiceRuntime(s.sid).SetGlobalVariables(runtime.FinalVersion, s.handles[len(s.handles)-1].Metadata().Version)
runtime.ServiceRuntime(s.sid).SetGlobalVariables(runtime.FinalVersionOffset, s.handles[len(s.handles)-1].Metadata().VersionOffset)
schemaversion.FinalVersion = s.handles[len(s.handles)-1].Metadata().Version
schemaversion.FinalVersionOffset = int32(s.handles[len(s.handles)-1].Metadata().VersionOffset)
}

func (s *service) getFinalVersionHandle() VersionHandle {
Expand Down
81 changes: 80 additions & 1 deletion pkg/bootstrap/versions/upgrade_tenant_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,19 @@ func GetTenantCreateVersionForUpdate(
func UpgradeTenantVersion(
tenantID int32,
version string,
versionOffset uint32,
handleOffset bool,
txn executor.TxnExecutor) error {
sql := fmt.Sprintf("update mo_account set create_version = '%s' where account_id = %d",

sql := fmt.Sprintf("update mo_account set create_version = '%s', version_offset = %d where account_id = %d",
version,
versionOffset,
tenantID)

if !handleOffset {
sql = fmt.Sprintf("update mo_account set create_version = '%s' where account_id = %d", version, tenantID)
}

res, err := txn.Exec(sql, executor.StatementOption{})
if err != nil {
return err
Expand All @@ -159,6 +168,76 @@ func UpgradeTenantVersion(
return nil
}

func GetCurrentClusterVersion(finalVersion string, txn executor.TxnExecutor) (string, bool, error) {
sql := fmt.Sprintf("SELECT to_version, final_version = to_version FROM %s WHERE state >= %d AND (final_version, final_version_offset) IN (SELECT version, version_offset FROM %s ORDER BY create_at DESC LIMIT 1) ORDER BY upgrade_order DESC LIMIT 1",
catalog.MOUpgradeTable,
StateUpgradingTenant,
catalog.MOVersionTable,
)
res, err := txn.Exec(sql, executor.StatementOption{})
if err != nil {
return "", false, err
}
version := ""
isFinalVersion := false

loaded := false
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
version = cols[0].GetStringAt(0)
isFinalVersion = vector.GetFixedAtWithTypeCheck[bool](cols[1], 0)
loaded = true
return true
})
res.Close()

if !loaded {
sql2 := fmt.Sprintf("SELECT version, state, version = '%s' FROM mo_catalog.mo_version where state >= 1 order by create_at desc limit 1", finalVersion)
res, err = txn.Exec(sql2, executor.StatementOption{})
if err != nil {
return "", false, err
}

res.ReadRows(func(rows int, cols []*vector.Vector) bool {
version = cols[0].GetStringAt(0)
isFinalVersion = vector.GetFixedAtWithTypeCheck[bool](cols[1], 0)
return true
})
res.Close()
}

if version == "" {
getLogger(txn.Txn().TxnOptions().CN).Fatal("BUG: Can't get crrent cluster version")
}
return version, isFinalVersion, nil
}

// ----------------------------------------------------------------------------------------------------------------------
func GeAccountVersion(accountId uint32, flag bool, txn executor.TxnExecutor) (string, int32, error) {
offsetCol := ""
if flag {
offsetCol = ", version_offset"
}
sql := fmt.Sprintf("SELECT create_version %s FROM mo_catalog.mo_account WHERE account_id = %d", offsetCol, accountId)
res, err := txn.Exec(sql, executor.StatementOption{})
if err != nil {
return "", 0, err
}
defer res.Close()

version := ""
versionOffset := int32(-1)
res.ReadRows(func(rows int, cols []*vector.Vector) bool {
version = cols[0].GetStringAt(0)
if flag {
versionOffset = int32(vector.GetFixedAtWithTypeCheck[uint32](cols[1], 0))
}
return true
})
return version, versionOffset, nil
}

//----------------------------------------------------------------------------------------------------------------------

func isConflictError(err error) bool {
return moerr.IsMoErrCode(err, moerr.ErrLockConflict)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/bootstrap/versions/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ package versions
import (
"testing"

"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"

"github.com/matrixorigin/matrixone/pkg/common/pubsub"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/prashantv/gostub"
"github.com/stretchr/testify/assert"
)

type MockTxnExecutor struct{}
Expand Down Expand Up @@ -60,6 +61,9 @@ func (MockTxnExecutor) Txn() client.TxnOperator {
panic("implement me")
}

func (MockTxnExecutor) SetCtxValue(key, value interface{}) {
}

func TestGetAllPubInfos(t *testing.T) {
stub := gostub.Stub(&CheckTableDefinition, func(_ executor.TxnExecutor, _ uint32, _ string, _ string) (bool, error) {
return true, nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/bootstrap/versions/v2_0_0/data_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func (MockTxnExecutor) Txn() client.TxnOperator {
panic("implement me")
}

func (MockTxnExecutor) SetCtxValue(key, value interface{}) {
}

func TestInsertInitDataKey(t *testing.T) {
txn := &MockTxnExecutor{}
err := InsertInitDataKey(txn, "01234567890123456789012345678901")
Expand Down
Loading
Loading