Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,21 @@ class BuildRecordModelDao {
fun updateStatus(
dslContext: DSLContext,
projectId: String,
pipelineId: String,
buildId: String,
buildStatus: BuildStatus,
executeCount: Int
executeCount: Int,
cancelUser: String? = null
) {
with(TPipelineBuildRecordModel.T_PIPELINE_BUILD_RECORD_MODEL) {
dslContext.update(this).set(STATUS, buildStatus.name)
.where(
PROJECT_ID.eq(projectId)
.and(BUILD_ID.eq(buildId))
.and(EXECUTE_COUNT.eq(executeCount))
).execute()
val update = dslContext.update(this).set(STATUS, buildStatus.name)
cancelUser?.let { update.set(CANCEL_USER, cancelUser) }
update.where(
PROJECT_ID.eq(projectId)
.and(BUILD_ID.eq(buildId))
.and(PIPELINE_ID.eq(pipelineId))
.and(EXECUTE_COUNT.eq(executeCount))
).execute()
}
}

Expand All @@ -139,6 +143,30 @@ class BuildRecordModelDao {
}
}

/**
 * Reads the status, start user and cancel user of a single build record model row.
 *
 * The row is matched on build id, project id, pipeline id and execute count.
 *
 * @param dslContext jOOQ context used to run the query
 * @param projectId project the build belongs to
 * @param pipelineId pipeline the build belongs to
 * @param buildId build whose record is read
 * @param executeCount execution attempt of the build
 * @return a [Triple] of (status, start user, cancel user), or `null` when no row matches
 */
fun getBuildStatusAndUsers(
    dslContext: DSLContext,
    projectId: String,
    pipelineId: String,
    buildId: String,
    executeCount: Int
): Triple<BuildStatus, String/*startUser*/, String?/*cancelUser*/>? {
    with(TPipelineBuildRecordModel.T_PIPELINE_BUILD_RECORD_MODEL) {
        return dslContext.select(STATUS, START_USER, CANCEL_USER).from(this)
            .where(
                BUILD_ID.eq(buildId)
                    .and(PROJECT_ID.eq(projectId))
                    .and(PIPELINE_ID.eq(pipelineId))
                    .and(EXECUTE_COUNT.eq(executeCount))
            ).fetchAny()?.let { row ->
                Triple(
                    // Use parse() rather than valueOf(): valueOf() throws on a null or
                    // unrecognised STATUS value, which would abort the caller's update.
                    // NOTE(review): assumes BuildStatus.parse falls back to a default
                    // status instead of throwing — confirm against BuildStatus.
                    BuildStatus.parse(row.component1()),
                    row.component2(),
                    row.component3()
                )
            }
    }
}

