Add instance name to HDFS #5458

Draft · wants to merge 1 commit into main

5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -26,8 +26,9 @@ public class Constants {
public static final String VERSION_DIR = "version";
public static final String APPNAME = "org.apache.accumulo";

// important directories
public static final String INSTANCE_ID_DIR = "instance_id";
public static final String INSTANCE_DIR = "instance";
public static final String INSTANCE_ID_PREFIX = "id_";
public static final String INSTANCE_NAME_PREFIX = "name_";
public static final String TABLE_DIR = "tables";
public static final String RECOVERY_DIR = "recovery";
public static final String WAL_DIR = "wal";
ClientInfo.java
@@ -27,6 +27,7 @@
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.dataImpl.InstanceInfo;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;

@@ -37,6 +38,11 @@
*/
public interface ClientInfo {

/**
* @return Accumulo instance info
*/
InstanceInfo getInstanceInfo();

/**
* @return Accumulo instance name
*/
ClientInfoImpl.java
@@ -36,6 +36,7 @@
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.dataImpl.InstanceInfo;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;
@@ -51,6 +52,7 @@ public class ClientInfoImpl implements ClientInfo {
// suppliers for lazily loading
private final Supplier<AuthenticationToken> tokenSupplier;
private final Supplier<Configuration> hadoopConf;
private final Supplier<InstanceInfo> instanceInfo;
private final Supplier<InstanceId> instanceId;
private final BiFunction<String,String,ZooSession> zooSessionForName;

@@ -62,6 +64,7 @@ public ClientInfoImpl(Properties properties, Optional<AuthenticationToken> token
this.hadoopConf = memoize(Configuration::new);
this.zooSessionForName = (name, rootPath) -> new ZooSession(name, getZooKeepers() + rootPath,
getZooKeepersSessionTimeOut(), null);
this.instanceInfo = memoize(() -> new InstanceInfo(getInstanceName(), getInstanceId()));
this.instanceId = memoize(() -> {
try (var zk =
getZooKeeperSupplier(getClass().getSimpleName() + ".getInstanceId()", "").get()) {
@@ -70,6 +73,11 @@ public ClientInfoImpl(Properties properties, Optional<AuthenticationToken> token
});
}

@Override
public InstanceInfo getInstanceInfo() {
return instanceInfo.get();
}

@Override
public String getInstanceName() {
return getString(ClientProperty.INSTANCE_NAME);
InstanceInfo.java (new file)
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.dataImpl;

import static java.util.Objects.requireNonNull;

import java.util.Objects;

import org.apache.accumulo.core.data.InstanceId;

public class InstanceInfo {

private final String name;
private final InstanceId id;
Review comment (Contributor):

I made a comment elsewhere about just serializing and deserializing this object using a single file in HDFS in a known location. If we do that, then I think we can collapse the instance version into this object as well and get rid of that file. It may also make sense to add a version number to this object to handle changes over time, much like a serialVersionUID.

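As a rough illustration of that alternative, a minimal sketch of a single serialized instance file; the class name, file layout, and field order here are hypothetical, not part of this PR. A format version is written first so the layout can evolve over time, playing the role of the serialVersionUID-style number mentioned above.

```java
// Hypothetical single-file layout: format version, instance name, instance id,
// and persistent data version, all in one HDFS file at a known location.
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InstanceFileSketch {

  private static final int FORMAT_VERSION = 1;

  public static void write(FileSystem fs, Path file, String name, String id, int dataVersion)
      throws IOException {
    try (FSDataOutputStream out = fs.create(file, false)) {
      out.writeInt(FORMAT_VERSION); // bump when the layout changes
      out.writeUTF(name);
      out.writeUTF(id);
      out.writeInt(dataVersion);
    }
  }

  public static String[] read(FileSystem fs, Path file) throws IOException {
    try (FSDataInputStream in = fs.open(file)) {
      int formatVersion = in.readInt();
      if (formatVersion != FORMAT_VERSION) {
        throw new IOException("Unsupported instance file format " + formatVersion);
      }
      // returns {name, id, dataVersion} as strings for brevity
      return new String[] {in.readUTF(), in.readUTF(), Integer.toString(in.readInt())};
    }
  }
}
```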
Reply (Member Author):

Because the goal is to minimize the number of cluster RPC requests needed to read the contents of the file, and to only require talking to the HDFS NameNode, the design keeps the info in the file names. However, I do think we could move the version into this instance directory with a different prefix, and also add it to this object.

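For the filename-based design described in the reply, a minimal sketch of how the instance name and id could be recovered from a single NameNode directory listing, assuming the layout implied by the new constants (`<baseDir>/instance/id_<instanceId>` and `<baseDir>/instance/name_<instanceName>`); the reader class below is hypothetical and not the PR's actual VolumeManager.getInstanceInfoFromHdfs implementation.

```java
// Hypothetical reader: recovers InstanceInfo from the names of the files in
// <baseDir>/instance ("id_<uuid>" and "name_<instanceName>"), so only a
// NameNode listing is needed and no file contents are ever opened.
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.dataImpl.InstanceInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InstanceDirReader {

  public static InstanceInfo read(Path instanceDir, Configuration hadoopConf) {
    try {
      FileSystem fs = instanceDir.getFileSystem(hadoopConf);
      String name = null;
      String id = null;
      // listStatus is a NameNode-only operation; the files are empty markers
      for (FileStatus status : fs.listStatus(instanceDir)) {
        String fileName = status.getPath().getName();
        if (fileName.startsWith("id_")) {
          id = fileName.substring("id_".length());
        } else if (fileName.startsWith("name_")) {
          name = fileName.substring("name_".length());
        }
      }
      if (name == null || id == null) {
        throw new IllegalStateException("Missing instance name or id file in " + instanceDir);
      }
      return new InstanceInfo(name, InstanceId.of(id));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because only a directory listing is performed, no DataNode round trips are needed, which matches the stated goal of touching only the NameNode.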

public InstanceInfo(String name, InstanceId id) {
this.name = requireNonNull(name);
this.id = requireNonNull(id);
}

public String getInstanceName() {
return name;
}

public InstanceId getInstanceId() {
return id;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof InstanceInfo) {
var o = (InstanceInfo) obj;
return Objects.equals(getInstanceName(), o.getInstanceName())
&& Objects.equals(getInstanceId(), o.getInstanceId());
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(getInstanceName(), getInstanceId());
}

@Override
public String toString() {
return getClass().getSimpleName() + "[name=" + getInstanceName() + ";id=" + getInstanceId()
+ "]";
}

}
ServerContext.java
@@ -49,9 +49,9 @@
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.InstanceInfo;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metrics.MetricsInfo;
@@ -137,9 +137,8 @@ private ServerContext(ServerInfo info) {
/**
* Used during initialization to set the instance name and ID.
*/
public static ServerContext initialize(SiteConfiguration siteConfig, String instanceName,
InstanceId instanceID) {
return new ServerContext(ServerInfo.initialize(siteConfig, instanceName, instanceID));
public static ServerContext initialize(SiteConfiguration siteConfig, InstanceInfo instanceInfo) {
return new ServerContext(ServerInfo.initialize(siteConfig, instanceInfo));
}

/**
ServerDirs.java
@@ -34,7 +34,7 @@
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.dataImpl.InstanceInfo;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.core.volume.VolumeConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -77,17 +77,17 @@ public Set<String> checkBaseUris(Configuration hadoopConf, Set<String> configure
// all base dirs must have same instance id and data version, any dirs that have neither should
// be ignored
String firstDir = null;
InstanceId firstIid = null;
InstanceInfo firstInfo = null;
Integer firstVersion = null;
// preserve order from configuration (to match user expectations a bit when volumes get sent to
// user-implemented VolumeChoosers)
LinkedHashSet<String> baseDirsList = new LinkedHashSet<>();
for (String baseDir : configuredBaseDirs) {
Path path = new Path(baseDir, Constants.INSTANCE_ID_DIR);
InstanceId currentIid;
Path path = new Path(baseDir, Constants.INSTANCE_DIR);
InstanceInfo currentInstanceInfo;
int currentVersion;
try {
currentIid = VolumeManager.getInstanceIDFromHdfs(path, hadoopConf);
currentInstanceInfo = VolumeManager.getInstanceInfoFromHdfs(path, hadoopConf);
Path vpath = new Path(baseDir, Constants.VERSION_DIR);
currentVersion = getAccumuloPersistentVersion(vpath.getFileSystem(hadoopConf), vpath);
} catch (Exception e) {
@@ -98,14 +98,14 @@ public Set<String> checkBaseUris(Configuration hadoopConf, Set<String> configure
}
}

if (firstIid == null) {
firstIid = currentIid;
if (firstInfo == null) {
firstInfo = currentInstanceInfo;
firstDir = baseDir;
firstVersion = currentVersion;
} else if (!currentIid.equals(firstIid)) {
} else if (!currentInstanceInfo.equals(firstInfo)) {
throw new IllegalArgumentException("Configuration " + Property.INSTANCE_VOLUMES.getKey()
+ " contains paths that have different instance ids " + baseDir + " has " + currentIid
+ " and " + firstDir + " has " + firstIid);
+ " contains paths that have different instance ids " + baseDir + " has "
+ currentInstanceInfo + " and " + firstDir + " has " + firstInfo);
} else if (currentVersion != firstVersion) {
throw new IllegalArgumentException("Configuration " + Property.INSTANCE_VOLUMES.getKey()
+ " contains paths that have different versions " + baseDir + " has " + currentVersion
@@ -238,8 +238,8 @@ public int getAccumuloPersistentVersion(FileSystem fs, Path path) {
}
}

public Path getInstanceIdLocation(Volume v) {
public Path getInstanceInfoLocation(Volume v) {
// all base dirs should have the same instance id, so can choose any one
return v.prefixChild(Constants.INSTANCE_ID_DIR);
return v.prefixChild(Constants.INSTANCE_DIR);
}
}
ServerInfo.java
@@ -37,7 +37,7 @@
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.dataImpl.InstanceInfo;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
@@ -54,38 +54,28 @@ public class ServerInfo implements ClientInfo {

// set things up using the config file, the instanceId from HDFS, and ZK for the instanceName
static ServerInfo fromServerConfig(SiteConfiguration siteConfig) {
final Function<ServerInfo,String> instanceNameFromZk = si -> {
try (var zk =
si.getZooKeeperSupplier(ServerInfo.class.getSimpleName() + ".getInstanceName()", "")
.get()) {
return ZooUtil.getInstanceName(zk, si.getInstanceId());
}
};
final Function<ServerInfo,
InstanceId> instanceIdFromHdfs = si -> VolumeManager.getInstanceIDFromHdfs(
si.getServerDirs().getInstanceIdLocation(si.getVolumeManager().getFirst()),
InstanceInfo> instanceInfoFromHdfs = si -> VolumeManager.getInstanceInfoFromHdfs(
si.getServerDirs().getInstanceInfoLocation(si.getVolumeManager().getFirst()),
si.getHadoopConf());
return new ServerInfo(siteConfig, GET_ZK_HOSTS_FROM_CONFIG, GET_ZK_TIMEOUT_FROM_CONFIG,
instanceNameFromZk, instanceIdFromHdfs);
instanceInfoFromHdfs);
}

// set things up using a provided instanceName and InstanceId to initialize the system, but still
// have a ServerContext that is functional without bootstrapping issues, so long as you don't call
// functions from it that require an instance to have already been initialized
static ServerInfo initialize(SiteConfiguration siteConfig, String instanceName,
InstanceId instanceId) {
requireNonNull(instanceName);
requireNonNull(instanceId);
static ServerInfo initialize(SiteConfiguration siteConfig, InstanceInfo instanceInfo) {
requireNonNull(instanceInfo);
return new ServerInfo(siteConfig, GET_ZK_HOSTS_FROM_CONFIG, GET_ZK_TIMEOUT_FROM_CONFIG,
si -> instanceName, si -> instanceId);
si -> instanceInfo);
}

// set things up using the config file, and the client config for a server-side CLI utility
static ServerInfo fromServerAndClientConfig(SiteConfiguration siteConfig, ClientInfo info) {
// ClientInfo.getInstanceId looks up the ID in ZK using the provided instance name
return new ServerInfo(siteConfig, si -> info.getZooKeepers(),
si -> info.getZooKeepersSessionTimeOut(), si -> info.getInstanceName(),
si -> info.getInstanceId());
si -> info.getZooKeepersSessionTimeOut(), si -> info.getInstanceInfo());
}

static ServerInfo forTesting(SiteConfiguration siteConfig, String instanceName, String zooKeepers,
@@ -106,8 +96,7 @@ static ServerInfo forTesting(SiteConfiguration siteConfig, String instanceName,
private final Supplier<ServerDirs> serverDirs;
private final Supplier<String> zooKeepers;
private final Supplier<Integer> zooKeepersSessionTimeOut; // can't memoize IntSupplier
private final Supplier<InstanceId> instanceId;
private final Supplier<String> instanceName;
private final Supplier<InstanceInfo> instanceInfo;
private final Supplier<Credentials> credentials;
private final BiFunction<String,String,ZooSession> zooSessionForName;

@@ -118,13 +107,12 @@ static ServerInfo forTesting(SiteConfiguration siteConfig, String instanceName,
// another, but because things are lazily loaded, it is okay if one depends on another in one
// direction only
private ServerInfo(SiteConfiguration siteConfig, Function<ServerInfo,String> zkHostsFunction,
ToIntFunction<ServerInfo> zkTimeoutFunction, Function<ServerInfo,String> instanceNameFunction,
Function<ServerInfo,InstanceId> instanceIdFunction) {
ToIntFunction<ServerInfo> zkTimeoutFunction,
Function<ServerInfo,InstanceInfo> instanceInfoFunction) {
this.siteConfig = requireNonNull(siteConfig);
requireNonNull(zkHostsFunction);
requireNonNull(zkTimeoutFunction);
requireNonNull(instanceNameFunction);
requireNonNull(instanceIdFunction);
requireNonNull(instanceInfoFunction);

this.hadoopConf = memoize(Configuration::new);
this.volumeManager = memoize(() -> {
@@ -144,8 +132,7 @@ private ServerInfo(SiteConfiguration siteConfig, Function<ServerInfo,String> zkH
// from here on, set up the suppliers based on what was passed in, to support different cases
this.zooKeepers = memoize(() -> zkHostsFunction.apply(this));
this.zooKeepersSessionTimeOut = memoize(() -> zkTimeoutFunction.applyAsInt(this));
this.instanceId = memoize(() -> instanceIdFunction.apply(this));
this.instanceName = memoize(() -> instanceNameFunction.apply(this));
this.instanceInfo = memoize(() -> instanceInfoFunction.apply(this));
}

public SiteConfiguration getSiteConfiguration() {
@@ -156,9 +143,19 @@ public VolumeManager getVolumeManager() {
return volumeManager.get();
}

@Override
public InstanceInfo getInstanceInfo() {
return instanceInfo.get();
}

@Override
public String getInstanceName() {
return getInstanceInfo().getInstanceName();
}

@Override
public InstanceId getInstanceId() {
return instanceId.get();
return getInstanceInfo().getInstanceId();
}

@Override
@@ -203,11 +200,6 @@ public Properties getClientProperties() {
return properties;
}

@Override
public String getInstanceName() {
return instanceName.get();
}

public Credentials getCredentials() {
return credentials.get();
}