-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathEventTimeJoinExampleApp.scala
More file actions
267 lines (210 loc) · 9.23 KB
/
EventTimeJoinExampleApp.scala
File metadata and controls
267 lines (210 loc) · 9.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
package svend.example.eventimejoin
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, Serdes}
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.processor.{FailOnInvalidTimestamp, ProcessorContext}
import org.apache.kafka.streams.state.{KeyValueStore, WindowStore}
import play.api.libs.json._
import scala.collection.JavaConverters._
/**
 * Timestamped recommendation of a consultant: "let's synergize the out-of-the-box ROI" !
 */
case class Recommendation(event_time: Long, ingestion_time: Long, consultant: String, recommendation: String)

object Recommendation {

  // implicit values are given explicit types so their type does not depend on macro inference
  implicit val recommendationReads: Reads[Recommendation] = Json.reads[Recommendation]
  implicit val recommendationWrites: OWrites[Recommendation] = Json.writes[Recommendation]

  /**
   * Parses a raw JSON string and validates it as a [[Recommendation]].
   *
   * @param rawJson raw JSON text
   * @return JsSuccess with the recommendation, or JsError when the JSON does not match its shape.
   *         Errors raised by `Json.parse` itself (e.g. on non-JSON text) are not caught here.
   */
  def parseJson(rawJson: String): JsResult[Recommendation] =
    Json.parse(rawJson).validate[Recommendation]
}
/**
 * A consultant's mood (sad, happy, neutral, ...), stamped with both the time the mood
 * happened (`event_time`) and the time it was ingested (`ingestion_time`).
 */
case class Mood(
  event_time: Long,
  ingestion_time: Long,
  name: String,
  mood: String
)
/**
 * Result of joining a recommendation with the consultant's mood.
 * `mood` is None when no mood of that consultant was known before the recommendation's event time.
 */
case class MoodRec(
  eventTime: Long,
  consultant: String,
  mood: Option[String],
  recommendation: String
)
/**
 * Stateful stream transformer plugged to 2 topics: consultants' moods and consultants' business recommendations.
 *
 * => produces a stream of joined events in the form of [[MoodRec]] instances: each recommendation is joined
 * with the latest mood of its consultant known strictly before the recommendation's event time. That join is
 * emitted right away on a best-effort basis, then periodically revisited (see `punctuate`) so that moods
 * received late are taken into account, in which case a corrected [[MoodRec]] is forwarded.
 *
 * Reads the state stores "moods", "bestEffortJoins" and "consultants", which must be connected to it.
 */
class EventTimeJoiner extends Transformer[String, Either[Recommendation, Mood], KeyValue[String, MoodRec]] {

  // how often do we review previously joined data? (interval passed to ctx.schedule)
  val reviewJoinPeriod = 1000

  // at each periodic review, we revisit all joined data before (latest event time - reviewLagDelta)
  val reviewLagDelta = 3000

  val BEGINNING_OF_TIMES = 0L

  // single hard-coded key under which every recently seen consultant name is recorded
  private val allRecentNamesKey = "all-recent-names"

  // tons of mutable null values initialized a bit later (in init), because java
  var ctx: ProcessorContext = _
  var moodStore: KeyValueStore[String, List[Mood]] = _
  var bestEffortJoinsStore: WindowStore[String, MoodRec] = _
  var consultantNamesStore: WindowStore[String, String] = _

  override def init(context: ProcessorContext): Unit = {
    ctx = context
    ctx.schedule(reviewJoinPeriod)
    moodStore = ctx.getStateStore("moods").asInstanceOf[KeyValueStore[String, List[Mood]]]
    bestEffortJoinsStore = ctx.getStateStore("bestEffortJoins").asInstanceOf[WindowStore[String, MoodRec]]
    consultantNamesStore = ctx.getStateStore("consultants").asInstanceOf[WindowStore[String, String]]
  }