fun getRecordInfoList(
dslContext: DSLContext,
projectId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,100 @@ package com.tencent.devops.process.engine.control.lock
import com.tencent.devops.common.redis.RedisLock
import com.tencent.devops.common.redis.RedisOperation

class PipelineBuildRecordLock(redisOperation: RedisOperation, buildId: String, executeCount: Int) :
RedisLock(
redisOperation = redisOperation,
lockKey = "process.build.record.lock.$buildId.$executeCount",
expiredTimeInSeconds = 10L
) {
/**
 * Abstract base class for build record locks.
 *
 * Holds the fields and behaviour shared by every build record lock so the
 * concrete lock classes do not repeat them. The Redis lock key is derived from
 * the build id, the execute count and an optional suffix, and the lock is
 * created with `expiredTimeInSeconds = 10`.
 *
 * @param redisOperation Redis accessor handed to [RedisLock]
 * @param buildId build id
 * @param executeCount execution attempt of the build
 * @param lockKeySuffix extra key segment (e.g. stageId, containerId, taskId); blank means none
 */
abstract class AbstractBuildRecordLock(
    redisOperation: RedisOperation,
    buildId: String,
    executeCount: Int,
    lockKeySuffix: String = ""
) : RedisLock(
    redisOperation = redisOperation,
    lockKey = buildLockKey(buildId, executeCount, lockKeySuffix),
    expiredTimeInSeconds = 10L
) {

    // The key is built from the buildId, so it does not need the cluster-info
    // prefix to be told apart; return it unchanged.
    override fun decorateKey(key: String): String = key

    companion object {
        private const val LOCK_KEY_PREFIX = "process.build.record.lock"

        /**
         * Builds the lock key.
         *
         * Layout: `<prefix>.<buildId>.<executeCount>` when [suffix] is blank,
         * otherwise `<prefix>.<buildId>.<suffix>.<executeCount>`.
         *
         * @param buildId build id
         * @param executeCount execution attempt of the build
         * @param suffix extra segment (e.g. stageId, containerId, taskId)
         */
        private fun buildLockKey(buildId: String, executeCount: Int, suffix: String): String =
            listOfNotNull(
                LOCK_KEY_PREFIX,
                buildId,
                suffix.takeUnless { it.isBlank() },
                executeCount.toString()
            ).joinToString(".")
    }
}

/**
 * Pipeline-level build record lock.
 *
 * Author's note: currently unused, because it conflicts functionally with BuildIdLock.
 */
class PipelineBuildRecordLock(
    redisOperation: RedisOperation,
    buildId: String,
    executeCount: Int
) : AbstractBuildRecordLock(
    // No suffix is passed: the base class default (blank) keeps the key at
    // pipeline granularity.
    redisOperation = redisOperation,
    buildId = buildId,
    executeCount = executeCount
)

/**
 * Stage-level build record lock.
 *
 * Uses [stageId] as the extra lock-key segment passed to [AbstractBuildRecordLock].
 */
class StageBuildRecordLock(
    redisOperation: RedisOperation,
    buildId: String,
    stageId: String,
    executeCount: Int
) : AbstractBuildRecordLock(redisOperation, buildId, executeCount, lockKeySuffix = stageId)

/**
 * Container-level build record lock.
 *
 * Uses [containerId] as the extra lock-key segment passed to [AbstractBuildRecordLock].
 */
class ContainerBuildRecordLock(
    redisOperation: RedisOperation,
    buildId: String,
    containerId: String,
    executeCount: Int
) : AbstractBuildRecordLock(redisOperation, buildId, executeCount, lockKeySuffix = containerId)

/**
 * Task-level build record lock.
 *
 * Uses [taskId] as the extra lock-key segment passed to [AbstractBuildRecordLock].
 */
class TaskBuildRecordLock(
    redisOperation: RedisOperation,
    buildId: String,
    taskId: String,
    executeCount: Int
) : AbstractBuildRecordLock(redisOperation, buildId, executeCount, lockKeySuffix = taskId)
Original file line number Diff line number Diff line change
Expand Up @@ -1400,6 +1400,7 @@ class PipelineRuntimeService @Autowired constructor(
recordModelDao.updateStatus(
dslContext = transactionContext,
projectId = buildInfo.projectId,
pipelineId = buildInfo.pipelineId,
buildId = buildInfo.buildId,
buildStatus = newBuildStatus,
executeCount = executeCount
Expand Down Expand Up @@ -1820,6 +1821,7 @@ class PipelineRuntimeService @Autowired constructor(
recordModelDao.updateStatus(
dslContext = transactionContext,
projectId = latestRunningBuild.projectId,
pipelineId = latestRunningBuild.pipelineId,
buildId = latestRunningBuild.buildId,
executeCount = latestRunningBuild.executeCount,
buildStatus = BuildStatus.RUNNING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import com.tencent.devops.common.web.utils.I18nUtil
import com.tencent.devops.common.websocket.enum.RefreshType
import com.tencent.devops.process.constant.ProcessMessageCode
import com.tencent.devops.process.dao.record.BuildRecordModelDao
import com.tencent.devops.process.engine.control.lock.PipelineBuildRecordLock
import com.tencent.devops.process.engine.control.lock.AbstractBuildRecordLock
import com.tencent.devops.process.engine.dao.PipelineBuildDao
import com.tencent.devops.process.engine.dao.PipelineResourceDao
import com.tencent.devops.process.engine.dao.PipelineResourceVersionDao
Expand All @@ -77,7 +77,7 @@ open class BaseBuildRecordService(
private val buildRecordModelDao: BuildRecordModelDao,
val pipelineBuildDao: PipelineBuildDao,
private val pipelineEventDispatcher: PipelineEventDispatcher,
private val redisOperation: RedisOperation,
val redisOperation: RedisOperation,
private val stageTagService: StageTagService,
private val recordModelService: PipelineRecordModelService,
private val pipelineResourceDao: PipelineResourceDao,
Expand All @@ -92,57 +92,51 @@ open class BaseBuildRecordService(
buildStatus: BuildStatus,
cancelUser: String? = null,
operation: String = "",
lock: AbstractBuildRecordLock,
refreshOperation: () -> Unit
) {
val watcher = Watcher(id = "updateRecord#$buildId#$operation")
var message = "nothing"
val lock = PipelineBuildRecordLock(redisOperation, buildId, executeCount)
var startUser: String? = null
try {
watcher.start("lock")
lock.lock()

watcher.start("getRecord")
val record = buildRecordModelDao.getRecord(
val (oldStatus, recordStartUser, recordCancelUser) = buildRecordModelDao.getBuildStatusAndUsers(
dslContext = dslContext, projectId = projectId, pipelineId = pipelineId,
buildId = buildId, executeCount = executeCount
) ?: run {
message = "Model record is empty"
return
}
startUser = record.startUser

watcher.start("refreshOperation")
refreshOperation()
watcher.stop()
watcher.start("lock")
lock.use {
watcher.start("refreshOperation")
refreshOperation()
}

watcher.start("updatePipelineRecord")
val (change, finalStatus) = takeBuildStatus(record, buildStatus)
watcher.start("updateModelRecord")
startUser = recordStartUser
val (change, finalStatus) = takeBuildStatus(oldStatus, buildStatus)
if (!change && cancelUser.isNullOrBlank()) {
message = "Build status did not change"
return
}
buildRecordModelDao.updateRecord(
buildRecordModelDao.updateStatus(
dslContext = dslContext,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildRecordModelDao.updateStatus没有锁保护,可能会有线程安全问题!updateStatus这个方法估计要放入锁内部执行,另外refreshOperation的db操作和updateStatus(要是cas更新)要放在同一个事务执行;或者通过队列保证顺序执行

projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
executeCount = executeCount,
buildStatus = finalStatus,
modelVar = record.modelVar, // 暂时没有变量,保留修改可能
startTime = null,
endTime = null,
errorInfoList = null,
cancelUser = cancelUser // 系统行为导致的取消状态(仅当在取消状态时,还没有设置过取消人,才默认为System)
?: if (buildStatus.isCancel() && record.cancelUser.isNullOrBlank()) "System" else null,
timestamps = null
?: if (buildStatus.isCancel() && recordCancelUser.isNullOrBlank()) "System" else null
)
message = "Will not update"
} catch (ignored: Throwable) {
message = ignored.message ?: ""
logger.warn("[$buildId]| Fail to update the build record: ${ignored.message}", ignored)
} finally {
lock.unlock()
logger.info("[$buildId|$buildStatus]|$operation|update_detail_record| $message")
watcher.start("dispatchEvent")
pipelineRecordChangeEvent(projectId, pipelineId, buildId, startUser, executeCount)
Expand Down Expand Up @@ -290,10 +284,9 @@ open class BaseBuildRecordService(
}

private fun takeBuildStatus(
record: BuildRecordModel,
oldStatus: BuildStatus,
buildStatus: BuildStatus
): Pair<Boolean, BuildStatus> {
val oldStatus = BuildStatus.parse(record.status)
return if (!oldStatus.isFinish()) {
(oldStatus != buildStatus) to buildStatus
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import com.tencent.devops.process.dao.record.BuildRecordModelDao
import com.tencent.devops.process.dao.record.BuildRecordTaskDao
import com.tencent.devops.process.engine.common.BuildTimeCostUtils.generateContainerTimeCost
import com.tencent.devops.process.engine.common.BuildTimeCostUtils.generateMatrixTimeCost
import com.tencent.devops.process.engine.control.lock.ContainerBuildRecordLock
import com.tencent.devops.process.engine.dao.PipelineBuildDao
import com.tencent.devops.process.engine.dao.PipelineResourceDao
import com.tencent.devops.process.engine.dao.PipelineResourceVersionDao
Expand Down Expand Up @@ -158,7 +159,8 @@ class ContainerBuildRecordService(
containerBuildDetailService.containerPreparing(projectId, buildId, containerId)
update(
projectId, pipelineId, buildId, executeCount, BuildStatus.RUNNING,
cancelUser = null, operation = "containerPreparing#$containerId"
cancelUser = null, operation = "containerPreparing#$containerId",
lock = ContainerBuildRecordLock(redisOperation, buildId, containerId, executeCount)
) {
updateContainerRecord(
projectId = projectId, pipelineId = pipelineId, buildId = buildId,
Expand Down Expand Up @@ -191,7 +193,8 @@ class ContainerBuildRecordService(
)
update(
projectId, pipelineId, buildId, executeCount, BuildStatus.RUNNING,
cancelUser = null, operation = "containerStarted#$containerId"
cancelUser = null, operation = "containerStarted#$containerId",
lock = ContainerBuildRecordLock(redisOperation, buildId, containerId, executeCount)
) {
updateContainerRecord(
projectId = projectId, pipelineId = pipelineId, buildId = buildId,
Expand Down Expand Up @@ -231,7 +234,8 @@ class ContainerBuildRecordService(
)
update(
projectId, pipelineId, buildId, executeCount, BuildStatus.RUNNING,
cancelUser = null, operation = "$operation#$containerId"
cancelUser = null, operation = "$operation#$containerId",
lock = ContainerBuildRecordLock(redisOperation, buildId, containerId, executeCount)
) {
dslContext.transaction { configuration ->
val context = DSL.using(configuration)
Expand Down Expand Up @@ -338,7 +342,8 @@ class ContainerBuildRecordService(
)
update(
projectId, pipelineId, buildId, executeCount, BuildStatus.RUNNING,
cancelUser = null, operation = "updateMatrixGroupContainer#$matrixGroupId"
cancelUser = null, operation = "updateMatrixGroupContainer#$matrixGroupId",
lock = ContainerBuildRecordLock(redisOperation, buildId, matrixGroupId, executeCount)
) {
logger.info(
"[$buildId]|matrix_group_record|j(${modelContainer?.containerId})|" +
Expand All @@ -365,7 +370,8 @@ class ContainerBuildRecordService(
containerBuildDetailService.containerSkip(projectId, buildId, containerId)
update(
projectId, pipelineId, buildId, executeCount, BuildStatus.RUNNING,
cancelUser = null, operation = "containerSkip#$containerId"
cancelUser = null, operation = "containerSkip#$containerId",
lock = ContainerBuildRecordLock(redisOperation, buildId, containerId, executeCount)
) {
logger.info("[$buildId]|container_skip|j($containerId)")
recordTaskDao.updateRecordStatus(
Expand Down Expand Up @@ -401,7 +407,8 @@ class ContainerBuildRecordService(
if (executeCount == null) return
update(
projectId, pipelineId, buildId, executeCount, BuildStatus.RUNNING,
cancelUser = null, operation = "saveBuildVmInfo($projectId,$pipelineId)"
cancelUser = null, operation = "saveBuildVmInfo($projectId,$pipelineId)",
lock = ContainerBuildRecordLock(redisOperation, buildId, containerId, executeCount)
) {
dslContext.transaction { configuration ->
val context = DSL.using(configuration)
Expand Down
Loading