Skip to content

Distributed Table-Based Lock to support MySQL Database Cluster #9955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import com.cloud.utils.FileUtil;
import org.apache.cloudstack.utils.CloudStackVersion;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -443,6 +444,7 @@

@Override
public void check() {
initDistributedLock();

Check warning on line 447 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L447

Added line #L447 was not covered by tests
GlobalLock lock = GlobalLock.getInternLock("DatabaseUpgrade");
try {
LOGGER.info("Grabbing lock to check for database upgrade.");
Expand Down Expand Up @@ -486,6 +488,39 @@
}
}

private void initDistributedLock() {
LOGGER.info("Setting up distributed lock table if not created.");
TransactionLegacy txn = TransactionLegacy.open("initDistributedLock");
txn.start();
String errorMessage = "Unable to get the database connections";
try {
Connection conn = txn.getConnection();
errorMessage = "Unable to create distributed_lock table in the 'cloud' database ";
String sql = "CREATE TABLE IF NOT EXISTS `cloud`.`distributed_lock` (" +

Check warning on line 499 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L491-L499

Added lines #L491 - L499 were not covered by tests
" `name` varchar(1024) NOT NULL," +
" `thread` varchar(1024) NOT NULL," +
" `ms_id` bigint NOT NULL, `pid` int NOT NULL," +
" `created` datetime DEFAULT NULL," +
" PRIMARY KEY (`name`)," +
" UNIQUE KEY `name` (`name`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.execute();

Check warning on line 508 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L507-L508

Added lines #L507 - L508 were not covered by tests
}
try (PreparedStatement pstmt = conn.prepareStatement("DELETE FROM cloud.distributed_lock WHERE ms_id=?")) {
pstmt.setLong(1, ManagementServerNode.getManagementServerId());
pstmt.execute();

Check warning on line 512 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L510-L512

Added lines #L510 - L512 were not covered by tests
}
txn.commit();
} catch (CloudRuntimeException | SQLException e) {
LOGGER.error(e.getMessage());
errorMessage = String.format("%s due to %s.", errorMessage, e.getMessage());
throw new CloudRuntimeException(errorMessage, e);

Check warning on line 518 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L514-L518

Added lines #L514 - L518 were not covered by tests
} finally {
txn.close();

Check warning on line 520 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L520

Added line #L520 was not covered by tests
}
}

Check warning on line 522 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L522

Added line #L522 was not covered by tests

