
Commit 79c6087

Committed May 7, 2025

HBASE-29272 When Spark reads an HBase snapshot, it always reads empty data.

1 parent 30ce21b · commit 79c6087
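Background, inferred from the diff below rather than stated in the commit message: TableSnapshotInputFormatImpl.InputSplit.getLength() always returned 0, so every snapshot split advertised a length of zero. Consumers of the MapReduce InputFormat API that trust the reported split length, Spark among them when it is configured to ignore empty Hadoop splits (the spark.hadoopRDD.ignoreEmptySplits setting, if memory serves), then discard every split and the snapshot read comes back empty. A minimal sketch of that kind of filtering, using a hypothetical consumer rather than quoting Spark's actual code:

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.mapreduce.InputSplit;

public class EmptySplitFilterSketch {
  // Illustrative only: a consumer that trusts getLength() and drops "empty" splits.
  // Before this commit, every TableSnapshotInputFormat split reported length 0 and
  // would be filtered out by logic like this, yielding an empty read.
  static List<InputSplit> dropEmptySplits(List<InputSplit> splits) {
    return splits.stream().filter(split -> {
      try {
        return split.getLength() > 0;
      } catch (IOException | InterruptedException e) {
        return true; // keep splits whose length cannot be determined
      }
    }).collect(Collectors.toList());
  }
}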

File tree

4 files changed: +189 −7 lines changed
 

‎hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java

Lines changed: 15 additions & 5 deletions
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionSizeCalculator;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.RegionSplitter;
@@ -145,13 +146,19 @@ public static class InputSplit implements Writable {
     private String[] locations;
     private String scan;
     private String restoreDir;
+    private long length;

     // constructor for mapreduce framework / Writable
     public InputSplit() {
     }

     public InputSplit(TableDescriptor htd, RegionInfo regionInfo, List<String> locations, Scan scan,
-      Path restoreDir) {
+      Path restoreDir) {
+      this(htd, regionInfo, locations, scan, restoreDir, 1);
+    }
+
+    public InputSplit(TableDescriptor htd, RegionInfo regionInfo, List<String> locations, Scan scan,
+      Path restoreDir, long length) {
       this.htd = htd;
       this.regionInfo = regionInfo;
       if (locations == null || locations.isEmpty()) {
@@ -166,6 +173,7 @@ public InputSplit(TableDescriptor htd, RegionInfo regionInfo, List<String> locat
       }

       this.restoreDir = restoreDir.toString();
+      this.length = length;
     }

     public TableDescriptor getHtd() {
@@ -181,8 +189,7 @@ public String getRestoreDir() {
     }

     public long getLength() {
-      // TODO: We can obtain the file sizes of the snapshot here.
-      return 0;
+      return length;
     }

     public String[] getLocations() {
@@ -440,6 +447,8 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
      }
    }

+    SnapshotRegionSizeCalculator snapshotRegionSizeCalculator = new SnapshotRegionSizeCalculator(
+      conf, manifest);
    List<InputSplit> splits = new ArrayList<>();
    for (RegionInfo hri : regionManifests) {
      // load region descriptor
@@ -456,6 +465,7 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
        }
      }

+      long snapshotRegionSize = snapshotRegionSizeCalculator.getRegionSize(hri.getEncodedName());
      if (numSplits > 1) {
        byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
        for (int i = 0; i < sp.length - 1; i++) {
@@ -478,7 +488,7 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
            Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
          }

-          splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
+          splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir, snapshotRegionSize / numSplits));
        }
      }
    } else {
@@ -487,7 +497,7 @@ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
          hri.getEndKey())
      ) {

-        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
+        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir, snapshotRegionSize));
      }
    }
  }
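With the change above, a split's reported length now comes from the snapshot region size: a region handed out as a single split reports the full region size, and a region divided into numSplits pieces reports roughly an equal share per piece. A small arithmetic sketch with made-up numbers (the real sizes come from SnapshotRegionSizeCalculator):

// Hypothetical numbers for illustration; not taken from the patch.
long snapshotRegionSize = 6L * 1024 * 1024 * 1024;    // 6 GiB reported for one snapshot region
int numSplits = 3;                                     // region further divided by RegionSplitter
long perSplitLength = snapshotRegionSize / numSplits;  // each sub-split now reports ~2 GiB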

‎hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java

Lines changed: 37 additions & 2 deletions
@@ -40,8 +40,10 @@
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.WALLink;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Strings;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -51,7 +53,6 @@
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -151,6 +152,7 @@ String getStateToString() {
     private AtomicInteger logsCount = new AtomicInteger();
     private AtomicLong hfilesArchiveSize = new AtomicLong();
     private AtomicLong hfilesSize = new AtomicLong();
+    private Map<String, Long> regionSizeMap = new ConcurrentHashMap<>();
     private AtomicLong hfilesMobSize = new AtomicLong();
     private AtomicLong nonSharedHfilesArchiveSize = new AtomicLong();
     private AtomicLong logSize = new AtomicLong();
@@ -176,6 +178,23 @@ String getStateToString() {
       this.fs = fs;
     }

+    SnapshotStats(final Configuration conf, final FileSystem fs,
+      final SnapshotManifest manifest) throws CorruptedSnapshotException {
+      this.snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, manifest.getSnapshotDir());
+      this.snapshotTable = manifest.getTableDescriptor().getTableName();
+      this.conf = conf;
+      this.fs = fs;
+    }
+
+    /**
+     * Returns the map containing region sizes.
+     *
+     * @return A map where keys are region names and values are their corresponding sizes.
+     */
+    public Map<String, Long> getRegionSizeMap() {
+      return regionSizeMap;
+    }
+
     /** Returns the snapshot descriptor */
     public SnapshotDescription getSnapshotDescription() {
       return ProtobufUtil.createSnapshotDesc(this.snapshot);
@@ -347,6 +366,13 @@ FileInfo addStoreFile(final RegionInfo region, final String family,
       return new FileInfo(inArchive, size, isCorrupted);
     }

+    void updateRegionSizeMap(final RegionInfo region,
+      final SnapshotRegionManifest.StoreFile storeFile) {
+      long currentSize = regionSizeMap.getOrDefault(region.getEncodedName(), 0L);
+      regionSizeMap.put(region.getEncodedName(), currentSize + storeFile.getFileSize());
+    }
+
+
     /**
      * Add the specified log file to the stats
      * @param server server name
@@ -604,7 +630,15 @@ public static SnapshotStats getSnapshotStats(final Configuration conf,
     FileSystem fs = FileSystem.get(rootDir.toUri(), conf);
     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotDesc, rootDir);
     SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
-    final SnapshotStats stats = new SnapshotStats(conf, fs, snapshotDesc);
+    return getSnapshotStats(conf, manifest, filesMap);
+  }
+
+  public static SnapshotStats getSnapshotStats(final Configuration conf,
+    final SnapshotManifest manifest, final Map<Path, Integer> filesMap)
+    throws IOException {
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    FileSystem fs = FileSystem.get(rootDir.toUri(), conf);
+    final SnapshotStats stats = new SnapshotStats(conf, fs, manifest);
     SnapshotReferenceUtil.concurrentVisitReferencedFiles(conf, fs, manifest,
       "SnapshotsStatsAggregation", new SnapshotReferenceUtil.SnapshotVisitor() {
         @Override
@@ -613,6 +647,7 @@ public void storeFile(final RegionInfo regionInfo, final String family,
           if (!storeFile.hasReference()) {
             stats.addStoreFile(regionInfo, family, storeFile, filesMap);
           }
+          stats.updateRegionSizeMap(regionInfo, storeFile);
         }
       });
     return stats;
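The map above is filled by the SnapshotVisitor that SnapshotReferenceUtil.concurrentVisitReferencedFiles runs over every store file referenced by the snapshot: each visit adds the store file's size to the entry for its region's encoded name, which is why regionSizeMap is a ConcurrentHashMap. A minimal sketch, assuming the callback can fire for the same region from several threads, of an equivalent accumulation using merge(), which performs the read-modify-write atomically (the patch's getOrDefault() plus put() pair could in principle lose an update under that kind of concurrency):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegionSizeAccumulatorSketch {
  private final Map<String, Long> regionSizeMap = new ConcurrentHashMap<>();

  // Add one store file's size to its region's running total; merge() makes the
  // read-modify-write atomic even when called concurrently for the same region.
  void updateRegionSizeMap(String encodedRegionName, long storeFileSize) {
    regionSizeMap.merge(encodedRegionName, storeFileSize, Long::sum);
  }
}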
SnapshotRegionSizeCalculator.java (new file)

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
package org.apache.hadoop.hbase.snapshot;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotInfo.SnapshotStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility class to calculate the size of each region in a snapshot.
 */
public class SnapshotRegionSizeCalculator {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotRegionSizeCalculator.class);
  private final SnapshotManifest manifest;
  private final Configuration conf;
  private final Map<String, Long> regionSizes;

  public SnapshotRegionSizeCalculator(Configuration conf, SnapshotManifest manifest)
    throws IOException {
    this.conf = conf;
    this.manifest = manifest;
    this.regionSizes = calculateRegionSizes();
  }

  /**
   * Calculate the size of each region in the snapshot.
   * @return A map of region encoded names to their total size in bytes.
   */
  public Map<String, Long> calculateRegionSizes() throws IOException {
    SnapshotStats stats = SnapshotInfo.getSnapshotStats(conf, manifest, null);
    return stats.getRegionSizeMap();
  }

  public long getRegionSize(String encodedRegionName) {
    Long size = regionSizes.get(encodedRegionName);
    if (size == null) {
      LOG.debug("Unknown region: {}", encodedRegionName);
      return 0;
    } else {
      return size;
    }
  }
}
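For context, a short usage sketch mirroring the getSplits() change in the first file: the calculator is built once per snapshot manifest and then queried by encoded region name. The printRegionSizes helper and its parameters are assumptions for illustration, not part of the patch:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionSizeCalculator;

public class RegionSizeUsageSketch {
  // Build the calculator once per manifest, then look up each region's size.
  static void printRegionSizes(Configuration conf, SnapshotManifest manifest,
    List<RegionInfo> regions) throws IOException {
    SnapshotRegionSizeCalculator calculator = new SnapshotRegionSizeCalculator(conf, manifest);
    for (RegionInfo hri : regions) {
      System.out.println(hri.getEncodedName() + " -> "
        + calculator.getRegionSize(hri.getEncodedName()) + " bytes");
    }
  }
}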
TestSnapshotRegionSizeCalculator.java (new file)

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
package org.apache.hadoop.hbase.snapshot;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(SmallTests.class)
public class TestSnapshotRegionSizeCalculator {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSnapshotRegionSizeCalculator.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static FileSystem fs;
  private static Path rootDir;
  private static Path snapshotDir;
  private static SnapshotProtos.SnapshotDescription snapshotDesc;
  private static SnapshotManifest manifest;

  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getTestFileSystem();
    rootDir = TEST_UTIL.getDataTestDir("TestSnapshotRegionSizeCalculator");
    CommonFSUtils.setRootDir(conf, rootDir);

    // Create a mock snapshot with four regions and their store files
    SnapshotTestingUtils.SnapshotMock snapshotMock =
      new SnapshotTestingUtils.SnapshotMock(conf, fs, rootDir);
    SnapshotTestingUtils.SnapshotMock.SnapshotBuilder builder =
      snapshotMock.createSnapshotV2("snapshot", "testTable", 4);
    builder.addRegion();
    builder.addRegion();
    builder.addRegion();
    builder.addRegion();
    snapshotDir = builder.commit();
    snapshotDesc = builder.getSnapshotDescription();
    manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    fs.delete(rootDir, true);
  }

  @Test
  public void testCalculateRegionSizes() throws IOException {
    SnapshotRegionSizeCalculator calculator =
      new SnapshotRegionSizeCalculator(TEST_UTIL.getConfiguration(), manifest);
    Map<String, Long> regionSizes = calculator.calculateRegionSizes();

    // Verify that the region sizes are calculated correctly
    assertTrue("No regions found in the snapshot", !regionSizes.isEmpty());
    for (Map.Entry<String, Long> entry : regionSizes.entrySet()) {
      assertTrue("Region size should be non-negative", entry.getValue() >= 0);
    }
  }
}
