Skip to content

Commit ddedcac

Browse files
authored
refactor: prevent concurrent migration (#7311)
* refactor: prevent concurrent migration * fix: python test
1 parent 1ee6258 commit ddedcac

File tree

2 files changed

+33
-17
lines changed

2 files changed

+33
-17
lines changed

backend/server/api/api.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ WARNING: Performing migration may wipe collected data for consistency and re-col
4949
To proceed, please send a request to <config-ui-endpoint>/api/proceed-db-migration (or <devlake-endpoint>/proceed-db-migration).
5050
Alternatively, you may downgrade back to the previous DevLake version.
5151
`
52+
const DB_MIGRATING = `Database migration is in progress. Please wait until it is completed.`
5253

5354
var basicRes context.BasicRes
5455

@@ -95,12 +96,6 @@ func SetupApiServer(router *gin.Engine) {
9596

9697
// Endpoint to proceed database migration
9798
router.GET("/proceed-db-migration", func(ctx *gin.Context) {
98-
// Check if migration requires confirmation
99-
if !services.MigrationRequireConfirmation() {
100-
// Return success response
101-
shared.ApiOutputSuccess(ctx, nil, http.StatusOK)
102-
return
103-
}
10499
// Execute database migration
105100
err := services.ExecuteMigration()
106101
if err != nil {
@@ -114,15 +109,22 @@ func SetupApiServer(router *gin.Engine) {
114109

115110
// Restrict access if database migration is required
116111
router.Use(func(ctx *gin.Context) {
117-
if !services.MigrationRequireConfirmation() {
118-
return
112+
serviceStatus := services.CurrentStatus()
113+
if serviceStatus == services.SERVICE_STATUS_WAIT_CONFIRM {
114+
// Return error response
115+
shared.ApiOutputError(
116+
ctx,
117+
errors.HttpStatus(http.StatusPreconditionRequired).New(DB_MIGRATION_REQUIRED),
118+
)
119+
ctx.Abort()
120+
} else if serviceStatus == services.SERVICE_STATUS_MIGRATING {
121+
// Return error response
122+
shared.ApiOutputError(
123+
ctx,
124+
errors.HttpStatus(http.StatusPreconditionRequired).New(DB_MIGRATING),
125+
)
126+
ctx.Abort()
119127
}
120-
// Return error response
121-
shared.ApiOutputError(
122-
ctx,
123-
errors.HttpStatus(http.StatusPreconditionRequired).New(DB_MIGRATION_REQUIRED),
124-
)
125-
ctx.Abort()
126128
})
127129

128130
// Add swagger handlers

backend/server/services/init.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ limitations under the License.
1818
package services
1919

2020
import (
21+
"sync"
2122
"time"
2223

2324
"github.com/apache/incubator-devlake/core/config"
@@ -84,6 +85,7 @@ func GetMigrator() plugin.Migrator {
8485
}
8586

8687
// Init the services module
88+
// Should not be called concurrently
8789
func Init() {
8890
InitResources()
8991

@@ -115,9 +117,21 @@ func Init() {
115117
}
116118
}
117119

120+
var statusLock sync.Mutex
121+
118122
// ExecuteMigration executes all pending migration scripts and initialize services module
123+
// This might be called concurrently across multiple API requests
119124
func ExecuteMigration() errors.Error {
125+
statusLock.Lock()
126+
defer statusLock.Unlock()
127+
if serviceStatus == SERVICE_STATUS_MIGRATING {
128+
return errors.BadInput.New("already migrating")
129+
}
130+
if serviceStatus == SERVICE_STATUS_READY {
131+
return nil
132+
}
120133
serviceStatus = SERVICE_STATUS_MIGRATING
134+
statusLock.Unlock() // unlock to allow other API requests to check the status
121135
// apply all pending migration scripts
122136
err := migrator.Execute()
123137
if err != nil {
@@ -130,11 +144,11 @@ func ExecuteMigration() errors.Error {
130144

131145
// initialize pipeline server, mainly to start the pipeline consuming process
132146
pipelineServiceInit()
147+
statusLock.Lock()
133148
serviceStatus = SERVICE_STATUS_READY
134149
return nil
135150
}
136151

137-
// MigrationRequireConfirmation returns if there were migration scripts waiting to be executed
138-
func MigrationRequireConfirmation() bool {
139-
return migrator.HasPendingScripts()
152+
func CurrentStatus() string {
153+
return serviceStatus
140154
}

0 commit comments

Comments
 (0)