Improve health check resilience #1619

Merged · 1 commit · Apr 2, 2025
Messages.java
@@ -59,6 +59,9 @@ public final class Messages {
public static final String ENV_OF_APP_0_CONTAINS_INVALID_VALUE_FOR_1 = "The environment of application \"{0}\" contains an invalid value for \"{0}\". This indicates that MTA reserved variables in the application''s environment were modified manually. Either revert the changes or delete the application.";
public static final String METADATA_OF_0_CONTAINS_INVALID_VALUE_FOR_1 = "The metadata of \"{0}\" contains an invalid value for \"{0}\". This indicates that MTA reserved variables in the entity''s metadata were modified manually. Either revert the changes or delete the entity.";
public static final String COULD_NOT_DELETE_SPACEIDS_LEFTOVERS = "Could not delete space ids leftovers";
public static final String TIMEOUT_WHILE_CHECKING_DATABASE_HEALTH = "Timeout while checking database health";
public static final String TIMEOUT_WHILE_CHECKING_FOR_INCREASED_LOCKS = "Timeout while checking for increased locks";
public static final String TIMEOUT_WHILE_CHECKING_OBJECT_STORE_HEALTH = "Timeout while checking object store health";

public static final String COULD_NOT_GET_FILE_CONTENT_FOR_0 = "Could not get file content for file \"{0}\"";
public static final String SERVICE_MISSING_REQUIRED_PARAMETER = "Service \"{0}\" has missing required parameter: {1}";
@@ -199,6 +202,9 @@ public final class Messages {
public static final String DECREASING_INDEX_0_1_2 = "Decreasing index: {0} / {1} = {2}";
public static final String OBJECT_STORE_FILE_STORAGE_IS_NOT_AVAILABLE_FOR_INSTANCE = "Object store file storage is not available for instance: \"{0}\"";
public static final String NOT_ENOUGH_SAMPLES_TO_DETECT_ANOMALY_0_1 = "Not enough samples to detect anomaly: {0} / {1}";
public static final String CHECKING_DATABASE_HEALTH = "Checking database health...";
public static final String CHECKING_OBJECT_STORE_HEALTH = "Checking object store health...";
public static final String CHECKING_FOR_INCREASED_LOCKS = "Checking for increased locks...";

// Audit log

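The constants added above are consumed by the timeout-wrapped health probes introduced in ApplicationHealthCalculator below. A self-contained sketch of that pattern, with a stand-in probe and the message text inlined (illustrative only, not code from this PR):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch: a "Checking ..." debug message and its
// "Timeout while checking ..." counterpart bracket a bounded probe.
public class HealthProbeTimeoutExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Boolean> probe = executor.submit(() -> {
            Thread.sleep(5_000); // stand-in for a slow database ping
            return true;
        });
        System.out.println("Checking database health...");
        try {
            System.out.println("Healthy: " + probe.get(1, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            // The real code throws SLException with
            // Messages.TIMEOUT_WHILE_CHECKING_DATABASE_HEALTH here.
            System.err.println("Timeout while checking database health");
        } finally {
            executor.shutdownNow();
        }
    }
}

In the actual change the probe runs on a dedicated timeoutExecutor and the timeout surfaces as an SLException, so a hung dependency becomes a failed check rather than a check that never returns.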
ApplicationHealthCalculator.java
@@ -4,14 +4,20 @@
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.cloudfoundry.multiapps.common.SLException;
import org.cloudfoundry.multiapps.controller.client.util.CheckedSupplier;
import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor;
import org.cloudfoundry.multiapps.controller.core.Messages;
import org.cloudfoundry.multiapps.controller.core.application.health.database.DatabaseWaitingLocksAnalyzer;
@@ -29,6 +35,7 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;

import jakarta.inject.Inject;
import jakarta.inject.Named;

@Named
@@ -37,24 +44,37 @@ public class ApplicationHealthCalculator {
private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationHealthCalculator.class);

private static final int UPDATE_HEALTH_CHECK_STATUS_PERIOD_IN_SECONDS = 10;
private static final int TIMEOUT_FOR_TASK_EXECUTION_IN_SECONDS = 50;
private static final int SINGLE_TASK_TIMEOUT_IN_SECONDS = 70; // timeout is set to 70 so it is higher than the DB connection acquisition
// timeout
private static final int TOTAL_TASK_TIMEOUT_IN_SECONDS = 3 * SINGLE_TASK_TIMEOUT_IN_SECONDS;

private final ObjectStoreFileStorage objectStoreFileStorage;
private final ApplicationConfiguration applicationConfiguration;
private final DatabaseHealthService databaseHealthService;
private final DatabaseMonitoringService databaseMonitoringService;
private final DatabaseWaitingLocksAnalyzer databaseWaitingLocksAnalyzer;

private final CachedObject<Boolean> objectStoreFileStorageHealthCache = new CachedObject<>(Duration.ofMinutes(1));
private final CachedObject<Boolean> dbHealthServiceCache = new CachedObject<>(Duration.ofMinutes(1));
private final CachedObject<Boolean> hasIncreasedLocksCache = new CachedObject<>(false, Duration.ofMinutes(1));

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final ExecutorService executor = Executors.newFixedThreadPool(3);
private final CachedObject<Boolean> objectStoreFileStorageHealthCache = new CachedObject<>(Duration.ofSeconds(TOTAL_TASK_TIMEOUT_IN_SECONDS));
private final CachedObject<Boolean> dbHealthServiceCache = new CachedObject<>(Duration.ofSeconds(TOTAL_TASK_TIMEOUT_IN_SECONDS));
private final CachedObject<Boolean> hasIncreasedLocksCache = new CachedObject<>(false,
Duration.ofSeconds(TOTAL_TASK_TIMEOUT_IN_SECONDS));
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ExecutorService taskExecutor = new ThreadPoolExecutor(3,
3,
0L,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.AbortPolicy());
private final ExecutorService timeoutExecutor = new ThreadPoolExecutor(3,
3,
0L,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.AbortPolicy());

private final ResilientOperationExecutor resilientOperationExecutor = getResilienceExecutor();

@Autowired
@Inject
public ApplicationHealthCalculator(@Autowired(required = false) ObjectStoreFileStorage objectStoreFileStorage,
ApplicationConfiguration applicationConfiguration, DatabaseHealthService databaseHealthService,
DatabaseMonitoringService databaseMonitoringService,
@@ -73,9 +93,9 @@ protected void scheduleRegularHealthUpdate() {

protected void updateHealthStatus() {
List<Callable<Boolean>> tasks = List.of(this::isObjectStoreFileStorageHealthy, this::isDatabaseHealthy,
databaseWaitingLocksAnalyzer::hasIncreasedDbLocks);
this::checkForIncreasedLocksWithTimeout);
try {
List<Future<Boolean>> completedFutures = executor.invokeAll(tasks, TIMEOUT_FOR_TASK_EXECUTION_IN_SECONDS, TimeUnit.SECONDS);
List<Future<Boolean>> completedFutures = taskExecutor.invokeAll(tasks, TOTAL_TASK_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
executeFuture(completedFutures.get(0), isHealthy -> objectStoreFileStorageHealthCache.refresh(() -> isHealthy), false,
Messages.ERROR_OCCURRED_DURING_OBJECT_STORE_HEALTH_CHECKING);
executeFuture(completedFutures.get(1), isHealthy -> dbHealthServiceCache.refresh(() -> isHealthy), false,
@@ -155,7 +175,7 @@ private boolean isObjectStoreFileStorageHealthy() {
return true;
}
try {
resilientOperationExecutor.execute(objectStoreFileStorage::testConnection);
resilientOperationExecutor.execute((CheckedSupplier<Boolean>) this::testObjectStoreConnectionWithTimeout);
} catch (Exception e) {
LOGGER.error(MessageFormat.format(Messages.ERROR_OCCURRED_DURING_OBJECT_STORE_HEALTH_CHECKING_FOR_INSTANCE,
applicationConfiguration.getApplicationInstanceIndex()),
@@ -165,9 +185,22 @@ private boolean isObjectStoreFileStorageHealthy() {
return true;
}

private boolean testObjectStoreConnectionWithTimeout() throws ExecutionException, InterruptedException {
Future<Boolean> future = timeoutExecutor.submit(() -> {
objectStoreFileStorage.testConnection();
return true;
});
try {
LOGGER.debug(Messages.CHECKING_OBJECT_STORE_HEALTH);
return future.get(SINGLE_TASK_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
throw new SLException(e, Messages.TIMEOUT_WHILE_CHECKING_OBJECT_STORE_HEALTH);
}
}

private boolean isDatabaseHealthy() {
try {
resilientOperationExecutor.execute(databaseHealthService::testDatabaseConnection);
resilientOperationExecutor.execute((CheckedSupplier<Boolean>) this::testDatabaseConnectionWithTimeout);
return true;
} catch (Exception e) {
LOGGER.error(MessageFormat.format(Messages.ERROR_OCCURRED_WHILE_CHECKING_DATABASE_INSTANCE_0,
@@ -177,6 +210,29 @@ private boolean isDatabaseHealthy() {
}
}

private boolean testDatabaseConnectionWithTimeout() throws ExecutionException, InterruptedException {
Future<Boolean> future = timeoutExecutor.submit(() -> {
databaseHealthService.testDatabaseConnection();
return true;
});
try {
LOGGER.debug(Messages.CHECKING_DATABASE_HEALTH);
return future.get(SINGLE_TASK_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
throw new SLException(e, Messages.TIMEOUT_WHILE_CHECKING_DATABASE_HEALTH);
}
}

private boolean checkForIncreasedLocksWithTimeout() throws ExecutionException, InterruptedException {
Future<Boolean> future = timeoutExecutor.submit(databaseWaitingLocksAnalyzer::hasIncreasedDbLocks);
try {
LOGGER.debug(Messages.CHECKING_FOR_INCREASED_LOCKS);
return future.get(SINGLE_TASK_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
throw new SLException(e, Messages.TIMEOUT_WHILE_CHECKING_FOR_INCREASED_LOCKS);
}
}

protected ResilientOperationExecutor getResilienceExecutor() {
return new ResilientOperationExecutor();
}
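
The PR also replaces the plain Executors.newFixedThreadPool(3) with ThreadPoolExecutor instances backed by a SynchronousQueue and AbortPolicy, presumably so that a probe submitted while every worker is still occupied is rejected outright instead of queueing behind a hung check. A minimal sketch of that rejection behavior (pool size and names are illustrative, not taken from the PR):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: with a SynchronousQueue and AbortPolicy, a task
// submitted while every worker is busy fails fast instead of piling up
// in an unbounded queue.
public class RejectionExample {

    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                                          new SynchronousQueue<>(),
                                                          new ThreadPoolExecutor.AbortPolicy());
        executor.submit(() -> {
            Thread.sleep(10_000); // occupies the only worker
            return true;
        });
        try {
            executor.submit(() -> true); // no free worker and no queue capacity
        } catch (RejectedExecutionException e) {
            System.out.println("Second probe rejected while the first is still running");
        } finally {
            executor.shutdownNow();
        }
    }
}

On top of the per-task guard, invokeAll(tasks, TOTAL_TASK_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) bounds the whole batch and cancels any probe still running when that deadline passes.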