Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.accumulo.core.clientImpl;

import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
Expand Down Expand Up @@ -100,10 +99,4 @@ static ClientInfo from(Properties properties, AuthenticationToken token) {
return new ClientInfoImpl(properties, Optional.of(token));
}

/**
* @return ClientInfo given path to client config file
*/
static ClientInfo from(Path propertiesFile) {
return new ClientInfoImpl(ClientInfoImpl.toProperties(propertiesFile), Optional.empty());
}
}
13 changes: 0 additions & 13 deletions core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,26 @@ public class FateKey {
private final FateKeyType type;
private final Optional<KeyExtent> keyExtent;
private final Optional<ExternalCompactionId> compactionId;
private final Optional<TServerInstance> tServerInstance;
private final byte[] serialized;

private FateKey(FateKeyType type, KeyExtent keyExtent) {
this.type = Objects.requireNonNull(type);
this.keyExtent = Optional.of(keyExtent);
this.compactionId = Optional.empty();
this.tServerInstance = Optional.empty();
this.serialized = serialize(type, keyExtent);
}

private FateKey(FateKeyType type, ExternalCompactionId compactionId) {
this.type = Objects.requireNonNull(type);
this.keyExtent = Optional.empty();
this.compactionId = Optional.of(compactionId);
this.tServerInstance = Optional.empty();
this.serialized = serialize(type, compactionId);
}

private FateKey(FateKeyType type, TServerInstance tServerInstance) {
this.type = Objects.requireNonNull(type);
this.keyExtent = Optional.empty();
this.compactionId = Optional.empty();
this.tServerInstance = Optional.of(tServerInstance);
this.serialized = serialize(type, tServerInstance);
}

Expand All @@ -75,7 +71,6 @@ private FateKey(byte[] serialized) {
this.type = FateKeyType.valueOf(buffer.readUTF());
this.keyExtent = deserializeKeyExtent(type, buffer);
this.compactionId = deserializeCompactionId(type, buffer);
this.tServerInstance = deserializeTserverId(type, buffer);
this.serialized = serialized;
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down Expand Up @@ -200,14 +195,6 @@ private static Optional<ExternalCompactionId> deserializeCompactionId(FateKeyTyp
};
}

private static Optional<TServerInstance> deserializeTserverId(FateKeyType type,
DataInputBuffer buffer) throws IOException {
return switch (type) {
case SPLIT, MERGE, COMPACTION_COMMIT -> Optional.empty();
case TSERVER_SHUTDOWN -> Optional.of(new TServerInstance(buffer.readUTF()));
};
}

Comment on lines -203 to -210
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keith-turner I think the tServerInstance field became unused after recent distributed Fate changes, but there remains a FateKey constructor that accepts a TServerInstance that is serialized, but then it is never deserialized. I think there's probably some unnecessary stuff going on here, even after my changes, because it doesn't seem useful to have a code path that serializes a TServerInstance when it will never be used.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code was added in #6224. That code is used to ensure there is only a single fate operation to shut down a specific tserver running at a time. I was just following the pattern of the other code when adding that new fate key type. However, its usage differs: we never need to read it. For compaction fate keys, those need to be read for dead compaction detection. So since we never read it, it did not need to follow the pattern of the other types. Seems ok to remove it, and the parts left behind are still in use.

This code uses the fate key to ensure only one operation is initiated per tserver. The serialized data is used for the FateKey hashCode and equals functions, so that seems like it will still work for comparing them to dedupe.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In persistent storage, the serialized form of the key is used to dedupe by putting it in the Accumulo key for the fate table.

@Override
public String toString() {
var buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.metrics.Metric.MANAGER_GOAL_STATE;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.FILE_RENAME_POOL;

import java.io.IOException;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -281,7 +280,6 @@ void setTserverStatus(LiveTServersSnapshot snapshot,
private final TabletStateStore rootTabletStore;
private final TabletStateStore metadataTabletStore;
private final TabletStateStore userTabletStore;
private final ExecutorService renamePool;

public synchronized ManagerState getManagerState() {
return state;
Expand Down Expand Up @@ -488,9 +486,6 @@ protected Manager(ServerOpts opts,
BiFunction<SiteConfiguration,ResourceGroupId,ServerContext> serverContextFactory,
String[] args) throws IOException {
super(ServerId.Type.MANAGER, opts, serverContextFactory, args);
int poolSize = this.getConfiguration().getCount(Property.MANAGER_RENAME_THREADS);
renamePool = ThreadPools.getServerThreadPools().getPoolBuilder(FILE_RENAME_POOL.poolName)
.numCoreThreads(poolSize).build();
ServerContext context = super.getContext();
upgradeCoordinator = new UpgradeCoordinator(context);
balanceManager = new BalanceManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ static FailureCounts incrementSuccess(Object key, FailureCounts counts) {

private final LoadingCache<ResourceGroupId,Integer> compactorCounts;

private volatile long coordinatorStartTime;

private final Map<DataLevel,ThreadPoolExecutor> reservationPools;
private final Set<String> activeCompactorReservationRequest = ConcurrentHashMap.newKeySet();

Expand Down Expand Up @@ -301,8 +299,6 @@ private void checkForConfigChanges() {

@Override
public void run() {

this.coordinatorStartTime = System.currentTimeMillis();
startConfigMonitor(ctx.getScheduledExecutor());
startCompactorZKCleaner(ctx.getScheduledExecutor());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {

// For user initiated table compactions, the fate operation will refresh tablets. Can also
// refresh as part of this compaction commit as it may run sooner.
return new RefreshTablet(commitData.ecid, commitData.textent, refreshLocation);
return new RefreshTablet(commitData.textent, refreshLocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ public class RefreshTablet extends AbstractFateOperation {
private static final long serialVersionUID = 1L;
private final TKeyExtent extent;
private final String tserverInstance;
private final String compactionId;

public RefreshTablet(String ecid, TKeyExtent extent, String tserverInstance) {
this.compactionId = ecid;
public RefreshTablet(TKeyExtent extent, String tserverInstance) {
this.extent = extent;
this.tserverInstance = tserverInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public synchronized void stop(FateInstanceType fateType, Duration timeout) {
}

if (!timer.isExpired()) {
FateStore<FateEnv> store = switch (fateType) {
try (FateStore<FateEnv> store = switch (fateType) {
case USER -> new UserFateStore<FateEnv>(context, SystemTables.FATE.tableName(), null, null);
case META -> {
try {
Expand All @@ -224,18 +224,20 @@ public synchronized void stop(FateInstanceType fateType, Duration timeout) {
throw new IllegalStateException(e);
}
}
};
var reserved = store.getActiveReservations(Set.of(FatePartition.all(FateInstanceType.USER)));
while (!reserved.isEmpty() && !timer.isExpired()) {
if (log.isTraceEnabled()) {
reserved.forEach((fateId, reservation) -> {
log.trace("In stop(), waiting on {} {} ", fateId, reservation);
});
}
try {
Thread.sleep(Math.min(100, timer.timeLeft(TimeUnit.MILLISECONDS)));
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}) {
var reserved =
store.getActiveReservations(Set.of(FatePartition.all(FateInstanceType.USER)));
while (!reserved.isEmpty() && !timer.isExpired()) {
if (log.isTraceEnabled()) {
reserved.forEach((fateId, reservation) -> {
log.trace("In stop(), waiting on {} {} ", fateId, reservation);
});
}
try {
Thread.sleep(Math.min(100, timer.timeLeft(TimeUnit.MILLISECONDS)));
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -96,13 +95,8 @@ public MetricsInfo getMockMetrics() {

public class TestCoordinator extends CompactionCoordinator {

private final List<TExternalCompaction> runningCompactions;

private Set<ExternalCompactionId> metadataCompactionIds = null;

public TestCoordinator(Manager manager, List<TExternalCompaction> runningCompactions) {
super(manager, t -> null);
this.runningCompactions = runningCompactions;
}

@Override
Expand Down Expand Up @@ -139,14 +133,6 @@ public void compactionFailed(TInfo tinfo, TCredentials credentials, String exter
TKeyExtent extent, String exceptionClassName, TCompactionState failureState,
String groupName, String compactorAddress) throws ThriftSecurityException {}

void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
metadataCompactionIds = mci;
}

public void resetInternals() {
metadataCompactionIds = null;
}

@Override
protected CompactionMetadata reserveCompaction(ResolvedCompactionJob rcJob,
String compactorAddress, ExternalCompactionId externalCompactionId) {
Expand Down
3 changes: 1 addition & 2 deletions test/src/main/java/org/apache/accumulo/test/LocatorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.RowRange;
import org.apache.accumulo.core.data.TableId;
Expand Down Expand Up @@ -194,7 +193,7 @@ public void testClearingUnused() throws Exception {
tableOps.create(table3, new NewTableConfiguration().createOffline());
tableOps.create(table4, new NewTableConfiguration().createOffline());

ClientContext ctx = (ClientContext) client;
var ctx = getCluster().getServerContext();

ctx.setClearFrequency(Duration.ofMillis(100));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.ShellOptionsJC;
import org.jline.reader.LineReader;
Expand Down Expand Up @@ -63,7 +62,6 @@ public void execute(JCommander cl, ShellOptionsJC options) throws Exception {
public Terminal terminal;

MockShell(File configFile) throws Exception {
ClientInfo info = ClientInfo.from(configFile.toPath());
// start the shell
output = new TestOutputStream();
input = new StringInputStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.harness.AccumuloClusterHarness;
Expand Down Expand Up @@ -76,20 +75,18 @@ public void resetProperty() throws Exception {
public void experimentalPropTest() throws Exception {
// ensure experimental props do not show up in config output unless set

AuthenticationToken token = getAdminToken();
File clientPropsFile = null;
switch (getClusterType()) {
case MINI:
case MINI -> {
MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster();
clientPropsFile = mac.getConfig().getClientPropsFile();
break;
case STANDALONE:
}
case STANDALONE -> {
StandaloneAccumuloClusterConfiguration standaloneConf =
(StandaloneAccumuloClusterConfiguration) getClusterConfiguration();
clientPropsFile = standaloneConf.getClientPropsFile();
break;
default:
fail("Unknown cluster type");
}
default -> fail("Unknown cluster type");
}

assertNotNull(clientPropsFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,15 @@ public static void main(String[] args) throws Exception {
}
var traceId1 = span.getSpanContext().getTraceId();

// These scans are done to ensure cacne is populated. Caffeine can evict things that were only
// These scans are done to ensure cache is populated. Caffeine can evict things that were only
// used once.
for (int i = 0; i < opts.getUntracedIntermediateScans(); i++) {
try (var scanner = client.createScanner(table)) {
opts.conigureScanner(scanner);
scanner.setBatchSize(10_000);
var ignored = scanner.stream().count();
scanner.forEach((k, v) -> {
// do nothing with the result, just ensure the cache is populated
});
}
}

Expand Down