Skip to content

Commit

Permalink
Updated code as per the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Kanamarlapudi authored and praveen-kanamarlapudi committed May 17, 2017
1 parent e94f507 commit 2d092ae
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 10 deletions.
2 changes: 1 addition & 1 deletion conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
# livy.impersonation.enabled = true

# Logs size livy can cache for each session/batch. 0 means don't cache the logs.
# livy.spark.logs.size = 200
# livy.cache-log.size = 200

# Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation
# directory every time a session is started. By caching these files in HDFS, for example, startup
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/scala/com/cloudera/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ object LivyConf {
val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")

// Livy will cache the max no of logs specified. 0 means don't cache the logs.
val SPARK_LOGS_SIZE = Entry("livy.spark.logs.size", 200)
val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)

// If Livy can't find the yarn app within this time, consider it lost.
val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,9 @@ class InteractiveSession(
heartbeat()

private val app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)) }
}

if (client.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class LineBufferedStream(inputStream: InputStream, logSize: Int) extends Logging
private val thread = new Thread {
override def run() = {
val lines = Source.fromInputStream(inputStream).getLines()
for (line <- lines) {
for (line <- lines) {
info(s"stdout: $line")
_lock.lock()
try {
info(s"stdout: $line")
if(logSize > 0) _lines add line
_lines.add(line)
_condition.signalAll()
} finally {
_lock.unlock()
Expand All @@ -61,7 +61,12 @@ class LineBufferedStream(inputStream: InputStream, logSize: Int) extends Logging
thread.setDaemon(true)
thread.start()

def lines: IndexedSeq[String] = IndexedSeq.empty[String] ++ _lines.toArray(Array.empty[String])
/**
 * Returns a consistent snapshot of the buffered log lines.
 *
 * The copy out of `_lines` is done under `_lock` so readers never observe the
 * collection mid-mutation by the reader thread. The unlock is in a `finally`
 * block: if `toArray` or the concatenation throws, the lock must still be
 * released, otherwise every later reader and the producer thread would
 * deadlock on a permanently-held lock.
 */
def lines: IndexedSeq[String] = {
  _lock.lock()
  try {
    IndexedSeq.empty[String] ++ _lines.toArray(Array.empty[String])
  } finally {
    _lock.unlock()
  }
}

def iterator: Iterator[String] = {
new LinesIterator
Expand Down
3 changes: 1 addition & 2 deletions server/src/main/scala/com/cloudera/livy/utils/SparkApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ object SparkApp {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
} else {
// process is None in recovery mode
// require(process.isDefined, "process must not be None when Livy master is not YARN.")
require(process.isDefined, "process must not be None when Livy master is not YARN.")
new SparkProcApp(process.get, listener)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class SparkProcApp (

/**
 * Combined driver log: a "stdout: " header followed by the buffered stdout
 * lines, then a "\nstderr: " header followed by the buffered stderr lines.
 */
override def log(): IndexedSeq[String] = {
  val stdoutSection = "stdout: " +: process.inputLines
  val stderrSection = "\nstderr: " +: process.errorLines
  stdoutSection ++ stderrSection
}

private def changeState(newState: SparkApp.State.Value) = {
if (state != newState) {
listener.foreach(_.stateChanged(state, newState))
Expand Down

0 comments on commit 2d092ae

Please sign in to comment.