Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions castor/src/Actors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,31 @@ import collection.mutable

abstract class BaseActor[T]()(implicit ac: Context) extends Actor[T]{
private val queue = new mutable.Queue[(T, Context.Token)]()

def softQueueLimit = ac.softQueueLimit
implicit def self = this
private var scheduled = false

def scheduleRun() = ac.execute(new Runnable{def run(): Unit = runWithItems()})


def send(t: T)
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line): Unit = synchronized{
line: sourcecode.Line,
sender: BaseActor[_]): Unit = synchronized{
val token = ac.reportSchedule(this, t, fileName, line)
queue.enqueue((t, token))
if (!scheduled){
if (queue.length > softQueueLimit) ac.reportBlocking(sender, this)
if (!ac.isBlocked(this) && !scheduled){
scheduled = true
ac.execute(
new Runnable{
def run(): Unit = runWithItems()
}
)
scheduleRun()
}
}
def sendAsync(f: scala.concurrent.Future[T])
(implicit fileName: sourcecode.FileName,
line: sourcecode.Line) = {
line: sourcecode.Line,
sender: BaseActor[_]) = {
f.onComplete{
case scala.util.Success(v) => this.send(v)
case scala.util.Success(v) => this.send(v)(fileName, line, sender)
case scala.util.Failure(e) => ac.reportFailure(e)
}
}
Expand All @@ -33,16 +36,15 @@ abstract class BaseActor[T]()(implicit ac: Context) extends Actor[T]{
private[this] def runWithItems(): Unit = {
val msgs = synchronized(queue.dequeueAll(_ => true))

runBatch0(msgs)
if (msgs.nonEmpty) runBatch0(msgs)

synchronized{
if (queue.nonEmpty) ac.execute(
new Runnable{
def run(): Unit = runWithItems()
}
)
if (queue.nonEmpty) this.scheduleRun()
else{
assert(scheduled)
for(noLongerBlocked <- ac.clearBlocking(this)){
noLongerBlocked.scheduleRun()
}
scheduled = false
}
}
Expand Down
11 changes: 11 additions & 0 deletions castor/src/Context.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import scala.concurrent.{Await, ExecutionContext, Future, Promise}
* outstanding tasks or log the actor message sends for debugging purposes
*/
trait Context extends ExecutionContext {
def softQueueLimit: Int = Int.MaxValue
val blocking = new MultiBiMap[BaseActor[_], BaseActor[_]]
def isBlocked(sender: BaseActor[_]): Boolean = {
blocking.containsValue(sender)
}
def clearBlocking(receiver: BaseActor[_]) = synchronized{
blocking.removeAll(receiver).filter(!isBlocked(_))
}
def reportBlocking(sender: BaseActor[_], receiver: BaseActor[_]) = synchronized{
blocking.add(receiver, sender)
}
def reportSchedule(): Context.Token = new Context.Token.Simple()

def reportSchedule(fileName: sourcecode.FileName,
Expand Down
28 changes: 28 additions & 0 deletions castor/src/MultiBiMap.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package castor


import scala.collection.mutable

/**
* A map from keys to collections of values: you can assign multiple values
* to any particular key. Also allows lookups in both directions: what values
* are assigned to a key or what key a value is assigned to.
*/

class MultiBiMap[K, V](){
private[this] val valueToKeys = mutable.LinkedHashMap.empty[V, mutable.Set[K]]
private[this] val keyToValues = mutable.LinkedHashMap.empty[K, mutable.Set[V]]
def containsValue(v: V) = valueToKeys.contains(v)
def add(k: K, v: V): Unit = {
valueToKeys.getOrElse(v, mutable.Set()).add(k)
keyToValues.getOrElseUpdate(k, mutable.Set()).add(v)
}
def removeAll(k: K): Set[V] = keyToValues.get(k) match {
case None => Set()
case Some(vs) =>
vs.foreach(valueToKeys(_).remove(k))

keyToValues.remove(k)
vs.toSet
}
}