Skip to content

Distributed Table-Based Lock to support MySQL Database Cluster #9955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import com.cloud.utils.FileUtil;
import org.apache.cloudstack.utils.CloudStackVersion;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -443,6 +444,7 @@

@Override
public void check() {
initDistributedLock();

Check warning on line 447 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L447

Added line #L447 was not covered by tests
GlobalLock lock = GlobalLock.getInternLock("DatabaseUpgrade");
try {
LOGGER.info("Grabbing lock to check for database upgrade.");
Expand Down Expand Up @@ -486,6 +488,41 @@
}
}

private void initDistributedLock() {
LOGGER.info("Setting up distributed lock table if not created.");
TransactionLegacy txn = TransactionLegacy.open("initDistributedLock");
txn.start();
String errorMessage = "Unable to get the database connections";
try {
Connection conn = txn.getConnection();
errorMessage = "Unable to create distributed_lock table in the 'cloud' database ";
String sql = "CREATE TABLE IF NOT EXISTS `cloud`.`distributed_lock` (" +

Check warning on line 499 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L491-L499

Added lines #L491 - L499 were not covered by tests
" `name` varchar(1024) NOT NULL," +
" `thread` varchar(1024) NOT NULL," +
" `ms_id` bigint NOT NULL," +
" `pid` int NOT NULL," +
" `created` datetime DEFAULT NULL," +
" PRIMARY KEY (`name`)," +
" UNIQUE KEY `name` (`name`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.execute();

Check warning on line 509 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L508-L509

Added lines #L508 - L509 were not covered by tests
}
LOGGER.info("Deleting existing distributed locks with ms_id = " + ManagementServerNode.getManagementServerId());
try (PreparedStatement pstmt = conn.prepareStatement("DELETE FROM cloud.distributed_lock WHERE ms_id=?")) {
pstmt.setLong(1, ManagementServerNode.getManagementServerId());
pstmt.execute();

Check warning on line 514 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L511-L514

Added lines #L511 - L514 were not covered by tests
}
txn.commit();
} catch (CloudRuntimeException | SQLException e) {
LOGGER.error(e.getMessage());
errorMessage = String.format("%s due to %s.", errorMessage, e.getMessage());
throw new CloudRuntimeException(errorMessage, e);

Check warning on line 520 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L516-L520

Added lines #L516 - L520 were not covered by tests
} finally {
txn.close();

Check warning on line 522 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L522

Added line #L522 was not covered by tests
}
}

Check warning on line 524 in engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java

View check run for this annotation

Codecov / codecov/patch

engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java#L524

Added line #L524 was not covered by tests

