11package io .aiven .guardian .kafka .backup
22
3+ import akka .NotUsed
34import akka .stream .scaladsl ._
45import akka .util .ByteString
56import io .aiven .guardian .kafka .Errors
@@ -17,25 +18,6 @@ import java.time._
1718import java .time .format .DateTimeFormatter
1819import java .time .temporal ._
1920
20- /** A marker used to indicate in which position the current backup stream is
21- */
22- sealed abstract class BackupStreamPosition
23-
24- object BackupStreamPosition {
25-
26- /** The backup stream has just started right now
27- */
28- case object Start extends BackupStreamPosition
29-
30- /** The backup stream is in the middle of a time period
31- */
32- case object Middle extends BackupStreamPosition
33-
34- /** The backup stream position has just hit a boundary for when a new period starts
35- */
36- case object Boundary extends BackupStreamPosition
37- }
38-
3921/** An interface for a template on how to backup a Kafka Stream into some data storage
4022 * @tparam T
4123 * The underlying `kafkaClientInterface` type
@@ -44,6 +26,29 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
4426 implicit val kafkaClientInterface : T
4527 implicit val backupConfig : Backup
4628
29+ /** An element from the original record
30+ */
31+ private [backup] sealed trait RecordElement
32+ private [backup] case class Element (reducedConsumerRecord : ReducedConsumerRecord ,
33+ context : kafkaClientInterface.CursorContext
34+ ) extends RecordElement
35+ private [backup] case object End extends RecordElement
36+
37+ /** An element after the record has been transformed to a ByteString
38+ */
39+ private [backup] sealed trait ByteStringElement {
40+ val data : ByteString
41+ val context : kafkaClientInterface.CursorContext
42+ }
43+
44+ private [backup] case class Start (override val data : ByteString ,
45+ override val context : kafkaClientInterface.CursorContext ,
46+ key : String
47+ ) extends ByteStringElement
48+ private [backup] case class Tail (override val data : ByteString ,
49+ override val context : kafkaClientInterface.CursorContext
50+ ) extends ByteStringElement
51+
4752 /** Override this type to define the result of backing up data to a datasource
4853 */
4954 type BackupResult
@@ -66,23 +71,60 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
6671 */
6772 def empty : () => Future [BackupResult ]
6873
69- @ nowarn(" msg=not.*?exhaustive" )
7074 private [backup] def calculateBackupStreamPositions (
7175 sourceWithPeriods : SourceWithContext [(ReducedConsumerRecord , Long ),
7276 kafkaClientInterface.CursorContext ,
7377 kafkaClientInterface.Control
7478 ]
75- ): SourceWithContext [(ReducedConsumerRecord , BackupStreamPosition ),
76- kafkaClientInterface.CursorContext ,
77- kafkaClientInterface.Control
78- ] = sourceWithPeriods
79- .sliding(2 )
80- .map { case Seq ((beforeReducedConsumerRecord, beforeDivisions), (_, afterDivisions)) =>
81- val backupStreamPosition = splitAtBoundaryCondition(beforeDivisions, afterDivisions)
79+ ): Source [RecordElement , kafkaClientInterface.Control ] =
80+ sourceWithPeriods.asSource
81+ .prefixAndTail(2 )
82+ // This algorithm only works with Source's that have 2 or more elements
83+ .flatMapConcat {
84+ case (Seq (
85+ ((firstReducedConsumerRecord, firstDivision), firstContext),
86+ ((secondReducedConsumerRecord, secondDivision), secondContext)
87+ ),
88+ rest
89+ ) =>
90+ val all = Source
91+ .combine(
92+ Source (
93+ List (
94+ ((firstReducedConsumerRecord, firstDivision), firstContext),
95+ ((secondReducedConsumerRecord, secondDivision), secondContext)
96+ )
97+ ),
98+ rest
99+ )(Concat (_))
82100
83- (beforeReducedConsumerRecord, backupStreamPosition)
84- }
85- .mapContext { case Seq (cursorContext, _) => cursorContext }
101+ val withDivisions =
102+ all
103+ .sliding(2 )
104+ .map {
105+ case Seq (((_, beforeDivisions), _), ((afterReducedConsumerRecord, afterDivisions), afterContext)) =>
106+ if (splitAtBoundaryCondition(beforeDivisions, afterDivisions))
107+ List (
108+ End ,
109+ Element (afterReducedConsumerRecord, afterContext)
110+ )
111+ else
112+ List (Element (afterReducedConsumerRecord, afterContext))
113+ case rest =>
114+ throw Errors .UnhandledStreamCase (rest)
115+ }
116+ .mapConcat(identity)
117+
118+ Source .combine(
119+ Source .single(Element (firstReducedConsumerRecord, firstContext)),
120+ withDivisions
121+ )(Concat (_))
122+ // This case only occurs if a Source has a single element so we just directly return it
123+ case (Seq (((singleReducedConsumerRecord, _), singleContext)), _) =>
124+ Source .single(Element (singleReducedConsumerRecord, singleContext))
125+ case (rest, _) =>
126+ throw Errors .UnhandledStreamCase (rest)
127+ }
86128
87129 private [backup] def sourceWithPeriods (
88130 source : Source [(OffsetDateTime , (ReducedConsumerRecord , kafkaClientInterface.CursorContext )),
@@ -107,12 +149,101 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
107149
108150 Source .combine(
109151 Source .single((firstTimestamp, (firstReducedConsumerRecord, firstCursorContext))),
110- rest.map { case (reducedConsumerRecord, context) => (firstTimestamp, (reducedConsumerRecord, context)) }
152+ rest.map { case (reducedConsumerRecord, context) =>
153+ (firstTimestamp, (reducedConsumerRecord, context))
154+ }
111155 )(Concat (_))
112156 case None => throw Errors .ExpectedStartOfSource
113157 }
114158 }
115159
160+ /** Transforms a sequence of [[RecordElement ]]'s into a ByteString so that it can be persisted into the data storage
161+ *
162+ * @param sourceElements
163+ * A sequence of [[RecordElement ]]'s as a result of `sliding(2)`
164+ * @return
165+ * a [[ByteString ]] ready to be persisted along with the original context form the [[RecordElement ]]
166+ */
167+ private [backup] def transformReducedConsumerRecords (sourceElements : Seq [RecordElement ]) = {
168+ val stringWithContext = sourceElements match {
169+ case Seq (Element (reducedConsumerRecord, context)) =>
170+ // Happens in Sentinel case that is explicitly called at start of stream OR when a stream is interrupted by the user
171+ // in which case stream needs to be terminated with `null]` in order to be valid
172+ List ((s " ${reducedConsumerRecordAsString(reducedConsumerRecord)}, " , Some (context)))
173+ case Seq (Element (firstReducedConsumerRecord, firstContext),
174+ Element (secondReducedConsumerRecord, secondContext)
175+ ) =>
176+ List (
177+ (s " ${reducedConsumerRecordAsString(firstReducedConsumerRecord)}, " , Some (firstContext)),
178+ (s " ${reducedConsumerRecordAsString(secondReducedConsumerRecord)}, " , Some (secondContext))
179+ )
180+ case Seq (Element (reducedConsumerRecord, context), End ) =>
181+ List ((s " ${reducedConsumerRecordAsString(reducedConsumerRecord)}] " , Some (context)))
182+ case Seq (End ) =>
183+ List ((" ]" , None ))
184+ case rest => throw Errors .UnhandledStreamCase (rest)
185+ }
186+ stringWithContext.map { case (string, context) => (ByteString (string), context) }
187+ }
188+
189+ /** Applies the transformation to the first element of a Stream so that it starts of as a JSON array.
190+ *
191+ * @param element
192+ * Starting [[Element ]]
193+ * @param key
194+ * The current key being processed
195+ * @param terminate
196+ * Whether to immediately terminate the JSON array for single element in Stream case
197+ * @return
198+ * A [[List ]] containing a single [[Start ]] ready to be processed in the [[Sink ]]
199+ */
200+ private [backup] def transformFirstElement (element : Element , key : String , terminate : Boolean ) =
201+ transformReducedConsumerRecords(List (element)).map {
202+ case (byteString, Some (context)) =>
203+ if (terminate)
204+ Start (ByteString (" [" ) ++ byteString.dropRight(1 ) ++ ByteString (" ]" ), context, key)
205+ else
206+ Start (ByteString (" [" ) ++ byteString, context, key)
207+ case _ =>
208+ throw Errors .UnhandledStreamCase (List (element))
209+ }
210+
211+ /** Fixes the case where is an odd amount of elements in the stream
212+ * @param head
213+ * of stream as a result of `prefixAndTail`
214+ * @param restSource
215+ * of the stream as a result of `prefixAndTail`
216+ * @return
217+ * A [[List ]] of ([[ByteString ]], [[kafkaClientInterface.CursorContext ]]) with the tail elements fixed up.
218+ */
219+ private [backup] def transformTailingElement (
220+ head : Seq [(ByteString , Option [kafkaClientInterface.CursorContext ])],
221+ restSource : Source [(ByteString , Option [kafkaClientInterface.CursorContext ]), NotUsed ]
222+ ) = {
223+ val restTransformed = restSource
224+ .sliding(2 , step = 2 )
225+ .map {
226+ case Seq ((before, Some (context)), (after, None )) =>
227+ List ((before.dropRight(1 ) ++ after, context))
228+ case Seq ((before, Some (beforeContext)), (after, Some (afterContext))) =>
229+ List ((before, beforeContext), (after, afterContext))
230+ case Seq ((single, Some (context))) =>
231+ List ((single, context))
232+ case rest =>
233+ throw Errors .UnhandledStreamCase (rest)
234+ }
235+
236+ head match {
237+ case Seq ((byteString, Some (cursorContext))) =>
238+ Source .combine(
239+ Source .single((List ((byteString, cursorContext)))),
240+ restTransformed
241+ )(Concat (_))
242+ case rest =>
243+ throw Errors .UnhandledStreamCase (rest)
244+ }
245+ }
246+
116247 /** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a
117248 * data source.
118249 * @return
@@ -127,56 +258,71 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
127258
128259 val withBackupStreamPositions = calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord))
129260
130- val split = withBackupStreamPositions.asSource.splitAfter { case ((_, backupStreamPosition), _) =>
131- backupStreamPosition == BackupStreamPosition .Boundary
132- }
261+ val split = withBackupStreamPositions
262+ .splitAfter { case sourceElement =>
263+ sourceElement match {
264+ case End => true
265+ case _ => false
266+ }
267+ }
133268
134269 val substreams = split
135- .prefixAndTail(1 )
136- .flatMapConcat { case (head, restOfReducedConsumerRecords) =>
137- head.headOption match {
138- case Some (((firstReducedConsumerRecord, _), firstContext)) =>
139- // We need to retrieve the first element of the stream in order to calculate the key/filename
140- val key = calculateKey(firstReducedConsumerRecord.toOffsetDateTime)
141-
142- // Now that we have retrieved the first element of the stream, lets recombine it so we create the
143- // original stream
144- val combined = Source .combine(
145- Source .single(
146- (
147- (firstReducedConsumerRecord, BackupStreamPosition .Start ),
148- firstContext
149- )
150- ),
151- restOfReducedConsumerRecords
152- )(Concat (_))
270+ .prefixAndTail(2 )
271+ .flatMapConcat {
272+ case (Seq (only : Element , End ), _) =>
273+ // This case only occurs when you have a single element in a timeslice.
274+ // We have to terminate immediately to create a JSON array with a single element
275+ val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
276+ Source (transformFirstElement(only, key, terminate = true ))
277+ case (Seq (first : Element , second : Element ), restOfReducedConsumerRecords) =>
278+ val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime)
279+ val firstSource = transformFirstElement(first, key, terminate = false )
280+
281+ val rest = Source .combine(
282+ Source .single(second),
283+ restOfReducedConsumerRecords
284+ )(Concat (_))
153285
154- // Go through every element in the stream and convert the `ReducedCustomerRecord` to an actual bytestream
155- val transformed = combined.map { case ((record, position), context) =>
156- val transform = transformReducedConsumerRecords(record, position)
157- (transform, context)
158- }
286+ val restTransformed = rest
287+ .sliding(2 , step = 2 )
288+ .map(transformReducedConsumerRecords)
289+ .mapConcat(identity)
290+ .prefixAndTail(1 )
291+ .flatMapConcat((transformTailingElement _).tupled)
292+ .mapConcat(identity)
293+ .map { case (byteString, context) => Tail (byteString, context) }
159294
160- transformed.map(data => (data, key))
161- case None => Source .empty
162- }
295+ Source .combine(
296+ Source (
297+ firstSource
298+ ),
299+ restTransformed
300+ )(Concat (_))
301+ case (Seq (only : Element ), _) =>
302+ // This case can also occur when user terminates the stream
303+ val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
304+ Source (transformFirstElement(only, key, terminate = false ))
305+ case (rest, _) =>
306+ throw Errors .UnhandledStreamCase (rest)
163307 }
164308
165309 // Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071
166310 @ nowarn(" msg=method lazyInit in object Sink is deprecated" )
167311 val subFlowSink = substreams
168- .alsoTo(kafkaClientInterface.commitCursor.contramap[(( ByteString , kafkaClientInterface. CursorContext ), String ) ] {
169- case ((_, context), _) => context
312+ .alsoTo(kafkaClientInterface.commitCursor.contramap[ByteStringElement ] { byteStringElement =>
313+ byteStringElement. context
170314 })
171315 .to(
172316 // See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
173317 Sink .lazyInit(
174- { case (_, key) =>
175- Future .successful(
176- backupToStorageSink(key).contramap[((ByteString , kafkaClientInterface.CursorContext ), String )] {
177- case ((byteString, _), _) => byteString
178- }
179- )
318+ {
319+ case start : Start =>
320+ Future .successful(
321+ backupToStorageSink(start.key).contramap[ByteStringElement ] { byteStringElement =>
322+ byteStringElement.data
323+ }
324+ )
325+ case _ => throw Errors .ExpectedStartOfSource
180326 },
181327 empty
182328 )
@@ -211,35 +357,13 @@ object BackupClientInterface {
211357 * @return
212358 * The position of the Stream
213359 */
214- def splitAtBoundaryCondition (dividedPeriodsBefore : Long , dividedPeriodsAfter : Long ): BackupStreamPosition =
360+ def splitAtBoundaryCondition (dividedPeriodsBefore : Long , dividedPeriodsAfter : Long ): Boolean =
215361 (dividedPeriodsBefore, dividedPeriodsAfter) match {
216362 case (before, after) if after > before =>
217- BackupStreamPosition . Boundary
363+ true
218364 case _ =>
219- BackupStreamPosition .Middle
220- }
221-
222- /** Transforms a `ReducedConsumer` record into a ByteString so that it can be persisted into the data storage
223- * @param reducedConsumerRecord
224- * The ReducedConsumerRecord to persist
225- * @param backupStreamPosition
226- * The position of the record relative in the stream (so it knows if its at the start, middle or end)
227- * @return
228- * a `ByteString` ready to be persisted
229- */
230- def transformReducedConsumerRecords (reducedConsumerRecord : ReducedConsumerRecord ,
231- backupStreamPosition : BackupStreamPosition
232- ): ByteString = {
233- val string = backupStreamPosition match {
234- case BackupStreamPosition .Start =>
235- s " [ ${reducedConsumerRecordAsString(reducedConsumerRecord)}, "
236- case BackupStreamPosition .Middle =>
237- s " ${reducedConsumerRecordAsString(reducedConsumerRecord)}, "
238- case BackupStreamPosition .Boundary =>
239- s " ${reducedConsumerRecordAsString(reducedConsumerRecord)}] "
365+ false
240366 }
241- ByteString (string)
242- }
243367
244368 protected def calculateNumberOfPeriodsFromTimestamp (initialTime : OffsetDateTime ,
245369 period : FiniteDuration ,
0 commit comments