  /**
   * Main event handling:
   *  - if receiving a recommendation event: performs a best-effort join (based on what info is available now),
   *    records the result in the window store and returns it
   *  - if receiving a mood event: just records it in the key-value store and returns null (nothing emitted)
   *
   * In all cases: also keeps a trace of the consultant names we've encountered recently.
   */
  override def transform(key: String, event: Either[Recommendation, Mood]): KeyValue[String, MoodRec] =
    event match {
      case Left(rec) =>
        val joined = join(rec.event_time, rec.consultant, rec.recommendation)
        bestEffortJoinsStore.put(rec.consultant, joined, rec.event_time)
        recordConsultantName(rec.consultant, rec.event_time)
        new KeyValue(key, joined)

      case Right(mood) =>
        // mood history is kept sorted from most recent to oldest event time
        val updatedMoodHistory = (mood :: moodHistory(mood.name)).sortBy( - _.event_time)
        moodStore.put(mood.name, updatedMoodHistory)
        recordConsultantName(mood.name, mood.event_time)
        null
    }

  /**
   * joins this recommendation with the latest mood known strictly before it, if any
   */
  def join(recommendationTime: Long, consultant: String, recommendation: String): MoodRec = {
    // history is sorted most-recent-first: skip moods at or after the recommendation, take the next one
    val maybeMood = moodHistory(consultant)
      .dropWhile(_.event_time >= recommendationTime)
      .headOption
      .map(_.mood)
    MoodRec(recommendationTime, consultant, maybeMood, recommendation)
  }

  /** mood history of this consultant, most recent first; empty if nothing was recorded yet */
  def moodHistory(consultantName: String): List[Mood] =
    Option(moodStore.get(consultantName)).getOrElse(Nil)

  /** records that this consultant was seen at that event time */
  def recordConsultantName(name: String, eventTime: Long): Unit =
    consultantNamesStore.put(allRecentNamesKey, name, eventTime)

  /**
   * distinct consultants encountered recently, either in a mood event or a recommendation event.
   *
   * The same name is recorded once per event, at that event's time, hence the de-duplication: without it,
   * each consultant would be reviewed (and its corrections forwarded) once per recorded occurrence.
   *
   * NOTE(review): names recorded at the exact same event time share one window-store slot and may
   * overwrite each other unless the "consultants" store retains duplicates — confirm store config.
   */
  def allRecentConsultants(until: Long): Iterator[String] =
    fetchValues(consultantNamesStore, allRecentNamesKey, BEGINNING_OF_TIMES, until).distinct.iterator

  /**
   * This is called every `reviewJoinPeriod` ms, it reviews previously joined events and re-emits if necessary
   */
  override def punctuate(latestEventTime: Long): KeyValue[String, MoodRec] = {
    println(s"revisiting previous joins, event time is $latestEventTime")
    allRecentConsultants(until = latestEventTime).foreach { consultantName =>
      joinAgain(consultantName, maxEventTimestamp = latestEventTime - reviewLagDelta)
    }
    null
  }

  /**
   * actual review of previously joined events of a consultant: every joined event up to
   * `maxEventTimestamp` is joined again with the current mood history, and any join whose mood
   * changed is stored back and forwarded under the key "bis-<consultantName>".
   */
  def joinAgain(consultantName: String, maxEventTimestamp: Long): Unit = {
    // joined data as per when the recommendation event was received (or as last corrected)
    val oldJoinedMoods = fetchValues(bestEffortJoinsStore, consultantName, BEGINNING_OF_TIMES, maxEventTimestamp)

    oldJoinedMoods.foreach { previous =>
      val updated = join(previous.eventTime, previous.consultant, previous.recommendation)
      // if updated join is different: remember it, so that the same correction is not re-emitted at
      // every later review (written at the same key and event time as the original entry, which the
      // store presumably overwrites since it does not retain duplicates — confirm store config), then emit it
      if (updated.mood != previous.mood) {
        bestEffortJoinsStore.put(consultantName, updated, updated.eventTime)
        ctx.forward(s"bis-$consultantName", updated)
      }
    }
    // TODO: explicitly evict old moods here (or use window store here as well?)
  }

  /**
   * all values stored under `key` in `store` between the `from` and `to` timestamps, materialized
   * into a list; the store iterator is always closed before returning.
   */
  private def fetchValues[V](store: WindowStore[String, V], key: String, from: Long, to: Long): List[V] = {
    val iterator = store.fetch(key, from, to)
    try iterator.asScala.map(_.value).toList
    finally iterator.close()
  }

  override def close(): Unit = {}
}
object EventTimeJoiner {

