18 | 18 | package org.apache.hadoop.hbase.backup.impl;
19 | 19 |
20 | 20 | import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
| 21 | +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
| 22 | +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
21 | 23 | import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
22 | 24 | import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
23 | 25 | import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;

47 | 49 | import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC;
48 | 50 | import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
49 | 51 | import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC;
| 52 | +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
| 53 | +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
50 | 54 |
51 | 55 | import java.io.IOException;
52 | 56 | import java.net.URI;
| 57 | +import java.text.ParseException;
| 58 | +import java.text.SimpleDateFormat;
53 | 59 | import java.util.ArrayList;
| 60 | +import java.util.Collections;
| 61 | +import java.util.HashSet;
54 | 62 | import java.util.List;
55 | 63 | import java.util.Map;
56 | 64 | import java.util.Optional;
57 | 65 | import java.util.Set;
| 66 | +import java.util.TimeZone;
58 | 67 | import java.util.concurrent.TimeUnit;
59 | 68 | import org.apache.commons.lang3.StringUtils;
60 | 69 | import org.apache.hadoop.conf.Configuration;
61 | 70 | import org.apache.hadoop.conf.Configured;
| 71 | +import org.apache.hadoop.fs.FileStatus;
62 | 72 | import org.apache.hadoop.fs.FileSystem;
63 | 73 | import org.apache.hadoop.fs.Path;
64 | 74 | import org.apache.hadoop.hbase.HBaseConfiguration;

71 | 81 | import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
72 | 82 | import org.apache.hadoop.hbase.backup.BackupType;
73 | 83 | import org.apache.hadoop.hbase.backup.HBackupFileSystem;
| 84 | +import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
74 | 85 | import org.apache.hadoop.hbase.backup.util.BackupSet;
75 | 86 | import org.apache.hadoop.hbase.backup.util.BackupUtils;
76 | 87 | import org.apache.hadoop.hbase.client.Connection;
@@ -649,6 +660,8 @@ public void execute() throws IOException {
649 | 660 |     } else if (cmdline.hasOption(OPTION_LIST)) {
650 | 661 |       executeDeleteListOfBackups(cmdline, isForceDelete);
651 | 662 |     }
| 663 | +
| 664 | +    cleanUpUnusedBackupWALs();
652 | 665 |   }
653 | 666 |
654 | 667 |   private void executeDeleteOlderThan(CommandLine cmdline, boolean isForceDelete)
@@ -876,6 +889,140 @@ private boolean canAnyOtherBackupCover(List<BackupInfo> allBackups, BackupInfo c
876 | 889 |     return false;
877 | 890 |   }
878 | 891 |
| 892 | +  /**
| 893 | +   * Cleans up Write-Ahead Logs (WALs) that are no longer required for point-in-time restore
| 894 | +   * (PITR) after a successful backup deletion.
| 895 | +   */
| 896 | +  private void cleanUpUnusedBackupWALs() throws IOException {
| 897 | +    Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
| 898 | +    String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
| 899 | +
| 900 | +    if (backupWalDir == null || backupWalDir.isEmpty()) {
| 901 | +      System.out.println("No WAL directory specified for continuous backup. Skipping cleanup.");
| 902 | +      return;
| 903 | +    }
| 904 | +
| 905 | +    try (Connection conn = ConnectionFactory.createConnection(conf);
| 906 | +      BackupSystemTable sysTable = new BackupSystemTable(conn)) {
| 907 | +      // Get the set of tables under continuous backup
| 908 | +      Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();
| 909 | +      if (continuousBackupTables.isEmpty()) {
| 910 | +        System.out.println("No continuous backups configured. Skipping WAL cleanup.");
| 911 | +        return;
| 912 | +      }
| 913 | +
| 914 | +      // Find the earliest timestamp after which WALs are still needed
| 915 | +      long cutoffTimestamp = determineWALCleanupCutoffTime(sysTable);
| 916 | +      if (cutoffTimestamp == 0) {
| 917 | +        System.err.println("ERROR: No valid full backup found. Skipping WAL cleanup.");
| 918 | +        return;
| 919 | +      }
| 920 | +
| 921 | +      // Update metadata before the actual cleanup to avoid inconsistencies
| 922 | +      updateBackupTableStartTimes(sysTable, cutoffTimestamp);
| 923 | +
| 924 | +      // Delete WAL files older than the cutoff timestamp
| 925 | +      deleteOldWALFiles(conf, backupWalDir, cutoffTimestamp);
| 926 | +
| 927 | +    }
| 928 | +  }
| 929 | +
| 930 | +  /**
| 931 | +   * Determines the cutoff time for cleaning WAL files: the start time of the oldest remaining full backup.
| 932 | +   * @param sysTable Backup system table
| 933 | +   * @return cutoff timestamp, or 0 if no completed full backup exists
| 934 | +   */
| 935 | +  private long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException {
| 936 | +    List<BackupInfo> backupInfos = sysTable.getBackupInfos(BackupState.COMPLETE);
| 937 | +    Collections.reverse(backupInfos); // Start from the oldest
| 938 | +
| 939 | +    for (BackupInfo backupInfo : backupInfos) {
| 940 | +      if (BackupType.FULL.equals(backupInfo.getType())) {
| 941 | +        return backupInfo.getStartTs();
| 942 | +      }
| 943 | +    }
| 944 | +    return 0;
| 945 | +  }
| 946 | +
| 947 | +  /**
| 948 | +   * Updates the start time for continuous backups when it is older than the cutoff timestamp.
| 949 | +   * @param sysTable Backup system table
| 950 | +   * @param cutoffTimestamp Timestamp before which WALs are no longer needed
| 951 | +   */
| 952 | +  private void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestamp)
| 953 | +    throws IOException {
| 954 | +
| 955 | +    Map<TableName, Long> backupTables = sysTable.getContinuousBackupTableSet();
| 956 | +    Set<TableName> tablesToUpdate = new HashSet<>();
| 957 | +
| 958 | +    for (Map.Entry<TableName, Long> entry : backupTables.entrySet()) {
| 959 | +      if (entry.getValue() < cutoffTimestamp) {
| 960 | +        tablesToUpdate.add(entry.getKey());
| 961 | +      }
| 962 | +    }
| 963 | +
| 964 | +    if (!tablesToUpdate.isEmpty()) {
| 965 | +      sysTable.updateContinuousBackupTableSet(tablesToUpdate, cutoffTimestamp);
| 966 | +    }
| 967 | +  }
| 968 | +
| 969 | +  /**
| 970 | +   * Deletes day-based WAL directories (and matching bulk-load directories) older than the cutoff.
| 971 | +   */
| 972 | +  private void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
| 973 | +    throws IOException {
| 974 | +    System.out.println("Starting WAL cleanup in backup directory: " + backupWalDir
| 975 | +      + " with cutoff time: " + cutoffTime);
| 976 | +
| 977 | +    BackupFileSystemManager manager =
| 978 | +      new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
| 979 | +    FileSystem fs = manager.getBackupFs();
| 980 | +    Path walDir = manager.getWalsDir();
| 981 | +    Path bulkloadDir = manager.getBulkLoadFilesDir();
| 982 | +
| 983 | +    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
| 984 | +    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
| 985 | +
| 986 | +    System.out.println("Listing directories under: " + walDir);
| 987 | +
| 988 | +    FileStatus[] directories = fs.listStatus(walDir);
| 989 | +
| 990 | +    for (FileStatus dirStatus : directories) {
| 991 | +      if (!dirStatus.isDirectory()) {
| 992 | +        continue; // Skip files; we only want directories
| 993 | +      }
| 994 | +
| 995 | +      Path dirPath = dirStatus.getPath();
| 996 | +      String dirName = dirPath.getName();
| 997 | +
| 998 | +      try {
| 999 | +        long dayStart = parseDayDirectory(dirName, dateFormat);
| 1000 | +        System.out
| 1001 | +          .println("Checking WAL directory: " + dirName + " (Start Time: " + dayStart + ")");
| 1002 | +
| 1003 | +        // If all WAL files of that day are older than the cutoff time, delete them
| 1004 | +        if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
| 1005 | +          System.out.println("Deleting outdated WAL directory: " + dirPath);
| 1006 | +          fs.delete(dirPath, true);
| 1007 | +          fs.delete(new Path(bulkloadDir, dirName), true);
| 1008 | +        }
| 1009 | +      } catch (ParseException e) {
| 1010 | +        System.out.println("WARNING: Failed to parse directory name '" + dirName
| 1011 | +          + "'. Skipping. Error: " + e.getMessage());
| 1012 | +      } catch (IOException e) {
| 1013 | +        System.out.println("WARNING: Failed to delete directory '" + dirPath
| 1014 | +          + "'. Skipping. Error: " + e.getMessage());
| 1015 | +      }
| 1016 | +    }
| 1017 | +
| 1018 | +    System.out.println("Completed WAL cleanup for backup directory: " + backupWalDir);
| 1019 | +  }
| 1020 | +
| 1021 | +  private long parseDayDirectory(String dayDir, SimpleDateFormat dateFormat)
| 1022 | +    throws ParseException {
| 1023 | +    return dateFormat.parse(dayDir).getTime();
| 1024 | +  }
| 1025 | +
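Note for reviewers: below is a minimal, self-contained sketch of the retention check implemented above. The real DATE_FORMAT and ONE_DAY_IN_MILLISECONDS come from ContinuousBackupReplicationEndpoint; the day-granularity pattern "yyyy-MM-dd", the 86,400,000 ms constant, and the class name are stand-in assumptions for illustration only. A day directory qualifies for deletion only when its entire day ends strictly before the cutoff:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class WalRetentionCheckSketch {
  // Assumed stand-ins for the constants imported from ContinuousBackupReplicationEndpoint
  static final String DATE_FORMAT = "yyyy-MM-dd";
  static final long ONE_DAY_IN_MILLISECONDS = 86_400_000L;

  /** Returns true if every WAL written on this day predates the cutoff, mirroring deleteOldWALFiles(). */
  static boolean isDayDirExpired(String dirName, long cutoffTime) throws ParseException {
    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    long dayStart = dateFormat.parse(dirName).getTime();
    // The last millisecond of the day must still fall before the cutoff
    return dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime;
  }

  public static void main(String[] args) throws ParseException {
    long cutoff = 1735689600000L; // 2025-01-01T00:00:00Z
    System.out.println(isDayDirExpired("2024-12-30", cutoff)); // true: day fully before cutoff
    System.out.println(isDayDirExpired("2024-12-31", cutoff)); // true: day ends 1 ms before cutoff
    System.out.println(isDayDirExpired("2025-01-01", cutoff)); // false: WALs may still be needed
  }
}

The strict inequality keeps the directory containing the cutoff instant itself, so the boundary day is never deleted prematurely.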
879 | 1026 |   @Override
880 | 1027 |   protected void printUsage() {
881 | 1028 |     System.out.println(DELETE_CMD_USAGE);
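Usage note: with this change, a successful delete run also prunes continuous-backup WAL day-directories that no remaining backup can use. A sketch of an invocation (the backup id is illustrative; option names follow BackupRestoreConstants, e.g. -l for a list of backup ids, and the exact shell syntax may vary by version):

  # Delete one backup; unused WAL day-directories are then cleaned up automatically
  hbase backup delete -l backup_1735689600000

If CONF_CONTINUOUS_BACKUP_WAL_DIR is not set, the cleanup step is skipped with a console message rather than failing the delete.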