diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java new file mode 100644 index 00000000000..b7ac49ec7d7 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.util.concurrent.TimeUnit.MINUTES; + +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; +import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; +import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; +import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.manager.thrift.TableInfo; +import org.apache.accumulo.core.manager.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.SystemTables; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.filters.HasMigrationFilter; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.spi.balancer.BalancerEnvironment; +import org.apache.accumulo.core.spi.balancer.DoNothingBalancer; +import org.apache.accumulo.core.spi.balancer.TabletBalancer; +import org.apache.accumulo.core.spi.balancer.data.TServerStatus; +import org.apache.accumulo.core.spi.balancer.data.TabletMigration; +import org.apache.accumulo.core.spi.balancer.data.TabletServerId; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.manager.metrics.BalancerMetrics; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; +import org.apache.accumulo.server.manager.state.UnassignedTablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +class BalanceManager { + + private static final Logger log = LoggerFactory.getLogger(BalanceManager.class); + + private final AtomicReference manager; + // all access to this should be through getBalancer() + private TabletBalancer tabletBalancer; + private volatile BalancerEnvironment balancerEnvironment; + private final BalancerMetrics balancerMetrics = new BalancerMetrics(); + private final Object balancedNotifier = new Object(); + private static final long CLEANUP_INTERVAL_MINUTES = Manager.CLEANUP_INTERVAL_MINUTES; + + BalanceManager() { + this.manager = new AtomicReference<>(null); + } + + void setManager(Manager manager) { + Objects.requireNonNull(manager); + if (this.manager.compareAndSet(null, manager)) { + this.balancerEnvironment = new BalancerEnvironmentImpl(manager.getContext()); + } else if (this.manager.get() != manager) { + throw new IllegalStateException("Attempted to set different manager object"); + } + } + + private Manager getManager() { + // fail fast if not yet set + return Objects.requireNonNull(manager.get(), "Manager has not been set."); + } + + synchronized TabletBalancer getBalancer() { + String configuredBalancerClass = + getManager().getConfiguration().get(Property.MANAGER_TABLET_BALANCER); + try { + if (tabletBalancer == null + || !tabletBalancer.getClass().getName().equals(configuredBalancerClass)) { + log.debug("Attempting to initialize balancer using class {}, was {}", + configuredBalancerClass, + tabletBalancer == null ? "null" : tabletBalancer.getClass().getName()); + var localTabletBalancer = + Property.createInstanceFromPropertyName(getManager().getConfiguration(), + Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new DoNothingBalancer()); + localTabletBalancer.init(balancerEnvironment); + tabletBalancer = localTabletBalancer; + log.info("tablet balancer changed to {}", localTabletBalancer.getClass().getName()); + } + } catch (Exception e) { + log.warn("Failed to create balancer {} using {} instead", configuredBalancerClass, + DoNothingBalancer.class, e); + var localTabletBalancer = new DoNothingBalancer(); + localTabletBalancer.init(balancerEnvironment); + tabletBalancer = localTabletBalancer; + } + + return tabletBalancer; + } + + private ServerContext getContext() { + return getManager().getContext(); + } + + MetricsProducer getMetrics() { + return balancerMetrics; + } + + /** + * balanceTablets() balances tables by DataLevel. Return the current set of migrations partitioned + * by DataLevel + */ + private Map> partitionMigrations() { + final Map> partitionedMigrations = + new EnumMap<>(Ample.DataLevel.class); + for (Ample.DataLevel dl : Ample.DataLevel.values()) { + Set extents = new HashSet<>(); + // prev row needed for the extent + try (var tabletsMetadata = getContext() + .getAmple().readTablets().forLevel(dl).fetch(TabletMetadata.ColumnType.PREV_ROW, + TabletMetadata.ColumnType.MIGRATION, TabletMetadata.ColumnType.LOCATION) + .filter(new HasMigrationFilter()).build()) { + // filter out migrations that are awaiting cleanup + tabletsMetadata.stream().filter(tm -> !shouldCleanupMigration(tm)) + .forEach(tm -> extents.add(tm.getExtent())); + } + partitionedMigrations.put(dl, extents); + } + return partitionedMigrations; + } + + /** + * Given the current tserverStatus map and a DataLevel, return a view of the tserverStatus map + * that only contains entries for tables in the DataLevel + */ + private SortedMap createTServerStatusView( + final Ample.DataLevel dl, final SortedMap status) { + final SortedMap tserverStatusForLevel = new TreeMap<>(); + final String METADATA_TABLE_ID = SystemTables.METADATA.tableId().canonical(); + final String ROOT_TABLE_ID = SystemTables.ROOT.tableId().canonical(); + status.forEach((tsi, tss) -> { + final TabletServerStatus copy = tss.deepCopy(); + final Map oldTableMap = copy.getTableMap(); + final Map newTableMap = + new HashMap<>(dl == Ample.DataLevel.USER ? oldTableMap.size() : 1); + switch (dl) { + case ROOT: { + var tableInfo = oldTableMap.get(ROOT_TABLE_ID); + if (tableInfo != null) { + newTableMap.put(ROOT_TABLE_ID, tableInfo); + } + break; + } + case METADATA: { + var tableInfo = oldTableMap.get(METADATA_TABLE_ID); + if (tableInfo != null) { + newTableMap.put(METADATA_TABLE_ID, tableInfo); + } + break; + } + case USER: + if (!oldTableMap.containsKey(METADATA_TABLE_ID) + && !oldTableMap.containsKey(ROOT_TABLE_ID)) { + newTableMap.putAll(oldTableMap); + } else { + oldTableMap.forEach((table, info) -> { + if (!table.equals(ROOT_TABLE_ID) && !table.equals(METADATA_TABLE_ID)) { + newTableMap.put(table, info); + } + }); + } + break; + + default: + throw new IllegalArgumentException("Unhandled DataLevel value: " + dl); + } + copy.setTableMap(newTableMap); + tserverStatusForLevel.put(tsi, copy); + }); + return tserverStatusForLevel; + } + + private Map getTablesForLevel(Ample.DataLevel dataLevel) { + switch (dataLevel) { + case ROOT: + return Map.of(SystemTables.ROOT.tableName(), SystemTables.ROOT.tableId()); + case METADATA: + return Map.of(SystemTables.METADATA.tableName(), SystemTables.METADATA.tableId()); + case USER: { + Map userTables = getContext().createQualifiedTableNameToIdMap(); + for (var accumuloTable : SystemTables.values()) { + if (Ample.DataLevel.of(accumuloTable.tableId()) != Ample.DataLevel.USER) { + userTables.remove(accumuloTable.tableName()); + } + } + return Collections.unmodifiableMap(userTables); + } + default: + throw new IllegalArgumentException("Unknown data level " + dataLevel); + } + } + + private List checkMigrationSanity(Set current, + List migrations, Ample.DataLevel level) { + return migrations.stream().filter(m -> { + boolean includeMigration = false; + if (m.getTablet() == null) { + log.error("Balancer gave back a null tablet {}", m); + } else if (Ample.DataLevel.of(m.getTablet().getTable()) != level) { + log.warn( + "Balancer wants to move a tablet ({}) outside of the current processing level ({}), " + + "ignoring and should be processed at the correct level ({})", + m.getTablet(), level, Ample.DataLevel.of(m.getTablet().getTable())); + } else if (m.getNewTabletServer() == null) { + log.error("Balancer did not set the destination {}", m); + } else if (m.getOldTabletServer() == null) { + log.error("Balancer did not set the source {}", m); + } else if (!current.contains(m.getOldTabletServer())) { + log.warn("Balancer wants to move a tablet from a server that is not current: {}", m); + } else if (!current.contains(m.getNewTabletServer())) { + log.warn("Balancer wants to move a tablet to a server that is not current: {}", m); + } else { + includeMigration = true; + } + return includeMigration; + }).collect(Collectors.toList()); + } + + long balanceTablets() { + final int tabletsNotHosted = getManager().notHosted(); + BalanceParamsImpl params = null; + long wait = 0; + long totalMigrationsOut = 0; + final Map> partitionedMigrations = partitionMigrations(); + int levelsCompleted = 0; + + for (Ample.DataLevel dl : Ample.DataLevel.values()) { + if (dl == Ample.DataLevel.USER && tabletsNotHosted > 0) { + log.debug("not balancing user tablets because there are {} unhosted tablets", + tabletsNotHosted); + continue; + } + + if (dl == Ample.DataLevel.USER && !canAssignAndBalance()) { + log.debug("not balancing user tablets because not enough tablet servers"); + continue; + } + + if ((dl == Ample.DataLevel.METADATA || dl == Ample.DataLevel.USER) + && !partitionedMigrations.get(Ample.DataLevel.ROOT).isEmpty()) { + log.debug("Not balancing {} because {} has migrations", dl, Ample.DataLevel.ROOT); + continue; + } + + if (dl == Ample.DataLevel.USER + && !partitionedMigrations.get(Ample.DataLevel.METADATA).isEmpty()) { + log.debug("Not balancing {} because {} has migrations", dl, Ample.DataLevel.METADATA); + continue; + } + + // Create a view of the tserver status such that it only contains the tables + // for this level in the tableMap. + SortedMap tserverStatusForLevel = + createTServerStatusView(dl, getManager().tserverStatus); + // Construct the Thrift variant of the map above for the BalancerParams + final SortedMap tserverStatusForBalancerLevel = new TreeMap<>(); + tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel + .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status))); + + log.debug("Balancing for tables at level {}", dl); + + SortedMap statusForBalancerLevel = + tserverStatusForBalancerLevel; + params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, + getManager().tServerGroupingForBalancer, tserverStatusForLevel, + partitionedMigrations.get(dl), dl, getTablesForLevel(dl)); + wait = Math.max(getBalancer().balance(params), wait); + long migrationsOutForLevel = 0; + try (var tabletsMutator = getContext().getAmple().conditionallyMutateTablets(result -> {})) { + for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(), + params.migrationsOut(), dl)) { + final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet()); + if (partitionedMigrations.get(dl).contains(ke)) { + log.warn("balancer requested migration more than once, skipping {}", m); + continue; + } + migrationsOutForLevel++; + var migration = TabletServerIdImpl.toThrift(m.getNewTabletServer()); + tabletsMutator.mutateTablet(ke).requireAbsentOperation() + .requireCurrentLocationNotEqualTo(migration).putMigration(migration) + .submit(tm -> false); + log.debug("migration {}", m); + } + } + totalMigrationsOut += migrationsOutForLevel; + + // increment this at end of loop to signal complete run w/o any continue + levelsCompleted++; + } + final long totalMigrations = + totalMigrationsOut + partitionedMigrations.values().stream().mapToLong(Set::size).sum(); + balancerMetrics.assignMigratingCount(() -> totalMigrations); + + if (totalMigrationsOut == 0 && levelsCompleted == Ample.DataLevel.values().length) { + synchronized (balancedNotifier) { + balancedNotifier.notifyAll(); + } + } else if (totalMigrationsOut > 0) { + getManager().nextEvent.event("Migrating %d more tablets, %d total", totalMigrationsOut, + totalMigrations); + } + return wait; + } + + @SuppressFBWarnings(value = "UW_UNCOND_WAIT", justification = "TODO needs triage") + void waitForBalance() { + synchronized (balancedNotifier) { + long eventCounter; + do { + eventCounter = getManager().nextEvent.waitForEvents(0, 0); + try { + balancedNotifier.wait(); + } catch (InterruptedException e) { + log.debug(e.toString(), e); + } + } while (getManager().displayUnassigned() > 0 || numMigrations() > 0 + || eventCounter != getManager().nextEvent.waitForEvents(0, 0)); + } + } + + long numMigrations() { + long count = 0; + for (Ample.DataLevel dl : Ample.DataLevel.values()) { + try (var tabletsMetadata = getContext().getAmple().readTablets().forLevel(dl) + .fetch(TabletMetadata.ColumnType.MIGRATION).filter(new HasMigrationFilter()).build()) { + count += tabletsMetadata.stream().count(); + } + } + return count; + } + + void getAssignments(SortedMap currentStatus, + Map> currentTServerGroups, + Map unassigned, Map assignedOut) { + AssignmentParamsImpl params = + AssignmentParamsImpl.fromThrift(currentStatus, currentTServerGroups, + unassigned.entrySet().stream().collect(HashMap::new, + (m, e) -> m.put(e.getKey(), + e.getValue().getLastLocation() == null ? null + : e.getValue().getLastLocation().getServerInstance()), + Map::putAll), + assignedOut); + getBalancer().getAssignments(params); + if (!canAssignAndBalance()) { + // remove assignment for user tables + Iterator iter = assignedOut.keySet().iterator(); + while (iter.hasNext()) { + KeyExtent ke = iter.next(); + if (!ke.isMeta()) { + iter.remove(); + log.trace("Removed assignment for {} as assignments for user tables is disabled.", ke); + } + } + } + } + + private boolean canAssignAndBalance() { + final int threshold = getManager().getConfiguration() + .getCount(Property.MANAGER_TABLET_BALANCER_TSERVER_THRESHOLD); + if (threshold == 0) { + return true; + } + final int numTServers = getManager().tserverSet.size(); + final boolean result = numTServers >= threshold; + if (!result) { + log.warn("Not assigning or balancing as number of tservers ({}) is below threshold ({})", + numTServers, threshold); + } + return result; + } + + private boolean shouldCleanupMigration(TabletMetadata tabletMetadata) { + var tableState = getContext().getTableManager().getTableState(tabletMetadata.getTableId()); + var migration = tabletMetadata.getMigration(); + Preconditions.checkState(migration != null, + "This method should only be called if there is a migration"); + return tableState == TableState.OFFLINE + || !getManager().onlineTabletServers().contains(migration) + || (tabletMetadata.getLocation() != null + && tabletMetadata.getLocation().getServerInstance().equals(migration)); + } + + void startMigrationCleanupThread() { + Threads.createCriticalThread("Migration Cleanup Thread", new MigrationCleanupThread()).start(); + } + + private class MigrationCleanupThread implements Runnable { + + @Override + public void run() { + while (getManager().stillManager()) { + try { + // - Remove any migrations for tablets of offline tables, as the migration can never + // succeed because no tablet server will load the tablet + // - Remove any migrations to tablet servers that are not live + // - Remove any migrations where the tablets current location equals the migration + // (the migration has completed) + var ample = getContext().getAmple(); + for (Ample.DataLevel dl : Ample.DataLevel.values()) { + // prev row needed for the extent + try (var tabletsMetadata = ample.readTablets().forLevel(dl) + .fetch(TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.MIGRATION, + TabletMetadata.ColumnType.LOCATION) + .filter(new HasMigrationFilter()).build(); + var tabletsMutator = ample.conditionallyMutateTablets(result -> {})) { + for (var tabletMetadata : tabletsMetadata) { + var migration = tabletMetadata.getMigration(); + if (shouldCleanupMigration(tabletMetadata)) { + tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation() + .requireMigration(migration).deleteMigration().submit(tm -> false); + } + } + } + } + } catch (Exception ex) { + log.error("Error cleaning up migrations", ex); + } + sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES); + } + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 28b435ef136..5504ae1efc2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -34,7 +34,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -44,7 +43,6 @@ import java.util.Optional; import java.util.Set; import java.util.SortedMap; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; @@ -56,7 +54,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; -import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; @@ -89,10 +86,6 @@ import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; -import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; -import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; -import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; -import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.BulkImportState; import org.apache.accumulo.core.manager.thrift.ManagerGoalState; @@ -103,16 +96,8 @@ import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.filters.HasMigrationFilter; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.core.spi.balancer.BalancerEnvironment; -import org.apache.accumulo.core.spi.balancer.DoNothingBalancer; -import org.apache.accumulo.core.spi.balancer.TabletBalancer; -import org.apache.accumulo.core.spi.balancer.data.TServerStatus; -import org.apache.accumulo.core.spi.balancer.data.TabletMigration; -import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.Timer; @@ -122,7 +107,6 @@ import org.apache.accumulo.core.zookeeper.ZcStat; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.manager.merge.FindMergeableRangeTask; -import org.apache.accumulo.manager.metrics.BalancerMetrics; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; import org.apache.accumulo.manager.split.Splitter; @@ -137,11 +121,9 @@ import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; -import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.apache.accumulo.server.manager.state.DeadServerList; import org.apache.accumulo.server.manager.state.TabletServerState; import org.apache.accumulo.server.manager.state.TabletStateStore; -import org.apache.accumulo.server.manager.state.UnassignedTablet; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; @@ -167,7 +149,6 @@ import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.micrometer.core.instrument.MeterRegistry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -182,7 +163,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { static final Logger log = LoggerFactory.getLogger(Manager.class); static final int ONE_SECOND = 1000; - private static final long CLEANUP_INTERVAL_MINUTES = 5; + static final long CLEANUP_INTERVAL_MINUTES = 5; static final long WAIT_BETWEEN_ERRORS = ONE_SECOND; private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND; private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND; @@ -191,7 +172,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { private static final int MAX_BAD_STATUS_COUNT = 3; private static final double MAX_SHUTDOWNS_PER_SEC = 10D / 60D; - private final Object balancedNotifier = new Object(); final LiveTServerSet tserverSet; private final List watchers = new ArrayList<>(); final Map badServers = @@ -208,9 +188,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { ServiceLock managerLock = null; private TServer clientService = null; - protected volatile TabletBalancer tabletBalancer = null; - private final BalancerEnvironment balancerEnvironment; - private final BalancerMetrics balancerMetrics = new BalancerMetrics(); + private final BalanceManager balanceManager; private ManagerState state = ManagerState.INITIAL; @@ -238,6 +216,10 @@ public synchronized ManagerState getManagerState() { return state; } + public BalanceManager getBalanceManager() { + return balanceManager; + } + public Map> getCompactionHints(DataLevel level) { Predicate tablePredicate = (tableId) -> DataLevel.of(tableId) == level; Map allConfig; @@ -377,7 +359,7 @@ private int nonMetaDataTabletsAssignedOrHosted() { - assignedOrHosted(SystemTables.ROOT.tableId()); } - private int notHosted() { + int notHosted() { int result = 0; for (TabletGroupWatcher watcher : watchers) { for (TableCounts counts : watcher.getStats().values()) { @@ -453,7 +435,7 @@ protected Manager(ConfigOpts opts, Function ser super(ServerId.Type.MANAGER, opts, serverContextFactory, args); ServerContext context = super.getContext(); upgradeCoordinator = new UpgradeCoordinator(context); - balancerEnvironment = new BalancerEnvironmentImpl(context); + balanceManager = new BalanceManager(); AccumuloConfiguration aconf = context.getConfiguration(); @@ -461,7 +443,6 @@ protected Manager(ConfigOpts opts, Function ser log.info("Instance {}", context.getInstanceID()); timeKeeper = new ManagerTime(this, aconf); tserverSet = new LiveTServerSet(context, this); - initializeBalancer(); final long tokenLifetime = aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME); @@ -522,10 +503,6 @@ public Splitter getSplitter() { return splitter; } - public MetricsProducer getBalancerMetrics() { - return balancerMetrics; - } - public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() { return upgradeCoordinator.getStatus(); } @@ -544,52 +521,6 @@ public void hostOndemand(List extents) { } } - private class MigrationCleanupThread implements Runnable { - - @Override - public void run() { - while (stillManager()) { - try { - // - Remove any migrations for tablets of offline tables, as the migration can never - // succeed because no tablet server will load the tablet - // - Remove any migrations to tablet servers that are not live - // - Remove any migrations where the tablets current location equals the migration - // (the migration has completed) - var ample = getContext().getAmple(); - for (DataLevel dl : DataLevel.values()) { - // prev row needed for the extent - try (var tabletsMetadata = ample.readTablets().forLevel(dl) - .fetch(TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.MIGRATION, - TabletMetadata.ColumnType.LOCATION) - .filter(new HasMigrationFilter()).build(); - var tabletsMutator = ample.conditionallyMutateTablets(result -> {})) { - for (var tabletMetadata : tabletsMetadata) { - var migration = tabletMetadata.getMigration(); - if (shouldCleanupMigration(tabletMetadata)) { - tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation() - .requireMigration(migration).deleteMigration().submit(tm -> false); - } - } - } - } - } catch (Exception ex) { - log.error("Error cleaning up migrations", ex); - } - sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES); - } - } - } - - private boolean shouldCleanupMigration(TabletMetadata tabletMetadata) { - var tableState = getContext().getTableManager().getTableState(tabletMetadata.getTableId()); - var migration = tabletMetadata.getMigration(); - Preconditions.checkState(migration != null, - "This method should only be called if there is a migration"); - return tableState == TableState.OFFLINE || !onlineTabletServers().contains(migration) - || (tabletMetadata.getLocation() != null - && tabletMetadata.getLocation().getServerInstance().equals(migration)); - } - private class ScanServerZKCleaner implements Runnable { @Override @@ -632,28 +563,6 @@ public void run() { } - /** - * balanceTablets() balances tables by DataLevel. Return the current set of migrations partitioned - * by DataLevel - */ - private Map> partitionMigrations() { - final Map> partitionedMigrations = new EnumMap<>(DataLevel.class); - for (DataLevel dl : DataLevel.values()) { - Set extents = new HashSet<>(); - // prev row needed for the extent - try (var tabletsMetadata = getContext() - .getAmple().readTablets().forLevel(dl).fetch(TabletMetadata.ColumnType.PREV_ROW, - TabletMetadata.ColumnType.MIGRATION, TabletMetadata.ColumnType.LOCATION) - .filter(new HasMigrationFilter()).build()) { - // filter out migrations that are awaiting cleanup - tabletsMetadata.stream().filter(tm -> !shouldCleanupMigration(tm)) - .forEach(tm -> extents.add(tm.getExtent())); - } - partitionedMigrations.put(dl, extents); - } - return partitionedMigrations; - } - private class StatusThread implements Runnable { private boolean goodStats() { @@ -798,7 +707,7 @@ private long updateStatus() { return DEFAULT_WAIT_FOR_WATCHER; } } - return balanceTablets(); + return balanceManager.balanceTablets(); } return DEFAULT_WAIT_FOR_WATCHER; } @@ -830,186 +739,6 @@ private void checkForHeldServer(SortedMap ts badServers.putIfAbsent(instance, new AtomicInteger(1)); } } - - /** - * Given the current tserverStatus map and a DataLevel, return a view of the tserverStatus map - * that only contains entries for tables in the DataLevel - */ - private SortedMap createTServerStatusView( - final DataLevel dl, final SortedMap status) { - final SortedMap tserverStatusForLevel = new TreeMap<>(); - status.forEach((tsi, tss) -> { - final TabletServerStatus copy = tss.deepCopy(); - final Map oldTableMap = copy.getTableMap(); - final Map newTableMap = - new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1); - if (dl == DataLevel.ROOT) { - if (oldTableMap.containsKey(SystemTables.ROOT.tableId().canonical())) { - newTableMap.put(SystemTables.ROOT.tableId().canonical(), - oldTableMap.get(SystemTables.ROOT.tableId().canonical())); - } - } else if (dl == DataLevel.METADATA) { - if (oldTableMap.containsKey(SystemTables.METADATA.tableId().canonical())) { - newTableMap.put(SystemTables.METADATA.tableId().canonical(), - oldTableMap.get(SystemTables.METADATA.tableId().canonical())); - } - } else if (dl == DataLevel.USER) { - if (!oldTableMap.containsKey(SystemTables.METADATA.tableId().canonical()) - && !oldTableMap.containsKey(SystemTables.ROOT.tableId().canonical())) { - newTableMap.putAll(oldTableMap); - } else { - oldTableMap.forEach((table, info) -> { - if (!table.equals(SystemTables.ROOT.tableId().canonical()) - && !table.equals(SystemTables.METADATA.tableId().canonical())) { - newTableMap.put(table, info); - } - }); - } - } else { - throw new IllegalArgumentException("Unhandled DataLevel value: " + dl); - } - copy.setTableMap(newTableMap); - tserverStatusForLevel.put(tsi, copy); - }); - return tserverStatusForLevel; - } - - private Map getTablesForLevel(DataLevel dataLevel) { - switch (dataLevel) { - case ROOT: - return Map.of(SystemTables.ROOT.tableName(), SystemTables.ROOT.tableId()); - case METADATA: - return Map.of(SystemTables.METADATA.tableName(), SystemTables.METADATA.tableId()); - case USER: { - Map userTables = getContext().createQualifiedTableNameToIdMap(); - for (var accumuloTable : SystemTables.values()) { - if (DataLevel.of(accumuloTable.tableId()) != DataLevel.USER) { - userTables.remove(accumuloTable.tableName()); - } - } - return Collections.unmodifiableMap(userTables); - } - default: - throw new IllegalArgumentException("Unknown data level " + dataLevel); - } - } - - private long balanceTablets() { - - // Check for balancer property change - initializeBalancer(); - - final int tabletsNotHosted = notHosted(); - BalanceParamsImpl params = null; - long wait = 0; - long totalMigrationsOut = 0; - final Map> partitionedMigrations = partitionMigrations(); - int levelsCompleted = 0; - - for (DataLevel dl : DataLevel.values()) { - - if (dl == DataLevel.USER && tabletsNotHosted > 0) { - log.debug("not balancing user tablets because there are {} unhosted tablets", - tabletsNotHosted); - continue; - } - - if (dl == DataLevel.USER && !canAssignAndBalance()) { - log.debug("not balancing user tablets because not enough tablet servers"); - continue; - } - - if ((dl == DataLevel.METADATA || dl == DataLevel.USER) - && !partitionedMigrations.get(DataLevel.ROOT).isEmpty()) { - log.debug("Not balancing {} because {} has migrations", dl, DataLevel.ROOT); - continue; - } - - if (dl == DataLevel.USER && !partitionedMigrations.get(DataLevel.METADATA).isEmpty()) { - log.debug("Not balancing {} because {} has migrations", dl, DataLevel.METADATA); - continue; - } - - // Create a view of the tserver status such that it only contains the tables - // for this level in the tableMap. - SortedMap tserverStatusForLevel = - createTServerStatusView(dl, tserverStatus); - // Construct the Thrift variant of the map above for the BalancerParams - final SortedMap tserverStatusForBalancerLevel = - new TreeMap<>(); - tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel - .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status))); - - log.debug("Balancing for tables at level {}", dl); - - SortedMap statusForBalancerLevel = - tserverStatusForBalancerLevel; - params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, tServerGroupingForBalancer, - tserverStatusForLevel, partitionedMigrations.get(dl), dl, getTablesForLevel(dl)); - wait = Math.max(tabletBalancer.balance(params), wait); - long migrationsOutForLevel = 0; - try ( - var tabletsMutator = getContext().getAmple().conditionallyMutateTablets(result -> {})) { - for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(), - params.migrationsOut(), dl)) { - final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet()); - if (partitionedMigrations.get(dl).contains(ke)) { - log.warn("balancer requested migration more than once, skipping {}", m); - continue; - } - migrationsOutForLevel++; - var migration = TabletServerIdImpl.toThrift(m.getNewTabletServer()); - tabletsMutator.mutateTablet(ke).requireAbsentOperation() - .requireCurrentLocationNotEqualTo(migration).putMigration(migration) - .submit(tm -> false); - log.debug("migration {}", m); - } - } - totalMigrationsOut += migrationsOutForLevel; - - // increment this at end of loop to signal complete run w/o any continue - levelsCompleted++; - } - final long totalMigrations = - totalMigrationsOut + partitionedMigrations.values().stream().mapToLong(Set::size).sum(); - balancerMetrics.assignMigratingCount(() -> totalMigrations); - - if (totalMigrationsOut == 0 && levelsCompleted == DataLevel.values().length) { - synchronized (balancedNotifier) { - balancedNotifier.notifyAll(); - } - } else if (totalMigrationsOut > 0) { - nextEvent.event("Migrating %d more tablets, %d total", totalMigrationsOut, totalMigrations); - } - return wait; - } - - private List checkMigrationSanity(Set current, - List migrations, DataLevel level) { - return migrations.stream().filter(m -> { - boolean includeMigration = false; - if (m.getTablet() == null) { - log.error("Balancer gave back a null tablet {}", m); - } else if (DataLevel.of(m.getTablet().getTable()) != level) { - log.warn( - "Balancer wants to move a tablet ({}) outside of the current processing level ({}), " - + "ignoring and should be processed at the correct level ({})", - m.getTablet(), level, DataLevel.of(m.getTablet().getTable())); - } else if (m.getNewTabletServer() == null) { - log.error("Balancer did not set the destination {}", m); - } else if (m.getOldTabletServer() == null) { - log.error("Balancer did not set the source {}", m); - } else if (!current.contains(m.getOldTabletServer())) { - log.warn("Balancer wants to move a tablet from a server that is not current: {}", m); - } else if (!current.contains(m.getNewTabletServer())) { - log.warn("Balancer wants to move a tablet to a server that is not current: {}", m); - } else { - includeMigration = true; - } - return includeMigration; - }).collect(Collectors.toList()); - } - } private SortedMap @@ -1115,6 +844,8 @@ private List checkMigrationSanity(Set current, public void run() { final ServerContext context = getContext(); + balanceManager.setManager(this); + // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health // when a hot-standby // @@ -1203,7 +934,7 @@ public void process(WatchedEvent event) { MetricsInfo metricsInfo = getContext().getMetricsInfo(); ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), this); var producers = managerMetrics.getProducers(getConfiguration(), this); - producers.add(balancerMetrics); + producers.add(balanceManager.getMetrics()); final TabletGroupWatcher userTableTGW = new TabletGroupWatcher(this, this.userTabletStore, null, managerMetrics) { @@ -1319,7 +1050,7 @@ boolean canSuspendTablets() { metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), sa.getAddress(), getResourceGroup())); - Threads.createCriticalThread("Migration Cleanup Thread", new MigrationCleanupThread()).start(); + balanceManager.startMigrationCleanupThread(); Threads.createCriticalThread("ScanServer Cleanup Thread", new ScanServerZKCleaner()).start(); // Don't call start the CompactionCoordinator until we have tservers and upgrade is complete. @@ -1742,22 +1473,6 @@ public void assignedTablet(KeyExtent extent) { } } - @SuppressFBWarnings(value = "UW_UNCOND_WAIT", justification = "TODO needs triage") - public void waitForBalance() { - synchronized (balancedNotifier) { - long eventCounter; - do { - eventCounter = nextEvent.waitForEvents(0, 0); - try { - balancedNotifier.wait(); - } catch (InterruptedException e) { - log.debug(e.toString(), e); - } - } while (displayUnassigned() > 0 || numMigrations() > 0 - || eventCounter != nextEvent.waitForEvents(0, 0)); - } - } - public ManagerMonitorInfo getManagerMonitorInfo() { final ManagerMonitorInfo result = new ManagerMonitorInfo(); @@ -1826,67 +1541,6 @@ public boolean isUpgrading() { return upgradeCoordinator.getStatus() != UpgradeCoordinator.UpgradeStatus.COMPLETE; } - private void initializeBalancer() { - String configuredBalancerClass = getConfiguration().get(Property.MANAGER_TABLET_BALANCER); - try { - if (tabletBalancer == null - || !tabletBalancer.getClass().getName().equals(configuredBalancerClass)) { - log.debug("Attempting to initialize balancer using class {}, was {}", - configuredBalancerClass, - tabletBalancer == null ? "null" : tabletBalancer.getClass().getName()); - var localTabletBalancer = Property.createInstanceFromPropertyName(getConfiguration(), - Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new DoNothingBalancer()); - localTabletBalancer.init(balancerEnvironment); - tabletBalancer = localTabletBalancer; - log.info("tablet balancer changed to {}", localTabletBalancer.getClass().getName()); - } - } catch (Exception e) { - log.warn("Failed to create balancer {} using {} instead", configuredBalancerClass, - DoNothingBalancer.class, e); - var localTabletBalancer = new DoNothingBalancer(); - localTabletBalancer.init(balancerEnvironment); - tabletBalancer = localTabletBalancer; - } - } - - void getAssignments(SortedMap currentStatus, - Map> currentTServerGroups, - Map unassigned, Map assignedOut) { - AssignmentParamsImpl params = - AssignmentParamsImpl.fromThrift(currentStatus, currentTServerGroups, - unassigned.entrySet().stream().collect(HashMap::new, - (m, e) -> m.put(e.getKey(), - e.getValue().getLastLocation() == null ? null - : e.getValue().getLastLocation().getServerInstance()), - Map::putAll), - assignedOut); - tabletBalancer.getAssignments(params); - if (!canAssignAndBalance()) { - // remove assignment for user tables - Iterator iter = assignedOut.keySet().iterator(); - while (iter.hasNext()) { - KeyExtent ke = iter.next(); - if (!ke.isMeta()) { - iter.remove(); - log.trace("Removed assignment for {} as assignments for user tables is disabled.", ke); - } - } - } - } - - public TabletStateStore getTabletStateStore(DataLevel level) { - switch (level) { - case METADATA: - return this.metadataTabletStore; - case ROOT: - return this.rootTabletStore; - case USER: - return this.userTabletStore; - default: - throw new IllegalStateException("Unhandled DataLevel value: " + level); - } - } - @Override public void registerMetrics(MeterRegistry registry) { super.registerMetrics(registry); @@ -1903,30 +1557,4 @@ private Map> getFateRefs() { public ServiceLock getLock() { return managerLock; } - - private long numMigrations() { - long count = 0; - for (DataLevel dl : DataLevel.values()) { - try (var tabletsMetadata = getContext().getAmple().readTablets().forLevel(dl) - .fetch(TabletMetadata.ColumnType.MIGRATION).filter(new HasMigrationFilter()).build()) { - count += tabletsMetadata.stream().count(); - } - } - return count; - } - - private boolean canAssignAndBalance() { - final int threshold = - getConfiguration().getCount(Property.MANAGER_TABLET_BALANCER_TSERVER_THRESHOLD); - if (threshold == 0) { - return true; - } - final int numTServers = tserverSet.size(); - final boolean result = numTServers >= threshold; - if (!result) { - log.warn("Not assigning or balancing as number of tservers ({}) is below threshold ({})", - numTServers, threshold); - } - return result; - } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index 6c8ff0ef3fe..2be39fdfcf7 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -549,7 +549,7 @@ private void alterTableProperty(TCredentials c, String tableName, String propert @Override public void waitForBalance(TInfo tinfo) { - manager.waitForBalance(); + manager.getBalanceManager().waitForBalance(); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 4481ec28fd9..b1906cd37f5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -532,8 +532,8 @@ private TableMgmtStats manageTablets(Iterator iter, // This is final because nothing in this method should change the goal. All computation of the // goal should be done in TabletGoalState.compute() so that all parts of the Accumulo code // will compute a consistent goal. - final TabletGoalState goal = - TabletGoalState.compute(tm, state, manager.tabletBalancer, tableMgmtParams); + final TabletGoalState goal = TabletGoalState.compute(tm, state, + manager.getBalanceManager().getBalancer(), tableMgmtParams); final Set actions = mti.getActions(); @@ -968,8 +968,8 @@ private void getAssignmentsFromBalancer(TabletLists tLists, Map unassigned) { if (!tLists.destinations.isEmpty()) { Map assignedOut = new HashMap<>(); - manager.getAssignments(tLists.destinations, tLists.currentTServerGrouping, unassigned, - assignedOut); + manager.getBalanceManager().getAssignments(tLists.destinations, tLists.currentTServerGrouping, + unassigned, assignedOut); for (Entry assignment : assignedOut.entrySet()) { if (unassigned.containsKey(assignment.getKey())) { if (assignment.getValue() != null) {