  /** Supplier handing out a brand-new [[EventTimeJoiner]] on every call to `get()`. */
  val supplier: TransformerSupplier[String, Either[Recommendation, Mood], KeyValue[String, MoodRec]] =
    () => new EventTimeJoiner()
}
object EventTimeJoinExample {

  /**
   * Best-effort parser for raw events: tries to read the JSON as a [[Recommendation]] first,
   * and only if that fails, as a [[Mood]].
   *
   * @param rawJson raw JSON text of one event
   * @return Some(Left(recommendation)), Some(Right(mood)), or None when the text is neither,
   *         including when it is not parseable JSON at all
   */
  def parse(rawJson: String): Option[Either[Recommendation, Mood]] = {
    def asRecommendation: Option[Either[Recommendation, Mood]] =
      Recommendation.parseJson(rawJson).asOpt.map(Left(_))

    def asMood: Option[Either[Recommendation, Mood]] =
      MoodListJsonSerde.parseJsonMood(rawJson).asOpt.map(Right(_))

    // Try only catches non-fatal exceptions: a malformed payload is dropped (None) instead of
    // propagating an exception out of the stream topology
    scala.util.Try(asRecommendation.orElse(asMood)).toOption.flatten
  }

  /**
   * extract the user id out of any of those parsed events:
   * the consultant of a recommendation, or the name attached to a mood
   */
  def userId(event: Either[Recommendation, Mood]): String =
    event.fold(_.consultant, _.name)
}
/**
 * Debugging timestamp extractor: delegates the extraction to [[FailOnInvalidTimestamp]]
 * and prints every extracted timestamp to stdout before returning it.
 */
class TimeStampSniffer extends FailOnInvalidTimestamp {

  override def extract(record: ConsumerRecord[AnyRef, AnyRef], previousTimestamp: Long): Long =
    logged(super.extract(record, previousTimestamp))

  // prints the timestamp and hands it back unchanged
  private def logged(timestamp: Long): Long = {
    println(s"event ts: $timestamp")
    timestamp
  }
}
/**
 * Entry point: wires the mood and recommendation topics through [[EventTimeJoiner]] and prints the joined records.
 */
object EventTimeJoinExampleApp extends App {

  val config: Properties = {
    val p = new Properties()
    // a new application id on every run (presumably so that no previous state/offsets are reused)
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, s"event-time-join-example-${System.currentTimeMillis()}")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "10")
    // p.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "svend.example.eventimejoin.TimeStampSniffer")
    p
  }

  // store of (consultantName -> list of moods, most recent first)
  val moodStore = Stores
    .create("moods")
    .withStringKeys
    .withValues(MoodListJsonSerde)
    .persistent()
    .build

  // window store of (consultantName -> joined MoodRec)
  // NOTE(review): windowed args presumably are (windowSize, retentionPeriod, numSegments, retainDuplicates)
  val bestEffortJoinStore = Stores
    .create("bestEffortJoins")
    .withStringKeys
    .withValues(MoodRecJsonSerde)
    .persistent()
    .windowed(1000, 100000, 10, false)
    .build

  // set of consultant names we encountered recently (only one hard-coded key)
  val consultantStore = Stores
    .create("consultants")
    .withStringKeys
    .withStringValues()
    .persistent()
    .windowed(1000, 100000, 10, false)
    .build

  val builder = new KStreamBuilder()
  builder
    .addStateStore(moodStore)
    .addStateStore(bestEffortJoinStore)
    .addStateStore(consultantStore)

  val parsed = builder
    .stream(Serdes.String(), Serdes.String(), "etj-moods-11", "etj-events-11")
    .mapValues[Option[Either[Recommendation, Mood]]](EventTimeJoinExample.parse)
    .filter{ case (_, v: Option[Either[Recommendation, Mood]]) => v.isDefined }
    .mapValues[Either[Recommendation, Mood]](_.get)
    .selectKey[String]{ case (_, v) => EventTimeJoinExample.userId(v)}
    .transform(EventTimeJoiner.supplier, "moods", "bestEffortJoins", "consultants")
    .print()

  val streams = new KafkaStreams(builder, config)
  streams.start()
  // close the streams instance when the JVM shuts down, instead of leaving its threads and stores unclosed
  sys.addShutdownHook(streams.close())
}