Skip to content

Commit e35052e

Browse files
authored
Implementation of RequestHandler to support parallel out-of-order bootstrapping (#160)
* Implementation of RequestHandler to support parallel out-of-order bootstrapping * Create map on bootstrap * Sanity tests * Add parallel request handler helper implementations * Refactor and comment * Refactor
1 parent 3d2de87 commit e35052e

File tree

8 files changed

+241
-5
lines changed

8 files changed

+241
-5
lines changed

src/main/scala-2.11/com/comcast/xfinity/sirius/uberstore/segmented/ParallelHelpers.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,7 @@ package com.comcast.xfinity.sirius.uberstore.segmented
33
import scala.collection.parallel.ParSeq
44

55
object ParallelHelpers {
6-
def parallelize[T](seq: Seq[T]): ParSeq[T] = seq.par
6+
implicit class ParSeqConverter[T](private val seq: Seq[T]) {
7+
def parallelize: ParSeq[T] = seq.par
8+
}
79
}

src/main/scala-2.12/com/comcast/xfinity/sirius/uberstore/segmented/ParallelHelpers.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,7 @@ package com.comcast.xfinity.sirius.uberstore.segmented
33
import scala.collection.parallel.ParSeq
44

55
object ParallelHelpers {
6-
def parallelize[T](seq: Seq[T]): ParSeq[T] = seq.par
6+
implicit class ParSeqConverter[T](private val seq: Seq[T]) {
7+
def parallelize: ParSeq[T] = seq.par
8+
}
79
}

src/main/scala-2.13/com/comcast/xfinity/sirius/uberstore/segmented/ParallelHelpers.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,7 @@ import scala.collection.parallel.CollectionConverters._
44
import scala.collection.parallel.ParSeq
55

66
object ParallelHelpers {
7-
def parallelize[T](seq: Seq[T]): ParSeq[T] = seq.par
7+
implicit class ParSeqConverter[T](private val seq: Seq[T]) {
8+
def parallelize: ParSeq[T] = seq.par
9+
}
810
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package com.comcast.xfinity.sirius.api
2+
3+
import java.util.concurrent.ConcurrentHashMap
4+
5+
abstract class AbstractParallelBootstrapRequestHandler[K, M] extends RequestHandler {
6+
private var sequences: Option[ConcurrentHashMap[K, Long]] = None
7+
8+
final override def onBootstrapStarting(): Unit = {
9+
onBootstrapStartingImpl()
10+
sequences = Some(new ConcurrentHashMap[K, Long]())
11+
}
12+
13+
final override def onBootstrapComplete(): Unit = {
14+
sequences = None
15+
onBootstrapCompletedImpl()
16+
}
17+
18+
final override def handleGet(key: String): SiriusResult =
19+
if (enabled()) handleGetImpl(createKey(key))
20+
else SiriusResult.none()
21+
22+
final override def handlePut(key: String, body: Array[Byte]): SiriusResult =
23+
if (enabled()) handlePutImpl(createKey(key), deserialize(body))
24+
else SiriusResult.none()
25+
26+
final override def handleDelete(key: String): SiriusResult =
27+
if (enabled()) handleDeleteImpl(createKey(key))
28+
else SiriusResult.none()
29+
30+
final override def handlePut(sequence: Long, key: String, body: Array[Byte]): SiriusResult =
31+
if (enabled())
32+
sequences match {
33+
case Some(map) =>
34+
val k = createKey(key)
35+
// Check if a newer sequence is already in the map and, if so, bail early
36+
if (map.get(k) >= sequence) SiriusResult.none()
37+
else {
38+
var result: SiriusResult = SiriusResult.none()
39+
// deserialize the body before calling compute to reduce lock contention
40+
val message = deserialize(body)
41+
map.compute(k, (_, existing) => {
42+
if (existing < sequence) {
43+
result = handlePutImpl(sequence, k, message)
44+
sequence
45+
} else existing
46+
})
47+
result
48+
}
49+
case None => handlePutImpl(sequence, createKey(key), deserialize(body))
50+
}
51+
else SiriusResult.none()
52+
53+
final override def handleDelete(sequence: Long, key: String): SiriusResult =
54+
sequences match {
55+
case Some(map) =>
56+
var result: SiriusResult = SiriusResult.none()
57+
map.compute(createKey(key), (k, existing) => {
58+
if (existing < sequence) {
59+
result = handleDeleteImpl(sequence, k)
60+
sequence
61+
} else existing
62+
})
63+
result
64+
case None => handleDeleteImpl(sequence, createKey(key))
65+
}
66+
67+
protected def enabled(): Boolean
68+
protected def createKey(key: String): K
69+
protected def deserialize(body: Array[Byte]): M
70+
71+
def onBootstrapStartingImpl(): Unit = { }
72+
def onBootstrapCompletedImpl(): Unit = { }
73+
def handleGetImpl(key: K): SiriusResult
74+
def handlePutImpl(key: K, body: M): SiriusResult
75+
def handleDeleteImpl(key: K): SiriusResult
76+
def handlePutImpl(sequence: Long, key: K, body: M): SiriusResult = handlePutImpl(key, body)
77+
def handleDeleteImpl(sequence:Long, key: K): SiriusResult = handleDeleteImpl(key)
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.comcast.xfinity.sirius.api
2+
3+
object ParallelBootstrapRequestHandler {
4+
def apply(requestHandler: RequestHandler): ParallelBootstrapRequestHandler =
5+
new ParallelBootstrapRequestHandler(requestHandler)
6+
}
7+
8+
class ParallelBootstrapRequestHandler(val requestHandler: RequestHandler) extends AbstractParallelBootstrapRequestHandler[String, Array[Byte]] {
9+
override protected def enabled(): Boolean = true
10+
override protected def createKey(key: String): String = key
11+
override protected def deserialize(body: Array[Byte]): Array[Byte] = body
12+
override def handleGetImpl(key: String): SiriusResult = requestHandler.handleGet(key)
13+
override def handlePutImpl(key: String, body: Array[Byte]): SiriusResult = requestHandler.handlePut(key, body)
14+
override def handleDeleteImpl(key: String): SiriusResult = requestHandler.handleDelete(key)
15+
override def handlePutImpl(sequence: Long, key: String, body: Array[Byte]): SiriusResult = requestHandler.handlePut(sequence, key, body)
16+
override def handleDeleteImpl(sequence: Long, key: String): SiriusResult = requestHandler.handleDelete(sequence, key)
17+
18+
override def onBootstrapStartingImpl(): Unit = requestHandler.onBootstrapStarting()
19+
override def onBootstrapCompletedImpl(): Unit = requestHandler.onBootstrapComplete()
20+
}

src/main/scala/com/comcast/xfinity/sirius/api/SiriusResult.scala

+10-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.comcast.xfinity.sirius.api
1717

1818
object SiriusResult {
19+
private val NONE: SiriusResult = SiriusResult(Right(None))
20+
private val OK: SiriusResult = some("ok")
1921

2022
/**
2123
* Factory method for creating a SiriusResult with a value
@@ -26,13 +28,20 @@ object SiriusResult {
2628
* @return SiriusResult
2729
*/
2830
def some(value: Object): SiriusResult = SiriusResult(Right(Some(value)))
31+
32+
/**
33+
* Factory method for creating a SiriusResult with the String value "ok"
34+
*
35+
* @return SiriusResult
36+
*/
37+
def ok(): SiriusResult = OK
2938

3039
/**
3140
* Factory method for creating a SiriusResult with no value
3241
*
3342
* @return SiriusResult
3443
*/
35-
def none(): SiriusResult = SiriusResult(Right(None))
44+
def none(): SiriusResult = NONE
3645

3746
/**
3847
* Factory method for creating a SiriusResult with an error.

src/main/scala/com/comcast/xfinity/sirius/uberstore/segmented/SegmentedUberStore.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import better.files.File
2020
import com.comcast.xfinity.sirius.api.SiriusConfiguration
2121
import com.comcast.xfinity.sirius.api.impl.OrderedEvent
2222
import com.comcast.xfinity.sirius.uberstore.data.UberDataFileHandleFactory
23+
import com.comcast.xfinity.sirius.uberstore.segmented.ParallelHelpers.ParSeqConverter
2324
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
2425

2526
import scala.annotation.tailrec
@@ -136,7 +137,7 @@ class SegmentedUberStore private[segmented] (base: JFile,
136137
def getNextSeq = nextSeq
137138

138139
override def parallelForeach[T](fun: OrderedEvent => T): Unit = {
139-
ParallelHelpers.parallelize(readOnlyDirs :+ liveDir)
140+
(readOnlyDirs :+ liveDir).parallelize
140141
.foreach(_.foldLeftRange(0, Long.MaxValue)(())((_, e) => fun(e)))
141142
}
142143

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package com.comcast.xfinity.sirius.api
2+
3+
import com.comcast.xfinity.sirius.NiceTest
4+
import org.mockito.Mockito.{verify, verifyNoMoreInteractions, when}
5+
6+
class ParallelBootstrapRequestHandlerTest extends NiceTest {
7+
8+
describe("delegates") {
9+
it("handlePut") {
10+
val requestHandler = mock[RequestHandler]
11+
val response = mock[SiriusResult]
12+
when(requestHandler.handlePut("key1", Array.empty)).thenReturn(response)
13+
14+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
15+
val result = underTest.handlePut("key1", Array.empty)
16+
17+
assert(result === response)
18+
verify(requestHandler).handlePut("key1", Array.empty)
19+
verifyNoMoreInteractions(requestHandler)
20+
}
21+
it("handlePut with sequence") {
22+
val requestHandler = mock[RequestHandler]
23+
val response = mock[SiriusResult]
24+
when(requestHandler.handlePut(1L, "key1", Array.empty)).thenReturn(response)
25+
26+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
27+
val result = underTest.handlePut(1L, "key1", Array.empty)
28+
29+
assert(result === response)
30+
verify(requestHandler).handlePut(1L, "key1", Array.empty)
31+
verifyNoMoreInteractions(requestHandler)
32+
}
33+
it("handleDelete") {
34+
val requestHandler = mock[RequestHandler]
35+
val response = mock[SiriusResult]
36+
when(requestHandler.handleDelete("key1")).thenReturn(response)
37+
38+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
39+
val result = underTest.handleDelete("key1")
40+
41+
assert(result === response)
42+
verify(requestHandler).handleDelete("key1")
43+
verifyNoMoreInteractions(requestHandler)
44+
}
45+
it("handleDelete with sequence") {
46+
val requestHandler = mock[RequestHandler]
47+
val response = mock[SiriusResult]
48+
when(requestHandler.handleDelete(1L, "key1")).thenReturn(response)
49+
50+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
51+
val result = underTest.handleDelete(1L, "key1")
52+
53+
assert(result === response)
54+
verify(requestHandler).handleDelete(1L, "key1")
55+
verifyNoMoreInteractions(requestHandler)
56+
}
57+
it("handleGet") {
58+
val requestHandler = mock[RequestHandler]
59+
val response = mock[SiriusResult]
60+
when(requestHandler.handleGet("key1")).thenReturn(response)
61+
62+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
63+
val result = underTest.handleGet("key1")
64+
65+
assert(result === response)
66+
verify(requestHandler).handleGet("key1")
67+
verifyNoMoreInteractions(requestHandler)
68+
}
69+
it("onBootstrapStarting") {
70+
val requestHandler = mock[RequestHandler]
71+
72+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
73+
underTest.onBootstrapStarting()
74+
75+
verify(requestHandler).onBootstrapStarting()
76+
verifyNoMoreInteractions(requestHandler)
77+
}
78+
it("onBootstrapComplete") {
79+
val requestHandler = mock[RequestHandler]
80+
81+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
82+
underTest.onBootstrapComplete()
83+
84+
verify(requestHandler).onBootstrapComplete()
85+
verifyNoMoreInteractions(requestHandler)
86+
}
87+
}
88+
89+
describe("during bootstrap") {
90+
it ("drops out-of-order events for the same key") {
91+
val requestHandler = mock[RequestHandler]
92+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
93+
94+
underTest.onBootstrapStarting()
95+
verify(requestHandler).onBootstrapStarting()
96+
97+
underTest.handlePut(1L, "key1", Array.empty)
98+
underTest.handlePut(4L, "key1", Array.empty)
99+
underTest.handlePut(3L, "key1", Array.empty)
100+
101+
verify(requestHandler).handlePut(1L, "key1", Array.empty)
102+
verify(requestHandler).handlePut(4L, "key1", Array.empty)
103+
verifyNoMoreInteractions(requestHandler)
104+
}
105+
it ("allows out-of-order events for different keys") {
106+
val requestHandler = mock[RequestHandler]
107+
val underTest = ParallelBootstrapRequestHandler(requestHandler)
108+
109+
underTest.onBootstrapStarting()
110+
verify(requestHandler).onBootstrapStarting()
111+
112+
underTest.handlePut(1L, "key1", Array.empty)
113+
underTest.handlePut(4L, "key1", Array.empty)
114+
underTest.handlePut(3L, "key2", Array.empty)
115+
116+
verify(requestHandler).handlePut(1L, "key1", Array.empty)
117+
verify(requestHandler).handlePut(4L, "key1", Array.empty)
118+
verify(requestHandler).handlePut(3L, "key2", Array.empty)
119+
verifyNoMoreInteractions(requestHandler)
120+
}
121+
}
122+
}

0 commit comments

Comments
 (0)