Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,16 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH =
"/confstore";

/** Read-retry period for the ZKConfigurationStore. */
@Private
@Unstable
public static final String RM_SCHEDCONF_STORE_ZK_READ_RETRY_SECS =
YARN_PREFIX + "scheduler.configuration.zk-store.read-retry-secs";
@Private
@Unstable
public static final int DEFAULT_RM_SCHEDCONF_STORE_ZK_READ_RETRY_SECS = 3;


@Private
@Unstable
public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4624,6 +4624,17 @@
<value>/confstore</value>
</property>

<property>
<description>
ZK read-retry seconds when using zookeeper-based configuration store.
Should the ZK configuration store being formatted while one of the RM
is in the INIT state, allow retries using this interval in the hope that the store will be
re-populated with some data soon.
</description>
<name>yarn.scheduler.configuration.zk-store.read-retry-secs</name>
<value>3</value>
</property>

<property>
<description>
Provides an option for client to load supported resource types from RM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* A Zookeeper-based implementation of {@link YarnConfigurationStore}.
Expand Down Expand Up @@ -265,9 +266,37 @@ private boolean createNewZkPath(String path) throws Exception {

@VisibleForTesting
protected byte[] getZkData(String path) throws Exception {
return zkManager.getData(path);
// If the 'yarn resourcemanager -format-conf-store' command is issued
// while one of the RMs is in a starting state, the RM may fail. This
// occurs because the /confstore/CONF_STORE path may not yet exist.
// Alternatively, if the confstore is in the process of being written,
// the getZkData method returns a null value, causing the crash.
//To prevent this, added a re-try mechanism before giving up.
int maxRetries = 6;
int attempt = 1;
int sleepBetweenRetries = conf.getInt(
YarnConfiguration.RM_SCHEDCONF_STORE_ZK_READ_RETRY_SECS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_READ_RETRY_SECS);

while (attempt < maxRetries) {
if(zkManager.exists(path)) {
LOG.debug("zkManager.exists(path) {} exists.", path);
byte[] zkData = zkManager.getData(path);
if (zkData != null && zkData.length > 0) {
LOG.debug("We are returning the zkData OK!");
return zkData;
}
}
LOG.warn("The ZK CONFSTORE path or the ZkData was null. Retrying in {} "
+ "seconds... (Attempt {})", sleepBetweenRetries, attempt);
TimeUnit.SECONDS.sleep(sleepBetweenRetries);
attempt++;
}
LOG.error("The ZK CONFSTORE path or the ZkData was null. Giving up.");
return new byte[0];
}


@VisibleForTesting
protected void setZkData(String path, byte[] data) throws Exception {
zkManager.setData(path, data, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,28 @@ public void testDisableAuditLogs() throws Exception {
prepareLogMutation("key1", "val1");

data = ((ZKConfigurationStore) confStore).getZkData(logsPath);
assertNull(data, "Failed to Disable Audit Logs");
assertEquals(0, data.length, "Failed to Disable Audit Logs");
}

@Test
public void testZkData() throws Exception {
conf.setInt(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_READ_RETRY_SECS, 0);
schedConf.set("key", "val");
confStore.initialize(conf, schedConf, rmContext);
String confStorePath = getZkPath("CONF_STORE");
byte[] data = ((ZKConfigurationStore) confStore).getZkData(confStorePath);
assertTrue(data.length > 0, "ConfStore data expected to be not null.");
}

@Test
public void testEmptyZkData() throws Exception {
conf.setInt(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_READ_RETRY_SECS, 0);
schedConf.set("key", "val");
confStore.initialize(conf, schedConf, rmContext);
confStore.format();
String confStorePath = getZkPath("CONF_STORE");
byte[] data = ((ZKConfigurationStore) confStore).getZkData(confStorePath);
assertEquals(0, data.length, "ConfStore data expected to be empty.");
}

public Configuration createRMHAConf(String rmIds, String rmId,
Expand Down
Loading