private void initializeDatabaseEncryptors() {
TransactionLegacy txn = TransactionLegacy.open("initializeDatabaseEncryptors");
txn.start();
Expand Down
59 changes: 32 additions & 27 deletions framework/db/src/main/java/com/cloud/utils/db/DbUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import javax.persistence.Table;
import javax.persistence.Transient;

import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

Expand Down Expand Up @@ -199,28 +200,36 @@
public static boolean getGlobalLock(String name, int timeoutSeconds) {
Connection conn = getConnectionForGlobalLocks(name, true);
if (conn == null) {
LOGGER.error("Unable to acquire DB connection for global lock system");
LOGGER.error("Unable to acquire DB connection for distributed lock: " + name);

Check warning on line 203 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L203

Added line #L203 was not covered by tests
return false;
}

try (PreparedStatement pstmt = conn.prepareStatement("SELECT COALESCE(GET_LOCK(?, ?),0)");) {
pstmt.setString(1, name);
pstmt.setInt(2, timeoutSeconds);

try (ResultSet rs = pstmt.executeQuery();) {
if (rs != null && rs.first()) {
if (rs.getInt(1) > 0) {
return true;
} else {
if (LOGGER.isDebugEnabled())
LOGGER.debug("GET_LOCK() timed out on lock : " + name);
}
int remainingTime = timeoutSeconds;
while (remainingTime > 0) {
try (PreparedStatement pstmt = conn.prepareStatement(
"INSERT INTO cloud.distributed_lock (name, thread, ms_id, pid, created) " +
"VALUES (?, ?, ?, ?, now()) ON DUPLICATE KEY UPDATE name=name")) {
pstmt.setString(1, name);
pstmt.setString(2, Thread.currentThread().getName());
pstmt.setLong(3, ManagementServerNode.getManagementServerId());
pstmt.setLong(4, ProcessHandle.current().pid());
if (pstmt.executeUpdate() > 0) {
return true;
}
} catch (SQLException e) {
LOGGER.error("Inserting to cloud.distributed_lock query threw exception ", e);
} catch (Throwable e) {
LOGGER.error("Inserting to cloud.distributed_lock query threw exception ", e);

Check warning on line 222 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L220-L222

Added lines #L220 - L222 were not covered by tests
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Waiting, cloud.distributed_lock already has the lock: " + name);

Check warning on line 226 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L226

Added line #L226 was not covered by tests
}
remainingTime = remainingTime - 1;
try {
Thread.sleep(1000L);
} catch (InterruptedException ignore) {

Check warning on line 231 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L231

Added line #L231 was not covered by tests
}
} catch (SQLException e) {
LOGGER.error("GET_LOCK() throws exception ", e);
} catch (Throwable e) {
LOGGER.error("GET_LOCK() throws exception ", e);
}

removeConnectionForGlobalLocks(name);
Expand All @@ -235,24 +244,20 @@
public static boolean releaseGlobalLock(String name) {
try (Connection conn = getConnectionForGlobalLocks(name, false);) {
if (conn == null) {
LOGGER.error("Unable to acquire DB connection for global lock system");
LOGGER.error("Unable to acquire DB connection for distributed lock: " + name);

Check warning on line 247 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L247

Added line #L247 was not covered by tests
assert (false);
return false;
}

try (PreparedStatement pstmt = conn.prepareStatement("SELECT COALESCE(RELEASE_LOCK(?), 0)");) {
try (PreparedStatement pstmt = conn.prepareStatement("DELETE FROM cloud.distributed_lock WHERE name=?")) {
pstmt.setString(1, name);
try (ResultSet rs = pstmt.executeQuery();) {
if (rs != null && rs.first()) {
return rs.getInt(1) > 0;
}
LOGGER.error("releaseGlobalLock:RELEASE_LOCK() returns unexpected result");
if (pstmt.executeUpdate() > 0) {
return true;
}
LOGGER.warn("releaseGlobalLock: failed to remove cloud.distributed_lock lock which does not exist: " + name);

Check warning on line 257 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L257

Added line #L257 was not covered by tests
}
} catch (SQLException e) {
LOGGER.error("RELEASE_LOCK() throws exception ", e);
} catch (Throwable e) {
LOGGER.error("RELEASE_LOCK() throws exception ", e);
LOGGER.error("Removing cloud.distributed_lock lock threw exception ", e);

Check warning on line 260 in framework/db/src/main/java/com/cloud/utils/db/DbUtil.java

View check run for this annotation

Codecov / codecov/patch

framework/db/src/main/java/com/cloud/utils/db/DbUtil.java#L260

Added line #L260 was not covered by tests
}
return false;
}
Expand Down
32 changes: 14 additions & 18 deletions framework/db/src/test/java/com/cloud/utils/DbUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,29 +150,27 @@ public void cleanup() throws SecurityException, NoSuchFieldException, IllegalArg
@Test
public void getGlobalLock() throws SQLException {
Mockito.when(dataSource.getConnection()).thenReturn(connection);
Mockito.when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
Mockito.when(resultSet.first()).thenReturn(true);
Mockito.when(resultSet.getInt(1)).thenReturn(1);
Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeUpdate()).thenReturn(1);

Assert.assertTrue(DbUtil.getGlobalLock("TEST", 600));

Mockito.verify(connection).prepareStatement(ArgumentMatchers.anyString());
Mockito.verify(preparedStatement).close();
Mockito.verify(resultSet).close();
}

@Test
public void getGlobalLockTimeout() throws SQLException {
Mockito.when(dataSource.getConnection()).thenReturn(connection);
Mockito.when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
Mockito.when(resultSet.first()).thenReturn(true);
Mockito.when(resultSet.getInt(1)).thenReturn(0);
Assert.assertFalse(DbUtil.getGlobalLock("TEST", 600));

Mockito.verify(connection).prepareStatement(ArgumentMatchers.anyString());
Mockito.verify(preparedStatement).close();
Mockito.verify(resultSet).close();
Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeUpdate()).thenReturn(0);

final int tries = 2;
Assert.assertFalse(DbUtil.getGlobalLock("TEST", tries));

Mockito.verify(connection, Mockito.times(tries)).prepareStatement(Mockito.anyString());
Mockito.verify(preparedStatement, Mockito.times(tries)).close();
Mockito.verify(connection).close();

// if any error happens, the connection map must be cleared
Expand Down Expand Up @@ -237,14 +235,12 @@ void releaseGlobalLockNotexisting() throws SQLException {

@Test
public void releaseGlobalLock() throws SQLException {
Mockito.when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
Mockito.when(resultSet.first()).thenReturn(true);
Mockito.when(resultSet.getInt(1)).thenReturn(1);
Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeUpdate()).thenReturn(1);

connectionMap.put("testLock", connection);
Assert.assertTrue(DbUtil.releaseGlobalLock("testLock"));

Mockito.verify(resultSet).close();
Mockito.verify(preparedStatement).close();
Mockito.verify(connection).close();
Assert.assertFalse(connectionMap.containsKey("testLock"));
Expand Down
38 changes: 38 additions & 0 deletions server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import com.cloud.utils.db.GlobalLock;
import org.apache.cloudstack.api.ApiErrorCode;
import org.apache.cloudstack.api.ServerApiException;
import org.apache.cloudstack.api.command.admin.ca.IssueCertificateCmd;
Expand Down Expand Up @@ -295,6 +296,42 @@
/////////////// CA Manager Setup ///////////////////
////////////////////////////////////////////////////

public static final class DummyBGTask extends ManagedContextRunnable implements BackgroundPollTask {

public DummyBGTask() {
}

Check warning on line 302 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L301-L302

Added lines #L301 - L302 were not covered by tests

@Override
protected void runInContext() {
final GlobalLock lock = GlobalLock.getInternLock("DummyBGTask");
try {
logger.info("DummyBGTask: Trying to get lock at " + DateTime.now(DateTimeZone.UTC));

Check warning on line 308 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L305-L308

Added lines #L305 - L308 were not covered by tests
if (lock.lock(50)) {
try {
logger.info("DummyBGTask: Lock acquired, now sleeping for 5 seconds");
try {
Thread.sleep(10 * 1000L);
} catch (InterruptedException ignore) {
}

Check warning on line 315 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L310-L315

Added lines #L310 - L315 were not covered by tests
} finally {
logger.info("DummyBGTask: Unlocking now");
lock.unlock();
}

Check warning on line 319 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L317-L319

Added lines #L317 - L319 were not covered by tests
} else {
logger.info("DummyBGTask: Could not get lock, retrying...");

Check warning on line 321 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L321

Added line #L321 was not covered by tests
}
logger.info("DummyBGTask: One round of lock-grab over at " + DateTime.now(DateTimeZone.UTC));

Check warning on line 323 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L323

Added line #L323 was not covered by tests
} finally {
lock.releaseRef();

Check warning on line 325 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L325

Added line #L325 was not covered by tests
}
}

Check warning on line 327 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L327

Added line #L327 was not covered by tests

@Override
public Long getDelay() {
return 5000L;
}

Check warning on line 332 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L330-L332

Added lines #L330 - L332 were not covered by tests
}

public static final class CABackgroundTask extends ManagedContextRunnable implements BackgroundPollTask {
private CAManager caManager;
private HostDao hostDao;
Expand Down Expand Up @@ -405,6 +442,7 @@

@Override
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
backgroundPollManager.submitTask(new DummyBGTask());

Check warning on line 445 in server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java#L445

Added line #L445 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rohityadavcloud
is it for testing only ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, see the commit: d15c672

I wanted to create some code where operationally both devs & QA can see deadlocks etc via logs.

backgroundPollManager.submitTask(new CABackgroundTask(this, hostDao));
return true;
}
Expand Down
1 change: 1 addition & 0 deletions setup/db/create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ DROP TABLE IF EXISTS `cloud`.`image_data_store`;
DROP TABLE IF EXISTS `cloud`.`vm_compute_tags`;
DROP TABLE IF EXISTS `cloud`.`vm_root_disk_tags`;
DROP TABLE IF EXISTS `cloud`.`vm_network_map`;
DROP TABLE IF EXISTS `cloud`.`distributed_lock`;

CREATE TABLE `cloud`.`version` (
`id` bigint unsigned NOT NULL UNIQUE AUTO_INCREMENT COMMENT 'id',
Expand Down
Loading