private void initializeDatabaseEncryptors() {
TransactionLegacy txn = TransactionLegacy.open("initializeDatabaseEncryptors");
txn.start();
Expand Down
59 changes: 32 additions & 27 deletions framework/db/src/main/java/com/cloud/utils/db/DbUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import javax.persistence.Table;
import javax.persistence.Transient;

import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

Expand Down Expand Up @@ -199,28 +200,36 @@
public static boolean getGlobalLock(String name, int timeoutSeconds) {
Connection conn = getConnectionForGlobalLocks(name, true);
if (conn == null) {
LOGGER.error("Unable to acquire DB connection for global lock system");
LOGGER.error("Unable to acquire DB connection for distributed lock: " + name);

Check warning on line 203 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L203

Added line #L203 was not covered by tests
return false;
}

try (PreparedStatement pstmt = conn.prepareStatement("SELECT COALESCE(GET_LOCK(?, ?),0)");) {
pstmt.setString(1, name);
pstmt.setInt(2, timeoutSeconds);

try (ResultSet rs = pstmt.executeQuery();) {
if (rs != null && rs.first()) {
if (rs.getInt(1) > 0) {
return true;
} else {
if (LOGGER.isDebugEnabled())
LOGGER.debug("GET_LOCK() timed out on lock : " + name);
}
int remainingTime = timeoutSeconds;
while (remainingTime > 0) {
try (PreparedStatement pstmt = conn.prepareStatement(
"INSERT INTO cloud.distributed_lock (name, thread, ms_id, pid, created) " +
"VALUES (?, ?, ?, ?, now()) ON DUPLICATE KEY UPDATE name=name")) {
pstmt.setString(1, name);
pstmt.setString(2, Thread.currentThread().getName());
pstmt.setLong(3, ManagementServerNode.getManagementServerId());
pstmt.setLong(4, ProcessHandle.current().pid());
if (pstmt.executeUpdate() > 0) {
return true;
}
} catch (SQLException e) {
LOGGER.error("Inserting to cloud.distributed_lock query threw exception ", e);
} catch (Throwable e) {
LOGGER.error("Inserting to cloud.distributed_lock query threw exception ", e);

Check warning on line 222 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L220-L222

Added lines #L220 - L222 were not covered by tests
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Waiting, cloud.distributed_lock already has the lock: " + name);

Check warning on line 226 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L226

Added line #L226 was not covered by tests
}
remainingTime = remainingTime - 1;
try {
Thread.sleep(1000L);
} catch (InterruptedException ignore) {

Check warning on line 231 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L231

Added line #L231 was not covered by tests
}
} catch (SQLException e) {
LOGGER.error("GET_LOCK() throws exception ", e);
} catch (Throwable e) {
LOGGER.error("GET_LOCK() throws exception ", e);
}

removeConnectionForGlobalLocks(name);
Expand All @@ -235,24 +244,20 @@
public static boolean releaseGlobalLock(String name) {
try (Connection conn = getConnectionForGlobalLocks(name, false);) {
if (conn == null) {
LOGGER.error("Unable to acquire DB connection for global lock system");
LOGGER.error("Unable to acquire DB connection for distributed lock: " + name);

Check warning on line 247 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L247

Added line #L247 was not covered by tests
assert (false);
return false;
}

try (PreparedStatement pstmt = conn.prepareStatement("SELECT COALESCE(RELEASE_LOCK(?), 0)");) {
try (PreparedStatement pstmt = conn.prepareStatement("DELETE FROM cloud.distributed_lock WHERE name=?")) {
pstmt.setString(1, name);
try (ResultSet rs = pstmt.executeQuery();) {
if (rs != null && rs.first()) {
return rs.getInt(1) > 0;
}
LOGGER.error("releaseGlobalLock:RELEASE_LOCK() returns unexpected result");
if (pstmt.executeUpdate() > 0) {
return true;
}
LOGGER.warn("releaseGlobalLock: failed to remove cloud.distributed_lock lock which does not exist: " + name);

Check warning on line 257 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L257

Added line #L257 was not covered by tests
}
} catch (SQLException e) {
LOGGER.error("RELEASE_LOCK() throws exception ", e);
} catch (Throwable e) {
LOGGER.error("RELEASE_LOCK() throws exception ", e);
LOGGER.error("Removing cloud.distributed_lock lock threw exception ", e);

Check warning on line 260 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L260

Added line #L260 was not covered by tests
}
return false;
}
Expand Down
32 changes: 14 additions & 18 deletions framework/db/src/test/java/com/cloud/utils/DbUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,29 +150,27 @@ public void cleanup() throws SecurityException, NoSuchFieldException, IllegalArg
@Test
public void getGlobalLock() throws SQLException {
Mockito.when(dataSource.getConnection()).thenReturn(connection);
Mockito.when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
Mockito.when(resultSet.first()).thenReturn(true);
Mockito.when(resultSet.getInt(1)).thenReturn(1);
Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeUpdate()).thenReturn(1);

Assert.assertTrue(DbUtil.getGlobalLock("TEST", 600));

Mockito.verify(connection).prepareStatement(ArgumentMatchers.anyString());
Mockito.verify(preparedStatement).close();
Mockito.verify(resultSet).close();
}

@Test
public void getGlobalLockTimeout() throws SQLException {
Mockito.when(dataSource.getConnection()).thenReturn(connection);
Mockito.when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
Mockito.when(resultSet.first()).thenReturn(true);
Mockito.when(resultSet.getInt(1)).thenReturn(0);
Assert.assertFalse(DbUtil.getGlobalLock("TEST", 600));

Mockito.verify(connection).prepareStatement(ArgumentMatchers.anyString());
Mockito.verify(preparedStatement).close();
Mockito.verify(resultSet).close();
Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeUpdate()).thenReturn(0);

final int tries = 2;
Assert.assertFalse(DbUtil.getGlobalLock("TEST", tries));

Mockito.verify(connection, Mockito.times(tries)).prepareStatement(Mockito.anyString());
Mockito.verify(preparedStatement, Mockito.times(tries)).close();
Mockito.verify(connection).close();

// if any error happens, the connection map must be cleared
Expand Down Expand Up @@ -237,14 +235,12 @@ void releaseGlobalLockNotexisting() throws SQLException {

@Test
public void releaseGlobalLock() throws SQLException {
Mockito.when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
Mockito.when(resultSet.first()).thenReturn(true);
Mockito.when(resultSet.getInt(1)).thenReturn(1);
Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeUpdate()).thenReturn(1);

connectionMap.put("testLock", connection);
Assert.assertTrue(DbUtil.releaseGlobalLock("testLock"));

Mockito.verify(resultSet).close();
Mockito.verify(preparedStatement).close();
Mockito.verify(connection).close();
Assert.assertFalse(connectionMap.containsKey("testLock"));
Expand Down
1 change: 1 addition & 0 deletions setup/db/create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ DROP TABLE IF EXISTS `cloud`.`image_data_store`;
DROP TABLE IF EXISTS `cloud`.`vm_compute_tags`;
DROP TABLE IF EXISTS `cloud`.`vm_root_disk_tags`;
DROP TABLE IF EXISTS `cloud`.`vm_network_map`;
DROP TABLE IF EXISTS `cloud`.`distributed_lock`;

CREATE TABLE `cloud`.`version` (
`id` bigint unsigned NOT NULL UNIQUE AUTO_INCREMENT COMMENT 'id',
Expand Down
Loading