
Commit a023848

Fix fencepost issues with stream algorithm plus fix tests
1 parent 4efd665 commit a023848
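
For context on the commit title: Akka Streams' `sliding(2)` emits overlapping windows, so a stage that keeps exactly one element per window silently drops one end of the stream unless that end is handled explicitly. A minimal, standalone sketch of the off-by-one (illustrative names only, not code from this repository):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object FencepostSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  // Keeping only the second element of each sliding pair loses the head element:
  Source(List(1, 2, 3, 4))
    .sliding(2)                          // windows: (1,2), (2,3), (3,4)
    .map { case Seq(_, after) => after } // emits 2, 3, 4 -- element 1 is gone
    .runForeach(println)
    .onComplete(_ => system.terminate())
}
```

The diff below re-attaches the head of the stream explicitly and special-cases one- and two-element streams, which is where the fencepost fixes live.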

File tree

6 files changed: +490 -164 lines changed


core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala (+217 -93)

@@ -1,5 +1,6 @@
 package io.aiven.guardian.kafka.backup
 
+import akka.NotUsed
 import akka.stream.scaladsl._
 import akka.util.ByteString
 import io.aiven.guardian.kafka.Errors
@@ -17,25 +18,6 @@ import java.time._
 import java.time.format.DateTimeFormatter
 import java.time.temporal._
 
-/** A marker used to indicate in which position the current backup stream is
-  */
-sealed abstract class BackupStreamPosition
-
-object BackupStreamPosition {
-
-  /** The backup stream has just started right now
-    */
-  case object Start extends BackupStreamPosition
-
-  /** The backup stream is in the middle of a time period
-    */
-  case object Middle extends BackupStreamPosition
-
-  /** The backup stream position has just hit a boundary for when a new period starts
-    */
-  case object Boundary extends BackupStreamPosition
-}
-
 /** An interface for a template on how to backup a Kafka Stream into some data storage
   * @tparam T
   *   The underlying `kafkaClientInterface` type
@@ -44,6 +26,29 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
   implicit val kafkaClientInterface: T
   implicit val backupConfig: Backup
 
+  /** An element from the original record
+    */
+  private[backup] sealed trait RecordElement
+  private[backup] case class Element(reducedConsumerRecord: ReducedConsumerRecord,
+                                     context: kafkaClientInterface.CursorContext
+  ) extends RecordElement
+  private[backup] case object End extends RecordElement
+
+  /** An element after the record has been transformed to a ByteString
+    */
+  private[backup] sealed trait ByteStringElement {
+    val data: ByteString
+    val context: kafkaClientInterface.CursorContext
+  }
+
+  private[backup] case class Start(override val data: ByteString,
+                                   override val context: kafkaClientInterface.CursorContext,
+                                   key: String
+  ) extends ByteStringElement
+  private[backup] case class Tail(override val data: ByteString,
+                                  override val context: kafkaClientInterface.CursorContext
+  ) extends ByteStringElement
+
   /** Override this type to define the result of backing up data to a datasource
     */
   type BackupResult
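
To make the new ADT concrete: the record stream is lifted into `RecordElement`s, and an `End` marker is interleaved wherever a time-period boundary falls, so downstream stages can react to boundaries as ordinary elements. Roughly like this (hypothetical, simplified stand-ins; the real `Element` carries a `ReducedConsumerRecord` plus the Kafka cursor context):

```scala
// Simplified mirror of the markers introduced above (names invented here):
sealed trait RecordElementSketch
final case class ElementSketch(record: String) extends RecordElementSketch
case object EndSketch extends RecordElementSketch

// A stream whose third record starts a new time period would be marked up as:
val marked: List[RecordElementSketch] =
  List(ElementSketch("r1"), ElementSketch("r2"), EndSketch, ElementSketch("r3"))
```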
@@ -66,23 +71,60 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
     */
   def empty: () => Future[BackupResult]
 
-  @nowarn("msg=not.*?exhaustive")
   private[backup] def calculateBackupStreamPositions(
       sourceWithPeriods: SourceWithContext[(ReducedConsumerRecord, Long),
                                            kafkaClientInterface.CursorContext,
                                            kafkaClientInterface.Control
       ]
-  ): SourceWithContext[(ReducedConsumerRecord, BackupStreamPosition),
-                       kafkaClientInterface.CursorContext,
-                       kafkaClientInterface.Control
-  ] = sourceWithPeriods
-    .sliding(2)
-    .map { case Seq((beforeReducedConsumerRecord, beforeDivisions), (_, afterDivisions)) =>
-      val backupStreamPosition = splitAtBoundaryCondition(beforeDivisions, afterDivisions)
+  ): Source[RecordElement, kafkaClientInterface.Control] =
+    sourceWithPeriods.asSource
+      .prefixAndTail(2)
+      // This algorithm only works with Sources that have 2 or more elements
+      .flatMapConcat {
+        case (Seq(
+                ((firstReducedConsumerRecord, firstDivision), firstContext),
+                ((secondReducedConsumerRecord, secondDivision), secondContext)
+              ),
+              rest
+            ) =>
+          val all = Source
+            .combine(
+              Source(
+                List(
+                  ((firstReducedConsumerRecord, firstDivision), firstContext),
+                  ((secondReducedConsumerRecord, secondDivision), secondContext)
+                )
+              ),
+              rest
+            )(Concat(_))
 
-      (beforeReducedConsumerRecord, backupStreamPosition)
-    }
-    .mapContext { case Seq(cursorContext, _) => cursorContext }
+          val withDivisions =
+            all
+              .sliding(2)
+              .map {
+                case Seq(((_, beforeDivisions), _), ((afterReducedConsumerRecord, afterDivisions), afterContext)) =>
+                  if (splitAtBoundaryCondition(beforeDivisions, afterDivisions))
+                    List(
+                      End,
+                      Element(afterReducedConsumerRecord, afterContext)
+                    )
+                  else
+                    List(Element(afterReducedConsumerRecord, afterContext))
+                case rest =>
+                  throw Errors.UnhandledStreamCase(rest)
+              }
+              .mapConcat(identity)
+
+          Source.combine(
+            Source.single(Element(firstReducedConsumerRecord, firstContext)),
+            withDivisions
+          )(Concat(_))
+        // This case only occurs if a Source has a single element so we just directly return it
+        case (Seq(((singleReducedConsumerRecord, _), singleContext)), _) =>
+          Source.single(Element(singleReducedConsumerRecord, singleContext))
+        case (rest, _) =>
+          throw Errors.UnhandledStreamCase(rest)
+      }
 
   private[backup] def sourceWithPeriods(
       source: Source[(OffsetDateTime, (ReducedConsumerRecord, kafkaClientInterface.CursorContext)),
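
A self-contained sketch of the same technique on plain integers may help (all names here are invented for illustration). `sliding(2)` only ever contributes the second element of each pair, so the first element must be prepended again via `Source.combine`, which is precisely the fencepost this hunk fixes, while the boundary test inserts a marker between periods:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Concat, Sink, Source}

object BoundarySketch extends App {
  implicit val system: ActorSystem = ActorSystem("boundary")
  import system.dispatcher

  sealed trait Marked
  final case class Elem(i: Int) extends Marked
  case object Boundary extends Marked

  // Stand-in for "whole periods elapsed since the first timestamp":
  def period(i: Int): Long = i / 3

  // Each sliding pair contributes only its *second* element, plus a Boundary
  // marker whenever the period index increases between the two:
  val tail = Source(1 to 7)
    .sliding(2)
    .mapConcat { case Seq(before, after) =>
      if (period(after) > period(before)) List(Boundary, Elem(after))
      else List(Elem(after))
    }

  // Re-attach the head element that sliding(2) never emits on its own:
  Source
    .combine(Source.single[Marked](Elem(1)), tail)(Concat(_))
    .runWith(Sink.foreach(println)) // Elem(1), Elem(2), Boundary, Elem(3), ...
    .onComplete(_ => system.terminate())
}
```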
@@ -107,12 +149,101 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
 
         Source.combine(
           Source.single((firstTimestamp, (firstReducedConsumerRecord, firstCursorContext))),
-          rest.map { case (reducedConsumerRecord, context) => (firstTimestamp, (reducedConsumerRecord, context)) }
+          rest.map { case (reducedConsumerRecord, context) =>
+            (firstTimestamp, (reducedConsumerRecord, context))
+          }
         )(Concat(_))
       case None => throw Errors.ExpectedStartOfSource
     }
   }
 
+  /** Transforms a sequence of [[RecordElement]]'s into a ByteString so that it can be persisted into the data storage
+    *
+    * @param sourceElements
+    *   A sequence of [[RecordElement]]'s as a result of `sliding(2)`
+    * @return
+    *   a [[ByteString]] ready to be persisted along with the original context from the [[RecordElement]]
+    */
+  private[backup] def transformReducedConsumerRecords(sourceElements: Seq[RecordElement]) = {
+    val stringWithContext = sourceElements match {
+      case Seq(Element(reducedConsumerRecord, context)) =>
+        // Happens in the Sentinel case that is explicitly called at the start of the stream OR when a stream is interrupted by the user
+        // in which case the stream needs to be terminated with `null]` in order to be valid
+        List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)},", Some(context)))
+      case Seq(Element(firstReducedConsumerRecord, firstContext),
+               Element(secondReducedConsumerRecord, secondContext)
+          ) =>
+        List(
+          (s"${reducedConsumerRecordAsString(firstReducedConsumerRecord)},", Some(firstContext)),
+          (s"${reducedConsumerRecordAsString(secondReducedConsumerRecord)},", Some(secondContext))
+        )
+      case Seq(Element(reducedConsumerRecord, context), End) =>
+        List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]", Some(context)))
+      case Seq(End) =>
+        List(("]", None))
+      case rest => throw Errors.UnhandledStreamCase(rest)
+    }
+    stringWithContext.map { case (string, context) => (ByteString(string), context) }
+  }
+
+  /** Applies the transformation to the first element of a Stream so that it starts off as a JSON array.
+    *
+    * @param element
+    *   Starting [[Element]]
+    * @param key
+    *   The current key being processed
+    * @param terminate
+    *   Whether to immediately terminate the JSON array for the single element in Stream case
+    * @return
+    *   A [[List]] containing a single [[Start]] ready to be processed in the [[Sink]]
+    */
+  private[backup] def transformFirstElement(element: Element, key: String, terminate: Boolean) =
+    transformReducedConsumerRecords(List(element)).map {
+      case (byteString, Some(context)) =>
+        if (terminate)
+          Start(ByteString("[") ++ byteString.dropRight(1) ++ ByteString("]"), context, key)
+        else
+          Start(ByteString("[") ++ byteString, context, key)
+      case _ =>
+        throw Errors.UnhandledStreamCase(List(element))
+    }
+
+  /** Fixes the case where there is an odd number of elements in the stream
+    * @param head
+    *   of the stream as a result of `prefixAndTail`
+    * @param restSource
+    *   of the stream as a result of `prefixAndTail`
+    * @return
+    *   A [[List]] of ([[ByteString]], [[kafkaClientInterface.CursorContext]]) with the tail elements fixed up.
+    */
+  private[backup] def transformTailingElement(
+      head: Seq[(ByteString, Option[kafkaClientInterface.CursorContext])],
+      restSource: Source[(ByteString, Option[kafkaClientInterface.CursorContext]), NotUsed]
+  ) = {
+    val restTransformed = restSource
+      .sliding(2, step = 2)
+      .map {
+        case Seq((before, Some(context)), (after, None)) =>
+          List((before.dropRight(1) ++ after, context))
+        case Seq((before, Some(beforeContext)), (after, Some(afterContext))) =>
+          List((before, beforeContext), (after, afterContext))
+        case Seq((single, Some(context))) =>
+          List((single, context))
+        case rest =>
+          throw Errors.UnhandledStreamCase(rest)
+      }
+
+    head match {
+      case Seq((byteString, Some(cursorContext))) =>
+        Source.combine(
+          Source.single((List((byteString, cursorContext)))),
+          restTransformed
+        )(Concat(_))
+      case rest =>
+        throw Errors.UnhandledStreamCase(rest)
+    }
+  }
+
   /** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a
     * data source.
     * @return
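
The pairing semantics above can be hard to see in stream form. Here is a plain-collections sketch (hypothetical stand-ins, cursor contexts omitted) of how `sliding(2, step = 2)` groups `RecordElement`s into JSON fragments, and why a lone trailing `]` needs the tail fix-up:

```scala
sealed trait Rec
final case class El(json: String) extends Rec
case object Fin extends Rec // plays the role of End

// Mirrors the four match cases of transformReducedConsumerRecords:
def fragments(group: Seq[Rec]): Seq[String] = group match {
  case Seq(El(a), El(b)) => Seq(a + ",", b + ",")
  case Seq(El(a), Fin)   => Seq(a + "]") // marker lands inside a pair: clean close
  case Seq(El(a))        => Seq(a + ",")
  case Seq(Fin)          => Seq("]")     // marker lands alone: dangling close
  case other             => sys.error(s"unhandled: $other") // source throws UnhandledStreamCase
}

// grouped(2) stands in for the stream's sliding(2, step = 2):
val out = List(El("\"a\""), El("\"b\""), Fin).grouped(2).toList.flatMap(fragments)
// out == List("\"a\",", "\"b\",", "]") -- an even-length group ends with a dangling
// comma, so transformTailingElement merges it with the lone "]" to yield "\"b\"]".
```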
@@ -127,56 +258,71 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
 
     val withBackupStreamPositions = calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord))
 
-    val split = withBackupStreamPositions.asSource.splitAfter { case ((_, backupStreamPosition), _) =>
-      backupStreamPosition == BackupStreamPosition.Boundary
-    }
+    val split = withBackupStreamPositions
+      .splitAfter { case sourceElement =>
+        sourceElement match {
+          case End => true
+          case _ => false
+        }
+      }
 
     val substreams = split
-      .prefixAndTail(1)
-      .flatMapConcat { case (head, restOfReducedConsumerRecords) =>
-        head.headOption match {
-          case Some(((firstReducedConsumerRecord, _), firstContext)) =>
-            // We need to retrieve the first element of the stream in order to calculate the key/filename
-            val key = calculateKey(firstReducedConsumerRecord.toOffsetDateTime)
-
-            // Now that we have retrieved the first element of the stream, lets recombine it so we create the
-            // original stream
-            val combined = Source.combine(
-              Source.single(
-                (
-                  (firstReducedConsumerRecord, BackupStreamPosition.Start),
-                  firstContext
-                )
-              ),
-              restOfReducedConsumerRecords
-            )(Concat(_))
+      .prefixAndTail(2)
+      .flatMapConcat {
+        case (Seq(only: Element, End), _) =>
+          // This case only occurs when you have a single element in a timeslice.
+          // We have to terminate immediately to create a JSON array with a single element
+          val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
+          Source(transformFirstElement(only, key, terminate = true))
+        case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) =>
+          val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime)
+          val firstSource = transformFirstElement(first, key, terminate = false)
+
+          val rest = Source.combine(
+            Source.single(second),
+            restOfReducedConsumerRecords
+          )(Concat(_))
 
-            // Go through every element in the stream and convert the `ReducedCustomerRecord` to an actual bytestream
-            val transformed = combined.map { case ((record, position), context) =>
-              val transform = transformReducedConsumerRecords(record, position)
-              (transform, context)
-            }
+          val restTransformed = rest
+            .sliding(2, step = 2)
+            .map(transformReducedConsumerRecords)
+            .mapConcat(identity)
+            .prefixAndTail(1)
+            .flatMapConcat((transformTailingElement _).tupled)
+            .mapConcat(identity)
+            .map { case (byteString, context) => Tail(byteString, context) }
 
-            transformed.map(data => (data, key))
-          case None => Source.empty
-        }
+          Source.combine(
+            Source(
+              firstSource
+            ),
+            restTransformed
+          )(Concat(_))
+        case (Seq(only: Element), _) =>
+          // This case can also occur when the user terminates the stream
+          val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
+          Source(transformFirstElement(only, key, terminate = false))
+        case (rest, _) =>
+          throw Errors.UnhandledStreamCase(rest)
       }
 
     // Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071
     @nowarn("msg=method lazyInit in object Sink is deprecated")
     val subFlowSink = substreams
-      .alsoTo(kafkaClientInterface.commitCursor.contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
-        case ((_, context), _) => context
+      .alsoTo(kafkaClientInterface.commitCursor.contramap[ByteStringElement] { byteStringElement =>
+        byteStringElement.context
       })
       .to(
         // See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
         Sink.lazyInit(
-          { case (_, key) =>
-            Future.successful(
-              backupToStorageSink(key).contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
-                case ((byteString, _), _) => byteString
-              }
-            )
+          {
+            case start: Start =>
+              Future.successful(
+                backupToStorageSink(start.key).contramap[ByteStringElement] { byteStringElement =>
+                  byteStringElement.data
+                }
+              )
+            case _ => throw Errors.ExpectedStartOfSource
           },
           empty
         )
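
Two Akka Streams behaviours this hunk leans on are worth spelling out: `splitAfter` closes the current substream *after* the matching element, so each `End` marker travels with the period it terminates, and `Sink.lazyInit` defers sink creation until the first element arrives, which is why the first element of every substream must be a `Start` carrying the key. A small standalone illustration of the `splitAfter` half (invented values):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object SplitAfterSketch extends App {
  implicit val system: ActorSystem = ActorSystem("split")
  import system.dispatcher

  // -1 plays the role of the End marker; splitAfter keeps it in the substream it closes:
  Source(List(1, 2, -1, 3, 4, -1, 5))
    .splitAfter(_ == -1)
    .fold(List.empty[Int])(_ :+ _) // collect each substream to show its contents
    .concatSubstreams
    .runForeach(println) // List(1, 2, -1), then List(3, 4, -1), then List(5)
    .onComplete(_ => system.terminate())
}
```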
@@ -211,35 +357,13 @@ object BackupClientInterface {
     * @return
     *   The position of the Stream
     */
-  def splitAtBoundaryCondition(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): BackupStreamPosition =
+  def splitAtBoundaryCondition(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): Boolean =
     (dividedPeriodsBefore, dividedPeriodsAfter) match {
       case (before, after) if after > before =>
-        BackupStreamPosition.Boundary
+        true
       case _ =>
-        BackupStreamPosition.Middle
-    }
-
-  /** Transforms a `ReducedConsumer` record into a ByteString so that it can be persisted into the data storage
-    * @param reducedConsumerRecord
-    *   The ReducedConsumerRecord to persist
-    * @param backupStreamPosition
-    *   The position of the record relative in the stream (so it knows if its at the start, middle or end)
-    * @return
-    *   a `ByteString` ready to be persisted
-    */
-  def transformReducedConsumerRecords(reducedConsumerRecord: ReducedConsumerRecord,
-                                      backupStreamPosition: BackupStreamPosition
-  ): ByteString = {
-    val string = backupStreamPosition match {
-      case BackupStreamPosition.Start =>
-        s"[${reducedConsumerRecordAsString(reducedConsumerRecord)},"
-      case BackupStreamPosition.Middle =>
-        s"${reducedConsumerRecordAsString(reducedConsumerRecord)},"
-      case BackupStreamPosition.Boundary =>
-        s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]"
+        false
     }
-    ByteString(string)
-  }
 
   protected def calculateNumberOfPeriodsFromTimestamp(initialTime: OffsetDateTime,
                                                       period: FiniteDuration,