diff --git a/engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java b/engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java index 1e3b3a7e5ece..f3feb156891b 100644 --- a/engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java +++ b/engine/schema/src/main/java/com/cloud/upgrade/DatabaseUpgradeChecker.java @@ -35,6 +35,7 @@ import com.cloud.utils.FileUtil; import org.apache.cloudstack.utils.CloudStackVersion; +import org.apache.cloudstack.utils.identity.ManagementServerNode; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -443,6 +444,7 @@ protected void executeViewScripts() { @Override public void check() { + initDistributedLock(); GlobalLock lock = GlobalLock.getInternLock("DatabaseUpgrade"); try { LOGGER.info("Grabbing lock to check for database upgrade."); @@ -486,6 +488,41 @@ public void check() { } } + private void initDistributedLock() { + LOGGER.info("Setting up distributed lock table if not created."); + TransactionLegacy txn = TransactionLegacy.open("initDistributedLock"); + txn.start(); + String errorMessage = "Unable to get the database connections"; + try { + Connection conn = txn.getConnection(); + errorMessage = "Unable to create distributed_lock table in the 'cloud' database "; + String sql = "CREATE TABLE IF NOT EXISTS `cloud`.`distributed_lock` (" + + " `name` varchar(1024) NOT NULL," + + " `thread` varchar(1024) NOT NULL," + + " `ms_id` bigint NOT NULL," + + " `pid` int NOT NULL," + + " `created` datetime DEFAULT NULL," + + " PRIMARY KEY (`name`)," + + " UNIQUE KEY `name` (`name`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8"; + try (PreparedStatement pstmt = conn.prepareStatement(sql)) { + pstmt.execute(); + } + LOGGER.info("Deleting existing distributed locks with ms_id = " + ManagementServerNode.getManagementServerId()); + try (PreparedStatement pstmt = conn.prepareStatement("DELETE FROM cloud.distributed_lock WHERE ms_id=?")) { + pstmt.setLong(1, ManagementServerNode.getManagementServerId()); + pstmt.execute(); + } + txn.commit(); + } catch (CloudRuntimeException | SQLException e) { + LOGGER.error(e.getMessage()); + errorMessage = String.format("%s due to %s.", errorMessage, e.getMessage()); + throw new CloudRuntimeException(errorMessage, e); + } finally { + txn.close(); + } + } + private void initializeDatabaseEncryptors() { TransactionLegacy txn = TransactionLegacy.open("initializeDatabaseEncryptors"); txn.start(); diff --git a/framework/db/src/main/java/com/cloud/utils/db/DbUtil.java b/framework/db/src/main/java/com/cloud/utils/db/DbUtil.java index 88397f54d4f4..cfdc5b55d02b 100644 --- a/framework/db/src/main/java/com/cloud/utils/db/DbUtil.java +++ b/framework/db/src/main/java/com/cloud/utils/db/DbUtil.java @@ -41,6 +41,7 @@ import javax.persistence.Table; import javax.persistence.Transient; +import org.apache.cloudstack.utils.identity.ManagementServerNode; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -199,28 +200,36 @@ public static final String getTableName(Class clazz) { public static boolean getGlobalLock(String name, int timeoutSeconds) { Connection conn = getConnectionForGlobalLocks(name, true); if (conn == null) { - LOGGER.error("Unable to acquire DB connection for global lock system"); + LOGGER.error("Unable to acquire DB connection for distributed lock: " + name); return false; } - try (PreparedStatement pstmt = conn.prepareStatement("SELECT COALESCE(GET_LOCK(?, ?),0)");) { - pstmt.setString(1, name); - pstmt.setInt(2, timeoutSeconds); - - try (ResultSet rs = pstmt.executeQuery();) { - if (rs != null && rs.first()) { - if (rs.getInt(1) > 0) { - return true; - } else { - if (LOGGER.isDebugEnabled()) - LOGGER.debug("GET_LOCK() timed out on lock : " + name); - } + int remainingTime = timeoutSeconds; + while (remainingTime > 0) { + try (PreparedStatement pstmt = conn.prepareStatement( + "INSERT INTO cloud.distributed_lock (name, thread, ms_id, pid, created) " + + "VALUES (?, ?, ?, ?, now()) ON DUPLICATE KEY UPDATE name=name")) { + pstmt.setString(1, name); + pstmt.setString(2, Thread.currentThread().getName()); + pstmt.setLong(3, ManagementServerNode.getManagementServerId()); + pstmt.setLong(4, ProcessHandle.current().pid()); + if (pstmt.executeUpdate() > 0) { + return true; } + } catch (SQLException e) { + LOGGER.error("Inserting to cloud.distributed_lock query threw exception ", e); + } catch (Throwable e) { + LOGGER.error("Inserting to cloud.distributed_lock query threw exception ", e); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Waiting, cloud.distributed_lock already has the lock: " + name); + } + remainingTime = remainingTime - 1; + try { + Thread.sleep(1000L); + } catch (InterruptedException ignore) { } - } catch (SQLException e) { - LOGGER.error("GET_LOCK() throws exception ", e); - } catch (Throwable e) { - LOGGER.error("GET_LOCK() throws exception ", e); } removeConnectionForGlobalLocks(name); @@ -235,24 +244,20 @@ public static Class getEntityBeanType(GenericDao dao) { public static boolean releaseGlobalLock(String name) { try (Connection conn = getConnectionForGlobalLocks(name, false);) { if (conn == null) { - LOGGER.error("Unable to acquire DB connection for global lock system"); + LOGGER.error("Unable to acquire DB connection for distributed lock: " + name); assert (false); return false; } - try (PreparedStatement pstmt = conn.prepareStatement("SELECT COALESCE(RELEASE_LOCK(?), 0)");) { + try (PreparedStatement pstmt = conn.prepareStatement("DELETE FROM cloud.distributed_lock WHERE name=?")) { pstmt.setString(1, name); - try (ResultSet rs = pstmt.executeQuery();) { - if (rs != null && rs.first()) { - return rs.getInt(1) > 0; - } - LOGGER.error("releaseGlobalLock:RELEASE_LOCK() returns unexpected result"); + if (pstmt.executeUpdate() > 0) { + return true; } + LOGGER.warn("releaseGlobalLock: failed to remove cloud.distributed_lock lock which does not exist: " + name); } - } catch (SQLException e) { - LOGGER.error("RELEASE_LOCK() throws exception ", e); } catch (Throwable e) { - LOGGER.error("RELEASE_LOCK() throws exception ", e); + LOGGER.error("Removing cloud.distributed_lock lock threw exception ", e); } return false; } diff --git a/framework/db/src/test/java/com/cloud/utils/DbUtilTest.java b/framework/db/src/test/java/com/cloud/utils/DbUtilTest.java index 7ae7368e173e..ce9f1269044d 100644 --- a/framework/db/src/test/java/com/cloud/utils/DbUtilTest.java +++ b/framework/db/src/test/java/com/cloud/utils/DbUtilTest.java @@ -150,29 +150,27 @@ public void cleanup() throws SecurityException, NoSuchFieldException, IllegalArg @Test public void getGlobalLock() throws SQLException { Mockito.when(dataSource.getConnection()).thenReturn(connection); - Mockito.when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(preparedStatement); - Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet); - Mockito.when(resultSet.first()).thenReturn(true); - Mockito.when(resultSet.getInt(1)).thenReturn(1); + Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(preparedStatement); + Mockito.when(preparedStatement.executeUpdate()).thenReturn(1); + Assert.assertTrue(DbUtil.getGlobalLock("TEST", 600)); Mockito.verify(connection).prepareStatement(ArgumentMatchers.anyString()); Mockito.verify(preparedStatement).close(); - Mockito.verify(resultSet).close(); } @Test public void getGlobalLockTimeout() throws SQLException { Mockito.when(dataSource.getConnection()).thenReturn(connection); - Mockito.when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(preparedStatement); - Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet); - Mockito.when(resultSet.first()).thenReturn(true); - Mockito.when(resultSet.getInt(1)).thenReturn(0); - Assert.assertFalse(DbUtil.getGlobalLock("TEST", 600)); - Mockito.verify(connection).prepareStatement(ArgumentMatchers.anyString()); - Mockito.verify(preparedStatement).close(); - Mockito.verify(resultSet).close(); + Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(preparedStatement); + Mockito.when(preparedStatement.executeUpdate()).thenReturn(0); + + final int tries = 2; + Assert.assertFalse(DbUtil.getGlobalLock("TEST", tries)); + + Mockito.verify(connection, Mockito.times(tries)).prepareStatement(Mockito.anyString()); + Mockito.verify(preparedStatement, Mockito.times(tries)).close(); Mockito.verify(connection).close(); // if any error happens, the connection map must be cleared @@ -237,14 +235,12 @@ void releaseGlobalLockNotexisting() throws SQLException { @Test public void releaseGlobalLock() throws SQLException { - Mockito.when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(preparedStatement); - Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet); - Mockito.when(resultSet.first()).thenReturn(true); - Mockito.when(resultSet.getInt(1)).thenReturn(1); + Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(preparedStatement); + Mockito.when(preparedStatement.executeUpdate()).thenReturn(1); + connectionMap.put("testLock", connection); Assert.assertTrue(DbUtil.releaseGlobalLock("testLock")); - Mockito.verify(resultSet).close(); Mockito.verify(preparedStatement).close(); Mockito.verify(connection).close(); Assert.assertFalse(connectionMap.containsKey("testLock")); diff --git a/server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java b/server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java index 22f8939e7eb7..d494d0b22c7f 100644 --- a/server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java +++ b/server/src/main/java/org/apache/cloudstack/ca/CAManagerImpl.java @@ -40,6 +40,7 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; +import com.cloud.utils.db.GlobalLock; import org.apache.cloudstack.api.ApiErrorCode; import org.apache.cloudstack.api.ServerApiException; import org.apache.cloudstack.api.command.admin.ca.IssueCertificateCmd; @@ -295,6 +296,42 @@ public char[] getKeyStorePassphrase() { /////////////// CA Manager Setup /////////////////// //////////////////////////////////////////////////// + public static final class DummyBGTask extends ManagedContextRunnable implements BackgroundPollTask { + + public DummyBGTask() { + } + + @Override + protected void runInContext() { + final GlobalLock lock = GlobalLock.getInternLock("DummyBGTask"); + try { + logger.info("DummyBGTask: Trying to get lock at " + DateTime.now(DateTimeZone.UTC)); + if (lock.lock(50)) { + try { + logger.info("DummyBGTask: Lock acquired, now sleeping for 5 seconds"); + try { + Thread.sleep(10 * 1000L); + } catch (InterruptedException ignore) { + } + } finally { + logger.info("DummyBGTask: Unlocking now"); + lock.unlock(); + } + } else { + logger.info("DummyBGTask: Could not get lock, retrying..."); + } + logger.info("DummyBGTask: One round of lock-grab over at " + DateTime.now(DateTimeZone.UTC)); + } finally { + lock.releaseRef(); + } + } + + @Override + public Long getDelay() { + return 5000L; + } + } + public static final class CABackgroundTask extends ManagedContextRunnable implements BackgroundPollTask { private CAManager caManager; private HostDao hostDao; @@ -405,6 +442,7 @@ public boolean start() { @Override public boolean configure(final String name, final Map params) throws ConfigurationException { + backgroundPollManager.submitTask(new DummyBGTask()); backgroundPollManager.submitTask(new CABackgroundTask(this, hostDao)); return true; } diff --git a/setup/db/create-schema.sql b/setup/db/create-schema.sql index 3f14fccd0103..9b31bbe6a037 100755 --- a/setup/db/create-schema.sql +++ b/setup/db/create-schema.sql @@ -195,6 +195,7 @@ DROP TABLE IF EXISTS `cloud`.`image_data_store`; DROP TABLE IF EXISTS `cloud`.`vm_compute_tags`; DROP TABLE IF EXISTS `cloud`.`vm_root_disk_tags`; DROP TABLE IF EXISTS `cloud`.`vm_network_map`; +DROP TABLE IF EXISTS `cloud`.`distributed_lock`; CREATE TABLE `cloud`.`version` ( `id` bigint unsigned NOT NULL UNIQUE AUTO_INCREMENT COMMENT 'id',