HBASE-29255: Integrate backup WAL cleanup logic with the delete command #7007

Open · wants to merge 5 commits into base: HBASE-28957

Changes from 3 commits

BackupCommands.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
@@ -47,18 +49,26 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;

import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -71,6 +81,7 @@
import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupSet;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
@@ -80,6 +91,7 @@
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -649,6 +661,8 @@ public void execute() throws IOException {
} else if (cmdline.hasOption(OPTION_LIST)) {
executeDeleteListOfBackups(cmdline, isForceDelete);
}

cleanUpUnusedBackupWALs();
}

private void executeDeleteOlderThan(CommandLine cmdline, boolean isForceDelete)
@@ -876,6 +890,140 @@ private boolean canAnyOtherBackupCover(List<BackupInfo> allBackups, BackupInfo c
return false;
}

/**
* Cleans up Write-Ahead Logs (WALs) that are no longer required for PITR after a successful
* backup deletion.
*/
private void cleanUpUnusedBackupWALs() throws IOException {
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);

if (Strings.isNullOrEmpty(backupWalDir)) {
System.out.println("No WAL directory specified for continuous backup. Skipping cleanup.");
return;
}

try (Connection conn = ConnectionFactory.createConnection(conf);
Review comments on this connection:

Contributor: NIT: avoid using a generic name like conn; use something more specific, like masterConn.

Contributor (abhradeepkundu, Jun 4, 2025): I feel this connection creation is unnecessary. The superclass already has a connection open. Please verify whether you can reuse it.

Contributor (author): True, we'll reuse that!

BackupSystemTable sysTable = new BackupSystemTable(conn)) {
// Get list of tables under continuous backup
Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();
if (continuousBackupTables.isEmpty()) {
System.out.println("No continuous backups configured. Skipping WAL cleanup.");
return;
}

// Find the earliest timestamp after which WALs are still needed
long cutoffTimestamp = determineWALCleanupCutoffTime(sysTable);
if (cutoffTimestamp == 0) {
System.err.println("ERROR: No valid full backup found. Skipping WAL cleanup.");
return;
}

// Update metadata before actual cleanup to avoid inconsistencies
updateBackupTableStartTimes(sysTable, cutoffTimestamp);

// Delete WAL files older than cutoff timestamp
deleteOldWALFiles(conf, backupWalDir, cutoffTimestamp);

}
}
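
Picking up the review thread above about reusing the superclass connection, here is a minimal sketch of the same method without opening its own connection. It assumes the enclosing command class exposes an already-open Connection as a field named `conn`; that field name is an assumption of the sketch, not something this diff confirms.

```java
// Sketch only: same cleanup flow, but without ConnectionFactory.createConnection(conf).
// Assumes an inherited, already-open Connection field named `conn` (hypothetical name).
private void cleanUpUnusedBackupWALs() throws IOException {
  Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
  String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
  if (Strings.isNullOrEmpty(backupWalDir)) {
    System.out.println("No WAL directory specified for continuous backup. Skipping cleanup.");
    return;
  }

  // Only the BackupSystemTable wrapper is closed here; the shared connection stays open
  // because the rest of the delete command still needs it.
  try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
    Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();
    if (continuousBackupTables.isEmpty()) {
      System.out.println("No continuous backups configured. Skipping WAL cleanup.");
      return;
    }

    long cutoffTimestamp = determineWALCleanupCutoffTime(sysTable);
    if (cutoffTimestamp == 0) {
      System.err.println("ERROR: No valid full backup found. Skipping WAL cleanup.");
      return;
    }

    updateBackupTableStartTimes(sysTable, cutoffTimestamp);
    deleteOldWALFiles(conf, backupWalDir, cutoffTimestamp);
  }
}
```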

/**
* Determines the cutoff time for cleaning WAL files.
* @param sysTable Backup system table
* @return cutoff timestamp or 0 if not found
*/
private long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException {
List<BackupInfo> backupInfos = sysTable.getBackupInfos(BackupState.COMPLETE);
Collections.reverse(backupInfos); // Start from oldest

for (BackupInfo backupInfo : backupInfos) {
if (BackupType.FULL.equals(backupInfo.getType())) {
return backupInfo.getStartTs();
}
}
return 0;
}

/**
 * Updates the recorded continuous-backup start time for every table whose current start time is
 * older than the cutoff timestamp. WALs before the cutoff are about to be deleted, so
 * point-in-time restore is only possible from the cutoff onward; advancing the start times keeps
 * the metadata consistent with the earliest timestamp that is still recoverable.
 * @param sysTable Backup system table
 * @param cutoffTimestamp Timestamp before which WALs are no longer needed
 */
private void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestamp)
Review comment on lines +947 to +952:

Hey @vinayakphegde, this is the function that led me to ask for clarification on why we need to update the start times of the continuous backups. Maybe you could add another line or two to the docstring here that elaborates on why we need to do this? That may make it clearer to others in the future.

throws IOException {

Map<TableName, Long> backupTables = sysTable.getContinuousBackupTableSet();
Set<TableName> tablesToUpdate = new HashSet<>();

for (Map.Entry<TableName, Long> entry : backupTables.entrySet()) {
if (entry.getValue() < cutoffTimestamp) {
tablesToUpdate.add(entry.getKey());
}
}

if (!tablesToUpdate.isEmpty()) {
sysTable.updateContinuousBackupTableSet(tablesToUpdate, cutoffTimestamp);
}
}

/**
* Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
*/
private void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
throws IOException {
System.out.println("Starting WAL cleanup in backup directory: " + backupWalDir
+ " with cutoff time: " + cutoffTime);

BackupFileSystemManager manager =
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
Path bulkloadDir = manager.getBulkLoadFilesDir();

SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

System.out.println("Listing directories under: " + walDir);

FileStatus[] directories = fs.listStatus(walDir);

for (FileStatus dirStatus : directories) {
if (!dirStatus.isDirectory()) {
continue; // Skip files, we only want directories
}

Path dirPath = dirStatus.getPath();
String dirName = dirPath.getName();

try {
long dayStart = parseDayDirectory(dirName, dateFormat);
System.out
.println("Checking WAL directory: " + dirName + " (Start Time: " + dayStart + ")");

// If WAL files of that day are older than cutoff time, delete them
if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
System.out.println("Deleting outdated WAL directory: " + dirPath);
fs.delete(dirPath, true);
Review comments on this deletion:

Contributor: If there is an API to delete in batches, we should use it. Also, depending on the number of files being deleted, this method can take a lot of time. Maybe we can make it asynchronous here; please give it a thought.

Contributor (author): I checked, but couldn't find any API that supports batch deletion. On going async: it's a good idea, but it might add some complexity. We'd need to track whether the delete actually finished, retry on failure, and maybe notify the user when it's done. So we should think about whether the added complexity is worth the gain. Also, right now all our backup and restore commands (full backup, incremental, restore) are synchronous anyway, and those can take hours. I think async is definitely a good direction; it probably makes sense to build a proper framework around it first, so we can handle retries, tracking, and notifications across the board. What do you think?

Contributor: Let's build a job coordinator framework with ZooKeeper. We should build that outside the scope of this ticket, of course.

Contributor (author): Sure, let me create a Jira for that.

Contributor: Good point, but before going down this rabbit hole, please do some performance tests for justification. Try deleting 100, 10,000, and 1 million files in a single directory and share how long it takes synchronously. Delete/unlink operations should be relatively quick in any filesystem, but let's see how it works with S3.

fs.delete(new Path(bulkloadDir, dirName), true);
}
} catch (ParseException e) {
System.out.println("WARNING: Failed to parse directory name '" + dirName
+ "'. Skipping. Error: " + e.getMessage());
} catch (IOException e) {
System.out.println("WARNING: Failed to delete directory '" + dirPath
+ "'. Skipping. Error: " + e.getMessage());
}
}

System.out.println("Completed WAL cleanup for backup directory: " + backupWalDir);
}
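
To answer the performance question raised in the review thread above (how long synchronous deletes take for 100 / 10,000 / 1,000,000 files), a throwaway probe along these lines could be used. It is a hypothetical harness, not part of the patch; it times the same recursive FileSystem.delete call against whatever URI it is pointed at, so HDFS and S3A runs can be compared directly.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Rough timing probe: create N empty files in one directory, then time a recursive delete. */
public class DeleteTimingProbe {
  public static void main(String[] args) throws IOException {
    // args[0] = base URI (e.g. hdfs:///tmp/del-test or s3a://bucket/del-test), args[1] = file count
    Path base = new Path(args[0]);
    int count = Integer.parseInt(args[1]);
    Configuration conf = new Configuration();
    FileSystem fs = base.getFileSystem(conf);
    fs.mkdirs(base);
    for (int i = 0; i < count; i++) {
      try (FSDataOutputStream out = fs.create(new Path(base, "f-" + i), true)) {
        // empty files are enough; only namespace operations are being measured
      }
    }
    long start = System.nanoTime();
    fs.delete(base, true); // the same call the WAL cleanup issues per day directory
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    System.out.println("Deleted " + count + " files in " + elapsedMs + " ms");
  }
}
```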

private long parseDayDirectory(String dayDir, SimpleDateFormat dateFormat)
throws ParseException {
return dateFormat.parse(dayDir).getTime();
}

@Override
protected void printUsage() {
System.out.println(DELETE_CMD_USAGE);

BackupSystemTable.java
@@ -1056,6 +1056,27 @@ public void addContinuousBackupTableSet(Set<TableName> tables, long startTimesta
}
}

/**
* Updates the system table with the new start timestamps for continuous backup tables.
* @param tablesToUpdate The set of tables that need their start timestamps updated.
* @param newStartTimestamp The new start timestamp to be set.
*/
public void updateContinuousBackupTableSet(Set<TableName> tablesToUpdate, long newStartTimestamp)
throws IOException {
try (Table table = connection.getTable(tableName)) {
Review comment:

Contributor: NIT: Add a null check for tablesToUpdate.

Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));

for (TableName tableName : tablesToUpdate) {
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(tableName.getNameAsString()),
Bytes.toBytes(newStartTimestamp));
}

table.put(put);
LOG.info("Successfully updated start timestamps for {} tables in the backup system table.",
tablesToUpdate.size());
}
}
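
One way to fold in the null-check NIT from the review comment above: a sketch of the same method with a guard clause, not the committed change.

```java
public void updateContinuousBackupTableSet(Set<TableName> tablesToUpdate, long newStartTimestamp)
  throws IOException {
  // Guard against a null or empty set so we never build and write an empty Put.
  if (tablesToUpdate == null || tablesToUpdate.isEmpty()) {
    LOG.debug("No continuous backup tables to update; skipping timestamp update.");
    return;
  }
  try (Table table = connection.getTable(tableName)) {
    Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));
    for (TableName tn : tablesToUpdate) {
      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(tn.getNameAsString()),
        Bytes.toBytes(newStartTimestamp));
    }
    table.put(put);
    LOG.info("Successfully updated start timestamps for {} tables in the backup system table.",
      tablesToUpdate.size());
  }
}
```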

/**
* Removes tables from the global continuous backup set. Only removes entries that currently exist
* in the backup system table.

ContinuousBackupReplicationEndpoint.java
@@ -23,6 +23,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -304,7 +305,7 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc
walWriter.append(entry);
}
walWriter.sync(true);
uploadBulkLoadFiles(bulkLoadFiles);
uploadBulkLoadFiles(day, bulkLoadFiles);
} catch (UncheckedIOException e) {
String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
@@ -314,9 +315,7 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc
}

private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
// Convert dayInMillis to "yyyy-MM-dd" format
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
String dayDirectoryName = dateFormat.format(new Date(dayInMillis));
String dayDirectoryName = formatToDateString(dayInMillis);

FileSystem fs = backupFileSystemManager.getBackupFs();
Path walsDir = backupFileSystemManager.getWalsDir();
@@ -376,17 +375,21 @@ private void close() {
}
}

private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
private void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles) throws IOException {
LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
bulkLoadFiles.size());

if (LOG.isTraceEnabled()) {
LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
}
String dayDirectoryName = formatToDateString(dayInMillis);
Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);

for (Path file : bulkLoadFiles) {
Path sourcePath = getBulkLoadFileStagingPath(file);
Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), file);
Path destPath = new Path(bulkloadDir, file);

try {
LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
@@ -407,6 +410,15 @@ private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
}

/**
* Convert dayInMillis to "yyyy-MM-dd" format
*/
private String formatToDateString(long dayInMillis) {
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return dateFormat.format(new Date(dayInMillis));
}
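
Because SimpleDateFormat is not thread-safe, the helper above creates a new formatter on every call. A sketch of an immutable java.time alternative that could be shared as a constant follows, assuming DATE_FORMAT ("yyyy-MM-dd") is also a valid java.time pattern, which it is for this value.

```java
// Sketch: a shareable, thread-safe formatter equivalent to the helper above.
private static final java.time.format.DateTimeFormatter DAY_FORMATTER =
  java.time.format.DateTimeFormatter.ofPattern(DATE_FORMAT).withZone(java.time.ZoneOffset.UTC);

private String formatToDateString(long dayInMillis) {
  // Instant plus a UTC-zoned formatter yields the same "yyyy-MM-dd" day string.
  return DAY_FORMATTER.format(java.time.Instant.ofEpochMilli(dayInMillis));
}
```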

private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
Path rootDir = CommonFSUtils.getRootDir(conf);