diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java index b593a104fe1..c4bab047318 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -72,15 +73,9 @@ public class SimpleLoadBalancer implements TabletBalancer { protected BalancerEnvironment environment; Iterator assignments; - // if tableToBalance is set, then only balance the given table - TableId tableToBalance = null; public SimpleLoadBalancer() {} - public SimpleLoadBalancer(TableId table) { - tableToBalance = table; - } - @Override public void init(BalancerEnvironment balancerEnvironment) { this.environment = balancerEnvironment; @@ -156,12 +151,13 @@ public int compareTo(ServerCounts obj) { } } - public boolean getMigrations(Map current, - List result) { + public boolean getMigrations(BalanceParameters params) { + Set tableIdsToBalance = new HashSet<>(params.getTablesToBalance().values()); + List resultingMigrations = new ArrayList<>(); boolean moreBalancingNeeded = false; try { // no moves possible - if (current.size() < 2) { + if (params.currentStatus().size() < 2) { return false; } final Map> donerTabletStats = new HashMap<>(); @@ -169,7 +165,7 @@ public boolean getMigrations(Map current, // Sort by total number of online tablets, per server int total = 0; ArrayList totals = new ArrayList<>(); - for (Entry entry : current.entrySet()) { + for (Entry entry : params.currentStatus().entrySet()) { int serverTotal = 0; if (entry.getValue() != null && entry.getValue().getTableMap() != null) { for (Entry e : entry.getValue().getTableMap().entrySet()) { @@ -177,7 +173,7 @@ public boolean getMigrations(Map current, * The check below was on entry.getKey(), but that resolves to a tabletserver not a * tablename. Believe it should be e.getKey() which is a tablename */ - if (tableToBalance == null || tableToBalance.canonical().equals(e.getKey())) { + if (tableIdsToBalance.contains(TableId.of(e.getKey()))) { serverTotal += e.getValue().getOnlineTabletCount(); } } @@ -213,11 +209,13 @@ public boolean getMigrations(Map current, break; } if (needToUnload >= needToLoad) { - result.addAll(move(tooMany, tooLittle, needToLoad, donerTabletStats)); + resultingMigrations + .addAll(move(tooMany, tooLittle, needToLoad, donerTabletStats, tableIdsToBalance)); end--; movedAlready = 0; } else { - result.addAll(move(tooMany, tooLittle, needToUnload, donerTabletStats)); + resultingMigrations + .addAll(move(tooMany, tooLittle, needToUnload, donerTabletStats, tableIdsToBalance)); movedAlready += needToUnload; } if (needToUnload > needToLoad) { @@ -229,7 +227,8 @@ public boolean getMigrations(Map current, } } finally { - log.trace("balance ended with {} migrations", result.size()); + log.trace("balance ended with {} migrations", resultingMigrations.size()); + params.migrationsOut().addAll(resultingMigrations); } return moreBalancingNeeded; } @@ -239,7 +238,8 @@ public boolean getMigrations(Map current, * busiest table */ List move(ServerCounts tooMuch, ServerCounts tooLittle, int count, - Map> donerTabletStats) { + Map> donerTabletStats, + Set tableIdsToBalance) { if (count == 0) { return Collections.emptyList(); @@ -247,8 +247,8 @@ List move(ServerCounts tooMuch, ServerCounts tooLittle, int cou List result = new ArrayList<>(); // Copy counts so we can update them as we propose migrations - Map tooMuchMap = tabletCountsPerTable(tooMuch.status); - Map tooLittleMap = tabletCountsPerTable(tooLittle.status); + Map tooMuchMap = tabletCountsPerTable(tooMuch.status, tableIdsToBalance); + Map tooLittleMap = tabletCountsPerTable(tooLittle.status, tableIdsToBalance); for (int i = 0; i < count; i++) { TableId table = getTableToMigrate(tooMuch, tooMuchMap, tooLittleMap); @@ -294,8 +294,8 @@ List move(ServerCounts tooMuch, ServerCounts tooLittle, int cou private TableId getTableToMigrate(ServerCounts tooMuch, Map tooMuchMap, Map tooLittleMap) { - if (tableToBalance != null) { - return tableToBalance; + if (tooMuchMap.size() == 1) { + return tooMuchMap.keySet().iterator().next(); } // find a table to migrate @@ -319,12 +319,16 @@ protected List getOnlineTabletsForTable(TabletServerId tabletS return environment.listOnlineTabletsForTable(tabletServerId, tableId); } - static Map tabletCountsPerTable(TServerStatus status) { + static Map tabletCountsPerTable(TServerStatus status, + Set tableIdsToBalance) { Map result = new HashMap<>(); if (status != null && status.getTableMap() != null) { Map tableMap = status.getTableMap(); for (Entry entry : tableMap.entrySet()) { - result.put(TableId.of(entry.getKey()), entry.getValue().getOnlineTabletCount()); + var tableId = TableId.of(entry.getKey()); + if (tableIdsToBalance.contains(tableId)) { + result.put(tableId, entry.getValue().getOnlineTabletCount()); + } } } return result; @@ -381,7 +385,7 @@ public long balance(BalanceParameters params) { // Don't migrate if we have migrations in progress if (params.currentMigrations().isEmpty()) { problemReporter.clearProblemReportTimes(); - if (getMigrations(params.currentStatus(), params.migrationsOut())) { + if (getMigrations(params)) { return SECONDS.toMillis(1); } } else { diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java index 84c9074b466..f982f9d88b2 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java @@ -56,8 +56,8 @@ private TabletBalancer constructNewBalancerForTable(String clazzName, TableId ta String context = environment.tableContext(tableId); Class clazz = ClassLoaderUtil.loadClass(context, clazzName, TabletBalancer.class); - Constructor constructor = clazz.getConstructor(TableId.class); - return constructor.newInstance(tableId); + Constructor constructor = clazz.getConstructor(); + return constructor.newInstance(); } protected String getLoadBalancerClassNameForTable(TableId table) { @@ -98,9 +98,8 @@ protected TabletBalancer getBalancerForTable(TableId tableId) { } if (balancer == null) { - log.info("Creating balancer {} limited to balancing table {}", - SimpleLoadBalancer.class.getName(), tableId); - balancer = new SimpleLoadBalancer(tableId); + log.info("Creating balancer {} for table {}", SimpleLoadBalancer.class.getName(), tableId); + balancer = new SimpleLoadBalancer(); } perTableBalancers.put(tableId, balancer); balancer.init(environment); diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java index 41193380aca..6c068eb0e31 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java @@ -144,7 +144,8 @@ public void testAssignMigrations() { // Nothing should happen, we are balanced ArrayList out = new ArrayList<>(); - balancer.getMigrations(current, out); + balancer.getMigrations(new BalanceParamsImpl(current, Set.of(), out, DataLevel.USER, + Map.of("t1", TableId.of("t1"), "t2", TableId.of("t2"), "t3", TableId.of("t3")))); assertEquals(out.size(), 0); // Take down a tabletServer @@ -204,7 +205,7 @@ public void testUnevenAssignment() { while (true) { List migrationsOut = new ArrayList<>(); balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, - DataLevel.USER, Map.of())); + DataLevel.USER, Map.of("newTable", TableId.of("newTable")))); if (migrationsOut.isEmpty()) { break; } @@ -226,42 +227,130 @@ public void testUnevenAssignment2() { FakeTServer fakeTServer = new FakeTServer(); servers.put(tsid, fakeTServer); } + String table1 = "LongTable"; + String table2 = "ShortTable"; + + // Generate Table Ids + TableId t1Id = TableId.of(table1); + TableId t2Id = TableId.of(table2); + // put 60 tablets on 25 of them List> shortList = new ArrayList<>(servers.entrySet()); Entry shortServer = shortList.remove(0); - int c = 0; for (int i = 0; i < 60; i++) { for (Entry entry : shortList) { - entry.getValue().tablets.add(makeTablet("t" + c, null, null)); + entry.getValue().tablets.add(makeTablet(table1, null, null)); } } // put 10 on the that short server: for (int i = 0; i < 10; i++) { - shortServer.getValue().tablets.add(makeTablet("s" + i, null, null)); + shortServer.getValue().tablets.add(makeTablet(table2, null, null)); } + Map migratedTables; + // average is 58, with 2 at 59: we need 48 more moved to the short server + migratedTables = balanceTables(Map.of(table1, t1Id, table2, t2Id)); + assertEquals(48, migratedTables.get(t1Id)); + + // ShortTable tablets are only on the short server: 9 should be migrated off + migratedTables = balanceTables(Map.of(table2, t2Id)); + assertEquals(1, migratedTables.size()); + assertEquals(9, migratedTables.get(t2Id)); + + // ShortServer now has 9 less LongTable tablets: 9 LongTable tablets should migrate back + migratedTables = balanceTables(Map.of(table1, t1Id)); + assertEquals(1, migratedTables.size()); + assertEquals(9, migratedTables.get(t1Id)); + + // Servers are now fully balanced. Attempt to balance both tables again + migratedTables = balanceTables(Map.of(table1, t1Id, table2, t2Id)); + assertEquals(0, migratedTables.size(), "Migrated Tables: " + migratedTables); + } + + @Test + public void testBalanceSubset() { + // make 26 servers + for (char c : "abcdefghijklmnopqrstuvwxyz".toCharArray()) { + TabletServerId tsid = new TabletServerIdImpl("127.0.0.1", c, Character.toString(c)); + FakeTServer fakeTServer = new FakeTServer(); + servers.put(tsid, fakeTServer); + } + String table1 = "Long"; + String table2 = "Short"; + String table3 = "neverMigrate"; + + // Generate Table Ids + TableId t1Id = TableId.of(table1); + TableId t2Id = TableId.of(table2); + TableId t3Id = TableId.of(table3); + + // put 10 tablets on 8 of the servers that should never migrate + int randServer = 0; + for (Entry entry : servers.entrySet()) { + randServer++; + if (randServer % 3 == 0) { + for (int i = 0; i < 10; i++) { + entry.getValue().tablets.add(makeTablet(table3, null, null)); + } + } + } + + // put 60 tablets on 25 of them + List> shortList = new ArrayList<>(servers.entrySet()); + Entry shortServer = shortList.remove(0); + for (int i = 0; i < 60; i++) { + for (Entry entry : shortList) { + entry.getValue().tablets.add(makeTablet(table1, null, null)); + } + } + // put 10 on the that short server: + for (int i = 0; i < 10; i++) { + shortServer.getValue().tablets.add(makeTablet(table2, null, null)); + } + + Map migratedTables; + // average is 58, with 2 at 59: we need 48 more moved to the short server + migratedTables = balanceTables(Map.of(table1, t1Id, table2, t2Id)); + assertEquals(1, migratedTables.size()); + assertEquals(48, migratedTables.get(t1Id)); + + // Servers are now fully balanced. Attempt to balance both tables again + migratedTables = balanceTables(Map.of(table1, t1Id, table2, t2Id)); + assertEquals(0, migratedTables.size()); + + // Only balance the neverMigrate table + migratedTables = balanceTables(Map.of(table3, t3Id)); + assertEquals(1, migratedTables.size(), "Too many tables migrated: " + migratedTables); + assertEquals(54, migratedTables.get(t3Id)); + } + + private Map balanceTables(Map tablesToBalance) { TestSimpleLoadBalancer balancer = new TestSimpleLoadBalancer(); + Map migratedTables = new HashMap<>(); Set migrations = Collections.emptySet(); - int moved = 0; // balance until we can't balance no more! while (true) { List migrationsOut = new ArrayList<>(); balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, - DataLevel.USER, Map.of())); + DataLevel.USER, tablesToBalance)); if (migrationsOut.isEmpty()) { break; } for (TabletMigration migration : migrationsOut) { if (servers.get(migration.getOldTabletServer()).tablets.remove(migration.getTablet())) { - moved++; + TableId tableId = migration.getTablet().getTable(); + Integer value = 0; + if (migratedTables.containsKey(tableId)) { + value = migratedTables.get(tableId); + } + migratedTables.put(tableId, value + 1); } last.remove(migration.getTablet()); servers.get(migration.getNewTabletServer()).tablets.add(migration.getTablet()); last.put(migration.getTablet(), migration.getNewTabletServer()); } } - // average is 58, with 2 at 59: we need 48 more moved to the short server - assertEquals(48, moved); + return migratedTables; } private void checkBalance(List metadataTable, Map servers, diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java index 6045f417a85..cd6f00c1ab3 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java @@ -101,10 +101,6 @@ static List generateFakeTablets(TabletServerId tserver, TableI public static class TestSimpleLoadBalancer extends SimpleLoadBalancer { - public TestSimpleLoadBalancer(TableId table) { - super(table); - } - @Override public void init(BalancerEnvironment balancerEnvironment) {}