Skip to content

Commit 0b8a879

Browse files
mr-gitDenys Fakhritdinov
authored and
Denys Fakhritdinov
committed
make version non-optional
1 parent 916090a commit 0b8a879

File tree

26 files changed

+96
-101
lines changed

26 files changed

+96
-101
lines changed

core/src/main/scala/com/evolutiongaming/kafka/journal/Version.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ object Version {
1414

1515
val current: Version = Version(Option(Version.getClass.getPackage.getImplementationVersion).getOrElse("unknown"))
1616

17+
/** The last release before [[Version]] was introduced, should be used only as fallback during data recovery */
18+
val obsolete: Version = Version("0.0.152")
19+
1720
implicit val eqVersion: Eq[Version] = Eq.fromUniversalEquals
1821

1922
implicit val showVersion: Show[Version] = Show.fromToString

eventual-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/JournalStatements.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private[journal] object JournalStatements {
122122
.encode(record.event.partitionOffset)
123123
.encode("timestamp", record.event.timestamp)
124124
.encodeSome(record.event.origin)
125-
.encodeSome(record.event.version)
125+
.encode(record.event.version)
126126
.encode("tags", record.event.event.tags)
127127
.encodeSome("meta_record_id", record.metaRecordId)
128128
.encodeSome("payload_type", payloadType)
@@ -189,7 +189,7 @@ private[journal] object JournalStatements {
189189
} yield {
190190
new SelectRecords[F] {
191191

192-
def apply(key: Key, segment: SegmentNr, range: SeqRange) = {
192+
def apply(key: Key, segment: SegmentNr, range: SeqRange): Stream[F, JournalRecord] = {
193193

194194
def readPayload(row: Row): Option[EventualPayloadAndType] = {
195195
val payloadType = row.decode[Option[PayloadType]]("payload_type")
@@ -226,7 +226,7 @@ private[journal] object JournalStatements {
226226
event = event,
227227
timestamp = row.decode[Instant]("timestamp"),
228228
origin = row.decode[Option[Origin]],
229-
version = row.decode[Option[Version]],
229+
version = row.decode[Option[Version]].getOrElse(Version.obsolete),
230230
partitionOffset = partitionOffset,
231231
metadata = metadata,
232232
headers = headers,

eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class EventualCassandraTest extends AnyFunSuite with Matchers {
4242
timestamp = timestamp0,
4343
partitionOffset = partitionOffset,
4444
origin = origin.some,
45-
version = version.some,
45+
version = version,
4646
metadata = RecordMetadata(HeaderMetadata(Json.obj(("key", "value")).some), PayloadMetadata.empty),
4747
headers = Headers(("key", "value")),
4848
)

eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class ReplicatedCassandraTest extends AnyFunSuite with Matchers {
4747
timestamp = timestamp0,
4848
partitionOffset = partitionOffset,
4949
origin = origin.some,
50-
version = version.some,
50+
version = version,
5151
metadata = RecordMetadata(HeaderMetadata(Json.obj(("key", "value")).some), PayloadMetadata.empty),
5252
headers = Headers(("key", "value")),
5353
)

journal/src/main/scala/com/evolutiongaming/kafka/journal/Action.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ sealed abstract class Action extends Product {
1717

1818
def origin: Option[Origin] = header.origin
1919

20-
def version: Option[Version] = header.version
20+
def version: Version = header.version
2121
}
2222

2323
object Action {
@@ -79,7 +79,7 @@ object Action {
7979
key: Key,
8080
timestamp: Instant,
8181
origin: Option[Origin],
82-
version: Option[Version],
82+
version: Version,
8383
events: Events[A],
8484
metadata: HeaderMetadata,
8585
headers: Headers,
@@ -120,7 +120,7 @@ object Action {
120120
timestamp: Instant,
121121
to: DeleteTo,
122122
origin: Option[Origin],
123-
version: Option[Version],
123+
version: Version,
124124
): Delete = {
125125
val header = ActionHeader.Delete(to, origin, version)
126126
Delete(key, timestamp, header)
@@ -135,7 +135,7 @@ object Action {
135135

136136
object Purge {
137137

138-
def apply(key: Key, timestamp: Instant, origin: Option[Origin], version: Option[Version]): Purge = {
138+
def apply(key: Key, timestamp: Instant, origin: Option[Origin], version: Version): Purge = {
139139
val header = ActionHeader.Purge(origin, version)
140140
Purge(key, timestamp, header)
141141
}
@@ -157,7 +157,7 @@ object Action {
157157
timestamp: Instant,
158158
id: String,
159159
origin: Option[Origin],
160-
version: Option[Version],
160+
version: Version,
161161
): Mark = {
162162
val header = ActionHeader.Mark(id, origin, version)
163163
Mark(key, timestamp, header)

journal/src/main/scala/com/evolutiongaming/kafka/journal/ActionHeader.scala

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ sealed abstract class ActionHeader extends Product {
99

1010
def origin: Option[Origin]
1111

12-
def version: Option[Version]
12+
def version: Version
1313
}
1414

1515
object ActionHeader {
@@ -18,34 +18,10 @@ object ActionHeader {
1818

1919
implicit val formatOptActionHeader: OFormat[Option[ActionHeader]] = {
2020

21-
val appendFormat = {
22-
val format = Json.format[Append]
23-
val reads = format orElse new Reads[Append] {
24-
def reads(json: JsValue) = {
25-
26-
def metadata = {
27-
(json \ "metadata").validate[JsObject] match {
28-
case JsSuccess(a, _) => a.validate[HeaderMetadata]
29-
case _: JsError => HeaderMetadata.empty.pure[JsResult]
30-
}
31-
}
32-
33-
for {
34-
range <- (json \ "range").validate[SeqRange]
35-
origin <- (json \ "origin").validateOpt[Origin]
36-
version <- (json \ "version").validateOpt[Version]
37-
payloadType <- (json \ "payloadType").validate[PayloadType.BinaryOrJson]
38-
metadata <- metadata
39-
} yield {
40-
Append(range, origin, version, payloadType, metadata)
41-
}
42-
}
43-
}
44-
OFormat(reads, format)
45-
}
46-
val deleteFormat = Json.format[Delete]
47-
val purgeFormat = Json.format[Purge]
48-
val readFormat = Json.format[Mark]
21+
val appendFormat = Json.using[Json.WithDefaultValues].format[Append]
22+
val deleteFormat = Json.using[Json.WithDefaultValues].format[Delete]
23+
val purgeFormat = Json.using[Json.WithDefaultValues].format[Purge]
24+
val readFormat = Json.using[Json.WithDefaultValues].format[Mark]
4925

5026
new OFormat[Option[ActionHeader]] {
5127

@@ -96,25 +72,25 @@ object ActionHeader {
9672
final case class Append(
9773
range: SeqRange,
9874
origin: Option[Origin],
99-
version: Option[Version],
75+
version: Version = Version.obsolete,
10076
payloadType: PayloadType.BinaryOrJson,
101-
metadata: HeaderMetadata,
77+
metadata: HeaderMetadata = HeaderMetadata.empty,
10278
) extends AppendOrDelete
10379

10480
final case class Delete(
10581
to: DeleteTo,
10682
origin: Option[Origin],
107-
version: Option[Version],
83+
version: Version = Version.obsolete,
10884
) extends AppendOrDelete
10985

11086
final case class Purge(
11187
origin: Option[Origin],
112-
version: Option[Version],
88+
version: Version = Version.obsolete,
11389
) extends AppendOrDelete
11490

11591
final case class Mark(
11692
id: String,
11793
origin: Option[Origin],
118-
version: Option[Version],
94+
version: Version = Version.obsolete,
11995
) extends ActionHeader
12096
}

journal/src/main/scala/com/evolutiongaming/kafka/journal/EventRecord.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ final case class EventRecord[A](
1414
timestamp: Instant,
1515
partitionOffset: PartitionOffset,
1616
origin: Option[Origin],
17-
version: Option[Version],
17+
version: Version,
1818
metadata: RecordMetadata,
1919
headers: Headers,
2020
) {

journal/src/main/scala/com/evolutiongaming/kafka/journal/Produce.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ private[journal] object Produce {
9090
ActionHeader.Append(
9191
range = range,
9292
origin = origin,
93-
version = version.some,
93+
version = version,
9494
payloadType = payloadAndType.payloadType,
9595
metadata = metadata,
9696
),
@@ -109,15 +109,15 @@ private[journal] object Produce {
109109
def delete(key: Key, to: DeleteTo): F[PartitionOffset] = {
110110
for {
111111
timestamp <- Clock[F].instant
112-
action = Action.Delete(key, timestamp, to, origin, version.some)
112+
action = Action.Delete(key, timestamp, to, origin, version)
113113
result <- send(action)
114114
} yield result
115115
}
116116

117117
def purge(key: Key): F[PartitionOffset] = {
118118
for {
119119
timestamp <- Clock[F].instant
120-
action = Action.Purge(key, timestamp, origin, version.some)
120+
action = Action.Purge(key, timestamp, origin, version)
121121
result <- send(action)
122122
} yield result
123123
}
@@ -126,7 +126,7 @@ private[journal] object Produce {
126126
for {
127127
timestamp <- Clock[F].instant
128128
id = randomId.value
129-
action = Action.Mark(key, timestamp, id, origin, version.some)
129+
action = Action.Mark(key, timestamp, id, origin, version)
130130
result <- send(action)
131131
} yield result
132132
}
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
{
22
"delete": {
3-
"to": 3,
4-
"version": "0.0.1"
3+
"to": 3
54
}
65
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"mark": {
33
"id": "id",
4-
"origin": "origin"
4+
"origin": "origin",
5+
"version": "0.0.1"
56
}
67
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"purge": {
3-
"origin": "origin"
3+
"origin": "origin",
4+
"version": "0.0.1"
45
}
56
}

journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionHeaderJsonSpec.scala

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,24 +30,42 @@ class ActionHeaderJsonSpec extends AnyFunSuite with Matchers {
3030
test(s"Append format, origin: $origin, payloadType: $payloadType, metadata: $metadataStr") {
3131
val range = SeqRange.unsafe(1, 5)
3232
val header =
33-
ActionHeader.Append(range = range, origin = origin, version = none, payloadType = payloadType, metadata = metadata)
33+
ActionHeader.Append(
34+
range = range,
35+
origin = origin,
36+
version = Version.obsolete,
37+
payloadType = payloadType,
38+
metadata = metadata,
39+
)
3440
verify(header, s"Append-$originStr-$payloadType-$metadataStr")
3541
}
3642
}
3743

3844
test(s"Delete format, origin: $origin") {
39-
val seqNr = SeqNr.unsafe(3)
40-
val header = ActionHeader.Delete(seqNr.toDeleteTo, origin, Version("0.0.1").some)
45+
val seqNr = SeqNr.unsafe(3)
46+
val version = origin match {
47+
case Some(_) => Version("0.0.1")
48+
case None => Version.obsolete
49+
}
50+
val header = ActionHeader.Delete(seqNr.toDeleteTo, origin, version)
4151
verify(header, s"Delete-$originStr")
4252
}
4353

4454
test(s"Purge format, origin: $origin") {
45-
val header = ActionHeader.Purge(origin, none)
55+
val version = origin match {
56+
case Some(_) => Version("0.0.1")
57+
case None => Version.obsolete
58+
}
59+
val header = ActionHeader.Purge(origin, version)
4660
verify(header, s"Purge-$originStr")
4761
}
4862

4963
test(s"Mark format, origin: $origin") {
50-
val header = ActionHeader.Mark("id", origin, none)
64+
val version = origin match {
65+
case Some(_) => Version("0.0.1")
66+
case None => Version.obsolete
67+
}
68+
val header = ActionHeader.Mark("id", origin, version)
5169
verify(header, s"Mark-$originStr")
5270
}
5371
}

journal/src/test/scala/com/evolutiongaming/kafka/journal/ActionToProducerRecordSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class ActionToProducerRecordSpec extends AnyFunSuite with Matchers {
2929

3030
private val origins = List(Origin("origin").some, none[Origin])
3131

32-
private val versions = List(Version.current.some, none[Version])
32+
private val versions = List(Version.current)
3333

3434
private val seqNrs = List(SeqNr.min, SeqNr.max)
3535

journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadCacheSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class HeadCacheSpec extends AsyncWordSpec with Matchers {
109109
result <- Concurrent[IO].start { headCache.get(key = key, partition = partition, offset = marker) }
110110
_ <- stateRef.update { _.copy(topics = Map((topic, List(partition)))) }
111111
_ <- stateRef.update { state =>
112-
val action = Action.Mark(key, timestamp, ActionHeader.Mark("mark", none, Version.current.some))
112+
val action = Action.Mark(key, timestamp, ActionHeader.Mark("mark", none, Version.current))
113113
val record = consumerRecordOf(action, topicPartition, marker)
114114
val records = ConsumerRecordsOf(List(record))
115115
state.enqueue(records.pure[Try])
@@ -260,7 +260,7 @@ object HeadCacheSpec {
260260
key = key,
261261
timestamp = timestamp,
262262
origin = none,
263-
version = Version.current.some,
263+
version = Version.current,
264264
events = Events(Nel.of(Event(seqNr)), PayloadMetadata.empty),
265265
metadata = recordMetadata,
266266
headers = headers,

journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadInfoSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,20 +66,20 @@ class HeadInfoSpec extends AnyFunSuite with Matchers {
6666
ActionHeader.Append(
6767
range = SeqRange.unsafe(from, to),
6868
origin = None,
69-
version = Version.current.some,
69+
version = Version.current,
7070
payloadType = PayloadType.Json,
7171
metadata = HeaderMetadata.empty,
7272
)
7373
}
7474

7575
private def delete(seqNr: Int) = {
7676
val deleteTo = SeqNr.unsafe(seqNr).toDeleteTo
77-
ActionHeader.Delete(deleteTo, none, Version.current.some)
77+
ActionHeader.Delete(deleteTo, none, Version.current)
7878
}
7979

80-
private def mark = ActionHeader.Mark("id", none, Version.current.some)
80+
private def mark = ActionHeader.Mark("id", none, Version.current)
8181

82-
private def purge = ActionHeader.Purge(none, Version.current.some)
82+
private def purge = ActionHeader.Purge(none, Version.current)
8383

8484
private def deleteInfo(seqNr: Int) = {
8585
val deleteTo = SeqNr.unsafe(seqNr).toDeleteTo

journal/src/test/scala/com/evolutiongaming/kafka/journal/PartitionCacheSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class PartitionCacheSpec extends AsyncFunSuite with Matchers {
120120
test("get HeadInfo.delete") {
121121
partitionCacheOf()
122122
.use { cache =>
123-
val actionHeader = ActionHeader.Delete(to = DeleteTo(seqNr0), origin = none, version = none)
123+
val actionHeader = ActionHeader.Delete(to = DeleteTo(seqNr0), origin = none, version = Version.obsolete)
124124
for {
125125
a <- cache.add(Record(id0, offset0, actionHeader))
126126
_ <- IO { a shouldEqual none }
@@ -638,15 +638,15 @@ object PartitionCacheSpec {
638638
val seqNr0: SeqNr = SeqNr.min
639639
val seqNr1: SeqNr = seqNr0.next[Try].get
640640

641-
val actionHeader: ActionHeader = ActionHeader.Mark("mark", none, none)
641+
val actionHeader: ActionHeader = ActionHeader.Mark("mark", none, Version.obsolete)
642642

643643
def actionHeaderOf(seqNr: SeqNr): ActionHeader.Append = {
644644
ActionHeader.Append(
645645
range = SeqRange(seqNr),
646646
origin = none,
647647
payloadType = PayloadType.Binary,
648648
metadata = HeaderMetadata.empty,
649-
version = none,
649+
version = Version.obsolete,
650650
)
651651
}
652652

journal/src/test/scala/com/evolutiongaming/kafka/journal/StreamActionRecordsSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ object StreamActionRecordsSpec {
7373

7474
val (marker, markRecord) = {
7575
val offset = pointers.lastOption.fold(1L) { _.offset + 1 }
76-
val mark = Action.Mark(key, timestamp, ActionHeader.Mark("mark", none, Version.current.some))
76+
val mark = Action.Mark(key, timestamp, ActionHeader.Mark("mark", none, Version.current))
7777
val partitionOffset = PartitionOffset(offset = Offset.unsafe(offset))
7878
val record = ActionRecord(mark, partitionOffset)
7979
val marker = Marker(mark.id, partitionOffset)
@@ -87,7 +87,7 @@ object StreamActionRecordsSpec {
8787
val header = ActionHeader.Append(
8888
range = range,
8989
origin = none,
90-
version = Version.current.some,
90+
version = Version.current,
9191
payloadType = PayloadType.Json,
9292
metadata = HeaderMetadata.empty,
9393
)

0 commit comments

Comments
 (0)