Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jubatusonyarn/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ val SCALA_VERSION = "2.10.4"
val VERSION = "1.1"

val JUBATUS_DEPENDENCIES = Seq(
("us.jubat" % "jubatus" % "0.7.1").exclude("org.jboss.netty", "netty"),
("us.jubat" % "jubatus" % "0.8.0").exclude("org.jboss.netty", "netty"),
"org.apache.hadoop" % "hadoop-common" % "2.3.0-cdh5.1.3" % "provided",
"org.apache.hadoop" % "hadoop-hdfs" % "2.3.0-cdh5.1.3" % "provided",
"org.apache.hadoop" % "hadoop-yarn-client" % "2.3.0-cdh5.1.3" % "provided"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants
import us.jubat.yarn.common.HasLogger

import scala.collection.JavaConverters._
import scala.collection._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
import org.apache.hadoop.fs.{FileSystem, Path}
Expand All @@ -38,12 +39,13 @@ class ApplicationMaster extends HasLogger {
private def containerJarPath(aBasePath: Path): Path = new Path(aBasePath, "container/jubatus-on-yarn-container.jar")
private val containerJarName: String = "jubatus-on-yarn-container.jar"
private val containerMainClass: String = "us.jubat.yarn.container.ContainerApp"
private val containerMemory: Int = 128

private val mYarnConfig = new YarnConfiguration()

// 終了までブロックする
def run(aParams: ApplicationMasterParams, aApplicationMasterPort: Int): FinalApplicationStatus = {
logger.debug(s"ApplicationMasterParams (${aParams.toString()})")

val tHandler = new ApplicationMasterHandler(aParams, aApplicationMasterPort)

val tResourceManager = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](500, tHandler)
Expand All @@ -56,9 +58,18 @@ class ApplicationMaster extends HasLogger {
tPriority.setPriority(aParams.priority)

val tResource = Records.newRecord(classOf[Resource])
tResource.setMemory(aParams.memory + containerMemory)
tResource.setMemory(aParams.memory + aParams.containerMemory)
tResource.setVirtualCores(aParams.virtualCores)

var containerNodes: Array[String] = null
var containerRacks: Array[String] = null
if (!aParams.containerNodes.isEmpty()) {
containerNodes = aParams.containerNodes.split(",").toArray[String]
}
if (!aParams.containerRacks.isEmpty()) {
containerRacks = aParams.containerRacks.split(",").toArray[String]
}

// コンテナを起動
(1 to aParams.nodes).foreach { _ =>
logger.info(
Expand All @@ -67,7 +78,9 @@ class ApplicationMaster extends HasLogger {
+ s"\tmemory: ${tResource.getMemory}\n"
+ s"\tvirtualCores: ${tResource.getVirtualCores}"
)
tResourceManager.addContainerRequest(new ContainerRequest(tResource, null, null, tPriority))
val containerReq = new ContainerRequest(tResource, containerNodes, containerRacks, tPriority)
tResourceManager.addContainerRequest(containerReq)
logger.debug(s"ContainerRequest( Nodes:${containerReq.getNodes}, Racks:${containerReq.getRacks}")
}

// コンテナの終了を待機
Expand Down Expand Up @@ -117,12 +130,17 @@ class ApplicationMaster extends HasLogger {
val tLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])

// LocalResource
tLaunchContext.setLocalResources(
Map(
entryScriptName -> toLocalResource(entryScriptPath(new Path(aParams.basePath))),
containerJarName -> toLocalResource(containerJarPath(new Path(aParams.basePath)))
).asJava
)
val localResource = mutable.Map.empty[String, LocalResource]
localResource += entryScriptName -> toLocalResource(entryScriptPath(new Path(aParams.basePath)))
localResource += containerJarName -> toLocalResource(containerJarPath(new Path(aParams.basePath)))
val serverLogConfFileName = if (aParams.jubatusServerLogConf nonEmpty) {
val logConfFileName = aParams.jubatusServerLogConf.split('/').last
localResource += logConfFileName -> toLocalResource(new Path(aParams.jubatusServerLogConf))
logConfFileName
} else {
"""\"\""""
}
tLaunchContext.setLocalResources(localResource.asJava)

// ClassPath
val tApplicationMasterEnv = new java.util.HashMap[String, String]()
Expand All @@ -140,7 +158,7 @@ class ApplicationMaster extends HasLogger {
s"bash $entryScriptName"
+ s" $containerJarName"
+ s" $containerMainClass"
+ s" $containerMemory"
+ s" ${aParams.containerMemory}"

// jar にわたす
+ s" ${aParams.applicationName}" // --application-name
Expand All @@ -152,6 +170,14 @@ class ApplicationMaster extends HasLogger {
+ s" ${aParams.learningMachineName}" // --name
+ s" ${aParams.learningMachineType}" // juba{*}
+ s" ${aParams.zooKeepers}" // --zookeeper
+ s" ${aParams.thread}" // --thread
+ s" ${aParams.timeout}" // --timeout
+ s" ${aParams.mixer}" // --mixer
+ s" ${aParams.intervalSec}" // --interval_sec
+ s" ${aParams.intervalCount}" // --interval_count
+ s" ${aParams.zookeeperTimeout}" // --zookeeper_timeout
+ s" ${aParams.interconnectTimeout}" // --interconnect_timeout
+ s" ${serverLogConfFileName}" // --log_config

+ s" 1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout"
+ s" 2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ class ApplicationMasterParams {
@org.kohsuke.args4j.Option(name = "--virtual-cores")
var virtualCores: Int = 1

@org.kohsuke.args4j.Option(name = "--container-memory")
var containerMemory: Int = 128

@org.kohsuke.args4j.Option(name = "--container-nodes")
var containerNodes: String = ""

@org.kohsuke.args4j.Option(name = "--container-racks")
var containerRacks: String = ""


@org.kohsuke.args4j.Option(name = "--learning-machine-name")
var learningMachineName: String = ""
Expand Down Expand Up @@ -145,7 +154,43 @@ class ApplicationMasterParams {
@org.kohsuke.args4j.Option(name = "--jubatus-proxy-process-id")
var jubatusProxyProcessId: Int = 0


@org.kohsuke.args4j.Option(name = "--base-path")
var basePath: String = ""

@org.kohsuke.args4j.Option(name = "--thread")
var thread: Int = 2

@org.kohsuke.args4j.Option(name = "--timeout")
var timeout: Int = 10

@org.kohsuke.args4j.Option(name = "--mixer")
var mixer: String = "linear_mixer"

@org.kohsuke.args4j.Option(name = "--interval_sec")
var intervalSec: Int = 16

@org.kohsuke.args4j.Option(name = "--interval_count")
var intervalCount: Int = 512

@org.kohsuke.args4j.Option(name = "--zookeeper_timeout")
var zookeeperTimeout: Int = 10

@org.kohsuke.args4j.Option(name = "--interconnect_timeout")
var interconnectTimeout: Int = 10

@org.kohsuke.args4j.Option(name = "--log_config")
var jubatusServerLogConf: String = ""

/**
 * Renders the option values held by this instance as one line of
 * comma-separated `name: value` pairs, suitable for a single log entry.
 *
 * @return the formatted option dump, with surrounding whitespace trimmed
 */
override def toString(): String = {
  // Assembled from a Seq instead of a multi-line string literal: the previous
  // literal contained no `|` margin markers, so `stripMargin` was a no-op and the
  // raw line breaks of the literal were emitted verbatim into the message.
  Seq(
    s"applicationName: $applicationName",
    s"nodes: $nodes",
    s"priority: $priority",
    s"memory: $memory",
    s"virtualCores: $virtualCores",
    s"containerMemory: $containerMemory",
    s"containerNodes: $containerNodes",
    s"containerRacks: $containerRacks",
    s"learningMachineName: $learningMachineName",
    s"learningMachineType: $learningMachineType",
    s"zooKeepers: $zooKeepers",
    s"managementAddress: $managementAddress",
    s"managementPort: $managementPort",
    s"applicationMasterNodeAddress: $applicationMasterNodeAddress",
    s"jubatusProxyPort: $jubatusProxyPort",
    s"jubatusProxyProcessId: $jubatusProxyProcessId",
    // basePath was absent from the earlier dump even though it is a declared option.
    s"basePath: $basePath",
    s"thread: $thread",
    s"timeout: $timeout",
    s"mixer: $mixer",
    s"intervalSec: $intervalSec",
    s"intervalCount: $intervalCount",
    s"zookeeperTimeout: $zookeeperTimeout",
    s"interconnectTimeout: $interconnectTimeout",
    s"jubatusServerLogConf: $jubatusServerLogConf"
  ).mkString(", ").trim // trim kept so an empty last value leaves no trailing blank, as before
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ APPLICATION_MASTER_PORT="${7}"
LEARNING_MACHINE_NAME="${8}"
LEARNING_MACHINE_TYPE="${9}"
ZOOKEEPER="${10}"
THREAD="${11}"
TIMEOUT="${12}"
MIXER="${13}"
INTERVAL_SEC="${14}"
INTERVAL_COUNT="${15}"
ZOOKEEPER_TIMEOUT="${16}"
INTERCONNECT_TIMEOUT="${17}"

JUBATUS_SERVER_LOG_CONFIG_NAME="${18}"
if [ -n "${JUBATUS_SERVER_LOG_CONFIG_NAME}" ]; then
JUBATUS_SERVER_LOG_CONFIG_OPTION="--log_config=${JUBATUS_SERVER_LOG_CONFIG_NAME}"
fi

IP_ADDRESS=`grep $(hostname) /etc/hosts | awk '{print $1}'`
LISTEN_IF=`netstat -ie | grep -B1 ${IP_ADDRESS} | head -n1 | awk '{print $1}'`
Expand All @@ -27,8 +39,12 @@ for i in `seq 10`; do
fi
done

echo juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=10 --interval_count=0 --rpc-port=${JUBATUS_SERVER_PORT} --name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} >> /tmp/Container 2>&1
juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=10 --interval_count=0 --rpc-port=${JUBATUS_SERVER_PORT} --name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} &
echo juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=${INTERVAL_SEC} --interval_count=${INTERVAL_COUNT} --rpc-port=${JUBATUS_SERVER_PORT} \
--name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} --thread ${THREAD} --timeout ${TIMEOUT} --mixer ${MIXER} --zookeeper_timeout ${ZOOKEEPER_TIMEOUT} --interconnect_timeout ${INTERCONNECT_TIMEOUT} \
${JUBATUS_SERVER_LOG_CONFIG_OPTION}
juba${LEARNING_MACHINE_TYPE} --zookeeper=${ZOOKEEPER} --interval_sec=${INTERVAL_SEC} --interval_count=${INTERVAL_COUNT} --rpc-port=${JUBATUS_SERVER_PORT} \
--name=${LEARNING_MACHINE_NAME} --listen_if ${LISTEN_IF} --thread ${THREAD} --timeout ${TIMEOUT} --mixer ${MIXER} --zookeeper_timeout ${ZOOKEEPER_TIMEOUT} --interconnect_timeout ${INTERCONNECT_TIMEOUT} \
${JUBATUS_SERVER_LOG_CONFIG_OPTION} &
JUBATUS_SERVER_PROCESS_ID=$!

# jubatus server の起動待機
Expand All @@ -37,7 +53,7 @@ for i in `seq 10`; do
continue 2
fi

echo jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status >> /tmp/Container 2>&1
echo jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status
if (jubactl --zookeeper=${ZOOKEEPER} --server=juba${LEARNING_MACHINE_TYPE} --type=${LEARNING_MACHINE_TYPE} --name=${LEARNING_MACHINE_NAME} --cmd status \
| awk '/active '${LEARNING_MACHINE_NAME}' members:/ {flag=1; next} /active/ {flag=0} flag==1 {print}' \
| grep "^${IP_ADDRESS}_${JUBATUS_SERVER_PORT}$"); then
Expand All @@ -57,11 +73,11 @@ fi
echo $JAVA_HOME/bin/java -Xmx${CONTAINER_MEMORY_SIZE}M ${CONTAINER_JRA_MAIN_CLASS} --seq ${SEQ} \
--application-name ${APPLICATION_NAME} --application-master-address ${APPLICATION_MASTER_ADDRESS} --application-master-port ${APPLICATION_MASTER_PORT} \
--container-node-address ${IP_ADDRESS} --jubatus-server-port ${JUBATUS_SERVER_PORT} --jubatus-server-process-id ${JUBATUS_SERVER_PROCESS_ID} \
--learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} >> /tmp/Container 2>&1
--learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE}
$JAVA_HOME/bin/java -Xmx${CONTAINER_MEMORY_SIZE}M ${CONTAINER_JRA_MAIN_CLASS} --seq ${SEQ} \
--application-name ${APPLICATION_NAME} --application-master-address ${APPLICATION_MASTER_ADDRESS} --application-master-port ${APPLICATION_MASTER_PORT} \
--container-node-address ${IP_ADDRESS} --jubatus-server-port ${JUBATUS_SERVER_PORT} --jubatus-server-process-id ${JUBATUS_SERVER_PROCESS_ID} \
--learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE} >> /tmp/Container 2>&1
--learning-machine-name ${LEARNING_MACHINE_NAME} --learning-machine-type ${LEARNING_MACHINE_TYPE}
EXIT_CODE=$?
ps ${JUBATUS_SERVER_PROCESS_ID} && kill ${JUBATUS_SERVER_PROCESS_ID}
exit $EXIT_CODE
Loading