
Commit 4b9593f

Fix fencepost issues with stream algorithm plus fix tests

1 parent cb5ebe4

File tree

6 files changed: +495 -166 lines changed

core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala

Lines changed: 222 additions & 95 deletions
@@ -1,5 +1,6 @@
 package io.aiven.guardian.kafka.backup
 
+import akka.NotUsed
 import akka.stream.scaladsl._
 import akka.util.ByteString
 import io.aiven.guardian.kafka.Errors
@@ -17,25 +18,6 @@ import java.time._
 import java.time.format.DateTimeFormatter
 import java.time.temporal._
 
-/** A marker used to indicate in which position the current backup stream is
-  */
-sealed abstract class BackupStreamPosition
-
-object BackupStreamPosition {
-
-  /** The backup stream has just started right now
-    */
-  case object Start extends BackupStreamPosition
-
-  /** The backup stream is in the middle of a time period
-    */
-  case object Middle extends BackupStreamPosition
-
-  /** The backup stream position has just hit a boundary for when a new period starts
-    */
-  case object Boundary extends BackupStreamPosition
-}
-
 /** An interface for a template on how to backup a Kafka Stream into some data storage
   * @tparam T
   *   The underlying `kafkaClientInterface` type
@@ -44,6 +26,29 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
   implicit val kafkaClientInterface: T
   implicit val backupConfig: Backup
 
+  /** An element from the original record
+    */
+  private[backup] sealed trait RecordElement
+  private[backup] case class Element(reducedConsumerRecord: ReducedConsumerRecord,
+                                     context: kafkaClientInterface.CursorContext
+  ) extends RecordElement
+  private[backup] case object End extends RecordElement
+
+  /** An element after the record has been transformed to a ByteString
+    */
+  private[backup] sealed trait ByteStringElement {
+    val data: ByteString
+    val context: kafkaClientInterface.CursorContext
+  }
+
+  private[backup] case class Start(override val data: ByteString,
+                                   override val context: kafkaClientInterface.CursorContext,
+                                   key: String
+  ) extends ByteStringElement
+  private[backup] case class Tail(override val data: ByteString,
+                                  override val context: kafkaClientInterface.CursorContext
+  ) extends ByteStringElement
+
   /** Override this type to define the result of backing up data to a datasource
     */
   type BackupResult
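
The new `RecordElement` / `ByteStringElement` hierarchy encodes where each chunk sits in the JSON array being written out: a `Start` opens the array and carries the storage key, `Tail` chunks follow it, and `End` marks the point where the array has to be closed. Below is a minimal plain-Scala sketch of that comma-and-bracket framing; `frame` is a hypothetical helper for illustration, not code from this commit:

```scala
object FramingSketch extends App {
  // Each serialised record is written with a trailing comma; closing the
  // array drops the final comma -- the same fencepost handled by
  // dropCommaFromEndOfJsonArray further down in this diff.
  def frame(records: List[String]): String =
    if (records.isEmpty) "[]"
    else "[" + records.map(_ + ",").mkString.dropRight(1) + "]"

  println(frame(List("""{"offset":1}""", """{"offset":2}""")))
  // Prints: [{"offset":1},{"offset":2}]
}
```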
@@ -66,23 +71,60 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
     */
   def empty: () => Future[BackupResult]
 
-  @nowarn("msg=not.*?exhaustive")
   private[backup] def calculateBackupStreamPositions(
       sourceWithPeriods: SourceWithContext[(ReducedConsumerRecord, Long),
                                            kafkaClientInterface.CursorContext,
                                            kafkaClientInterface.Control
       ]
-  ): SourceWithContext[(ReducedConsumerRecord, BackupStreamPosition),
-                       kafkaClientInterface.CursorContext,
-                       kafkaClientInterface.Control
-  ] = sourceWithPeriods
-    .sliding(2)
-    .map { case Seq((beforeReducedConsumerRecord, beforeDivisions), (_, afterDivisions)) =>
-      val backupStreamPosition = splitAtBoundaryCondition(beforeDivisions, afterDivisions)
+  ): Source[RecordElement, kafkaClientInterface.Control] =
+    sourceWithPeriods.asSource
+      .prefixAndTail(2)
+      // This algorithm only works with Source's that have 2 or more elements
+      .flatMapConcat {
+        case (Seq(
+                ((firstReducedConsumerRecord, firstDivision), firstContext),
+                ((secondReducedConsumerRecord, secondDivision), secondContext)
+              ),
+              rest
+            ) =>
+          val all = Source
+            .combine(
+              Source(
+                List(
+                  ((firstReducedConsumerRecord, firstDivision), firstContext),
+                  ((secondReducedConsumerRecord, secondDivision), secondContext)
+                )
+              ),
+              rest
+            )(Concat(_))
 
-      (beforeReducedConsumerRecord, backupStreamPosition)
-    }
-    .mapContext { case Seq(cursorContext, _) => cursorContext }
+          val withDivisions =
+            all
+              .sliding(2)
+              .map {
+                case Seq(((_, beforeDivisions), _), ((afterReducedConsumerRecord, afterDivisions), afterContext)) =>
+                  if (isAtBoundary(beforeDivisions, afterDivisions))
+                    List(
+                      End,
+                      Element(afterReducedConsumerRecord, afterContext)
+                    )
+                  else
+                    List(Element(afterReducedConsumerRecord, afterContext))
+                case rest =>
+                  throw Errors.UnhandledStreamCase(rest)
+              }
+              .mapConcat(identity)
+
+          Source.combine(
+            Source.single(Element(firstReducedConsumerRecord, firstContext)),
+            withDivisions
+          )(Concat(_))
+        // This case only occurs if a Source has a single element so we just directly return it
+        case (Seq(((singleReducedConsumerRecord, _), singleContext)), _) =>
+          Source.single(Element(singleReducedConsumerRecord, singleContext))
+        case (rest, _) =>
+          throw Errors.UnhandledStreamCase(rest)
+      }
 
   private[backup] def sourceWithPeriods(
       source: Source[(OffsetDateTime, (ReducedConsumerRecord, kafkaClientInterface.CursorContext)),
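
The rewritten `calculateBackupStreamPositions` peeks at the first two elements with `prefixAndTail(2)`, then stitches them back in front of the tail with `Source.combine(...)(Concat(_))` so that the subsequent `sliding(2)` sees the complete stream again. A standalone sketch of that peek-and-recombine pattern, assuming plain Akka Streams with integers standing in for records (`PrefixAndTailSketch` is illustrative, not part of the commit):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Concat, Source}

object PrefixAndTailSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  Source(1 to 5)
    .prefixAndTail(2)
    .flatMapConcat {
      case (Seq(first, second), rest) =>
        // Rebuild the original stream: prefix elements first, then the tail
        Source.combine(Source(List(first, second)), rest)(Concat(_))
      case (prefix, _) =>
        // Fewer than two elements: emit whatever we have
        Source(prefix.toList)
    }
    .runForeach(println)
    .onComplete(_ => system.terminate())(system.dispatcher)
}
```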
@@ -107,12 +149,104 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
 
           Source.combine(
             Source.single((firstTimestamp, (firstReducedConsumerRecord, firstCursorContext))),
-            rest.map { case (reducedConsumerRecord, context) => (firstTimestamp, (reducedConsumerRecord, context)) }
+            rest.map { case (reducedConsumerRecord, context) =>
+              (firstTimestamp, (reducedConsumerRecord, context))
+            }
           )(Concat(_))
         case None => throw Errors.ExpectedStartOfSource
       }
   }
 
+  /** Transforms a sequence of [[RecordElement]]'s into a ByteString so that it can be persisted into the data storage
+    *
+    * @param sourceElements
+    *   A sequence of [[RecordElement]]'s as a result of `sliding(2)`
+    * @return
+    *   a [[ByteString]] ready to be persisted along with the original context from the [[RecordElement]]
+    */
+  private[backup] def transformReducedConsumerRecords(sourceElements: Seq[RecordElement]) = {
+    val stringWithContext = sourceElements match {
+      case Seq(Element(reducedConsumerRecord, context)) =>
+        // Happens in Sentinel case that is explicitly called at start of stream OR when a stream is interrupted by the user
+        // in which case stream needs to be terminated with `null]` in order to be valid
+        List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)},", Some(context)))
+      case Seq(Element(firstReducedConsumerRecord, firstContext),
+               Element(secondReducedConsumerRecord, secondContext)
+          ) =>
+        List(
+          (s"${reducedConsumerRecordAsString(firstReducedConsumerRecord)},", Some(firstContext)),
+          (s"${reducedConsumerRecordAsString(secondReducedConsumerRecord)},", Some(secondContext))
+        )
+      case Seq(Element(reducedConsumerRecord, context), End) =>
+        List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]", Some(context)))
+      case Seq(End) =>
+        List(("]", None))
+      case rest => throw Errors.UnhandledStreamCase(rest)
+    }
+    stringWithContext.map { case (string, context) => (ByteString(string), context) }
+  }
+
+  private[backup] def dropCommaFromEndOfJsonArray(byteString: ByteString) =
+    byteString.dropRight(1)
+
+  /** Applies the transformation to the first element of a Stream so that it starts off as a JSON array.
+    *
+    * @param element
+    *   Starting [[Element]]
+    * @param key
+    *   The current key being processed
+    * @param terminate
+    *   Whether to immediately terminate the JSON array for single element in Stream case
+    * @return
+    *   A [[List]] containing a single [[Start]] ready to be processed in the [[Sink]]
+    */
+  private[backup] def transformFirstElement(element: Element, key: String, terminate: Boolean) =
+    transformReducedConsumerRecords(List(element)).map {
+      case (byteString, Some(context)) =>
+        if (terminate)
+          Start(ByteString("[") ++ dropCommaFromEndOfJsonArray(byteString) ++ ByteString("]"), context, key)
+        else
+          Start(ByteString("[") ++ byteString, context, key)
+      case _ =>
+        throw Errors.UnhandledStreamCase(List(element))
+    }
+
+  /** Fixes the case where there is an odd number of elements in the stream
+    * @param head
+    *   of stream as a result of `prefixAndTail`
+    * @param restSource
+    *   of the stream as a result of `prefixAndTail`
+    * @return
+    *   A [[List]] of ([[ByteString]], [[kafkaClientInterface.CursorContext]]) with the tail elements fixed up.
+    */
+  private[backup] def transformTailingElement(
+      head: Seq[(ByteString, Option[kafkaClientInterface.CursorContext])],
+      restSource: Source[(ByteString, Option[kafkaClientInterface.CursorContext]), NotUsed]
+  ) = {
+    val restTransformed = restSource
+      .sliding(2, step = 2)
+      .map {
+        case Seq((before, Some(context)), (after, None)) =>
+          List((dropCommaFromEndOfJsonArray(before) ++ after, context))
+        case Seq((before, Some(beforeContext)), (after, Some(afterContext))) =>
+          List((before, beforeContext), (after, afterContext))
+        case Seq((single, Some(context))) =>
+          List((single, context))
+        case rest =>
+          throw Errors.UnhandledStreamCase(rest)
+      }
+
+    head match {
+      case Seq((byteString, Some(cursorContext))) =>
+        Source.combine(
+          Source.single((List((byteString, cursorContext)))),
+          restTransformed
+        )(Concat(_))
+      case rest =>
+        throw Errors.UnhandledStreamCase(rest)
+    }
+  }
+
   /** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a
     * data source.
     * @return
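
`transformTailingElement` exists because `sliding(2, step = 2)` chops the stream into non-overlapping pairs, and an odd-length stream leaves a dangling singleton group at the end; that off-by-one pairing looks to be the kind of fencepost the commit message refers to. A hypothetical standalone sketch of the behaviour that produces the singleton:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object SlidingPairsSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  // Non-overlapping pairs; the odd element 5 arrives as a singleton group
  Source(1 to 5)
    .sliding(2, step = 2)
    .runForeach(println)
    .onComplete(_ => system.terminate())(system.dispatcher)
  // Prints: Vector(1, 2), Vector(3, 4), Vector(5)
}
```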
@@ -127,56 +261,71 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
 
     val withBackupStreamPositions = calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord))
 
-    val split = withBackupStreamPositions.asSource.splitAfter { case ((_, backupStreamPosition), _) =>
-      backupStreamPosition == BackupStreamPosition.Boundary
-    }
+    val split = withBackupStreamPositions
+      .splitAfter { case sourceElement =>
+        sourceElement match {
+          case End => true
+          case _   => false
+        }
+      }
 
     val substreams = split
-      .prefixAndTail(1)
-      .flatMapConcat { case (head, restOfReducedConsumerRecords) =>
-        head.headOption match {
-          case Some(((firstReducedConsumerRecord, _), firstContext)) =>
-            // We need to retrieve the first element of the stream in order to calculate the key/filename
-            val key = calculateKey(firstReducedConsumerRecord.toOffsetDateTime)
-
-            // Now that we have retrieved the first element of the stream, lets recombine it so we create the
-            // original stream
-            val combined = Source.combine(
-              Source.single(
-                (
-                  (firstReducedConsumerRecord, BackupStreamPosition.Start),
-                  firstContext
-                )
-              ),
-              restOfReducedConsumerRecords
-            )(Concat(_))
+      .prefixAndTail(2)
+      .flatMapConcat {
+        case (Seq(only: Element, End), _) =>
+          // This case only occurs when you have a single element in a timeslice.
+          // We have to terminate immediately to create a JSON array with a single element
+          val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
+          Source(transformFirstElement(only, key, terminate = true))
+        case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) =>
+          val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime)
+          val firstSource = transformFirstElement(first, key, terminate = false)
+
+          val rest = Source.combine(
+            Source.single(second),
+            restOfReducedConsumerRecords
+          )(Concat(_))
 
-            // Go through every element in the stream and convert the `ReducedCustomerRecord` to an actual bytestream
-            val transformed = combined.map { case ((record, position), context) =>
-              val transform = transformReducedConsumerRecords(record, position)
-              (transform, context)
-            }
+          val restTransformed = rest
+            .sliding(2, step = 2)
+            .map(transformReducedConsumerRecords)
+            .mapConcat(identity)
+            .prefixAndTail(1)
+            .flatMapConcat((transformTailingElement _).tupled)
+            .mapConcat(identity)
+            .map { case (byteString, context) => Tail(byteString, context) }
 
-            transformed.map(data => (data, key))
-          case None => Source.empty
-        }
+          Source.combine(
+            Source(
+              firstSource
+            ),
+            restTransformed
+          )(Concat(_))
+        case (Seq(only: Element), _) =>
+          // This case can also occur when user terminates the stream
+          val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
+          Source(transformFirstElement(only, key, terminate = false))
+        case (rest, _) =>
+          throw Errors.UnhandledStreamCase(rest)
       }
 
     // Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071
     @nowarn("msg=method lazyInit in object Sink is deprecated")
     val subFlowSink = substreams
-      .alsoTo(kafkaClientInterface.commitCursor.contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
-        case ((_, context), _) => context
+      .alsoTo(kafkaClientInterface.commitCursor.contramap[ByteStringElement] { byteStringElement =>
+        byteStringElement.context
       })
       .to(
         // See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
         Sink.lazyInit(
-          { case (_, key) =>
-            Future.successful(
-              backupToStorageSink(key).contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
-                case ((byteString, _), _) => byteString
-              }
-            )
+          {
+            case start: Start =>
+              Future.successful(
+                backupToStorageSink(start.key).contramap[ByteStringElement] { byteStringElement =>
+                  byteStringElement.data
+                }
+              )
+            case _ => throw Errors.ExpectedStartOfSource
           },
           empty
         )
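
The substreams fed into `Sink.lazyInit` above come from `splitAfter`, which closes the current substream after the element that matches; since `calculateBackupStreamPositions` emits `End` exactly at a time-period boundary, every substream ends with its own `End` and can be flushed to storage as one complete JSON file. A sketch of that splitting behaviour with strings standing in for `RecordElement`s (hypothetical example, not part of the commit):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object SplitAfterSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  Source(List("a", "b", "END", "c", "END", "d"))
    .splitAfter(_ == "END")
    .fold(List.empty[String])(_ :+ _) // collect each substream to see its contents
    .mergeSubstreams
    .runWith(Sink.foreach(println))
  // Typically prints:
  // List(a, b, END)
  // List(c, END)
  // List(d)
}
```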
@@ -202,44 +351,22 @@ object BackupClientInterface {
   def calculateKey(offsetDateTime: OffsetDateTime): String =
     s"${BackupClientInterface.formatOffsetDateTime(offsetDateTime)}.json"
 
-  /** Calculates the current position in 2 element sliding of a Stream.
+  /** Calculates whether we have rolled over a time period given number of divided periods.
     * @param dividedPeriodsBefore
     *   The number of divided periods in the first element of the slide. -1 is used as a sentinel value to indicate the
     *   start of the stream
     * @param dividedPeriodsAfter
     *   The number of divided periods in the second element of the slide
     * @return
-    *   The position of the Stream
+    *   `true` if we have hit a time boundary otherwise `false`
     */
-  def splitAtBoundaryCondition(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): BackupStreamPosition =
+  def isAtBoundary(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): Boolean =
     (dividedPeriodsBefore, dividedPeriodsAfter) match {
       case (before, after) if after > before =>
-        BackupStreamPosition.Boundary
+        true
       case _ =>
-        BackupStreamPosition.Middle
-    }
-
-  /** Transforms a `ReducedConsumer` record into a ByteString so that it can be persisted into the data storage
-    * @param reducedConsumerRecord
-    *   The ReducedConsumerRecord to persist
-    * @param backupStreamPosition
-    *   The position of the record relative in the stream (so it knows if its at the start, middle or end)
-    * @return
-    *   a `ByteString` ready to be persisted
-    */
-  def transformReducedConsumerRecords(reducedConsumerRecord: ReducedConsumerRecord,
-                                      backupStreamPosition: BackupStreamPosition
-  ): ByteString = {
-    val string = backupStreamPosition match {
-      case BackupStreamPosition.Start =>
-        s"[${reducedConsumerRecordAsString(reducedConsumerRecord)},"
-      case BackupStreamPosition.Middle =>
-        s"${reducedConsumerRecordAsString(reducedConsumerRecord)},"
-      case BackupStreamPosition.Boundary =>
-        s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]"
+        false
     }
-    ByteString(string)
-  }
 
   protected def calculateNumberOfPeriodsFromTimestamp(initialTime: OffsetDateTime,
                                                       period: FiniteDuration,
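
Since `isAtBoundary` now returns a plain `Boolean`, the boundary rule reduces to a single comparison that is easy to check by hand. A standalone mirror of the helper for illustration (not the committed code):

```scala
object IsAtBoundarySketch extends App {
  def isAtBoundary(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): Boolean =
    dividedPeriodsAfter > dividedPeriodsBefore

  println(isAtBoundary(-1, 0)) // true: -1 is the start-of-stream sentinel
  println(isAtBoundary(3, 3))  // false: still inside the same time period
  println(isAtBoundary(3, 4))  // true: rolled over into the next period
}
```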
