Skip to content

Implement Support for DCB #360

@DomenicDev

Description

@DomenicDev

Let's use this issue to keep track of what needs to be done to make FModel DCB-compliant based on the specification at https://dcb.events/specification/.

A few days ago, I tried implementing DCB on top of the current FModel version. Here is the proof of concept:

import com.fraktalio.fmodel.application.EventComputation
import com.fraktalio.fmodel.domain.Decider
import com.fraktalio.fmodel.domain.IDecider
import kotlinx.coroutines.flow.*

// QUERY

/**
 * A query describing which events should be selected from the event store.
 *
 * The two visible implementations are [AllQuery] (no filtering) and
 * [ItemsQuery] (filtering by a list of [QueryItem]s).
 */
sealed interface Query {

    companion object {

        /** Returns the query that selects every event. */
        fun all(): AllQuery = AllQuery

        /** Wraps the given [items] in an [ItemsQuery]. */
        fun fromItems(items: List<QueryItem>): ItemsQuery = ItemsQuery(queryItems = items)
    }
}

/**
 * A [Query] made up of individual [QueryItem]s.
 *
 * @property queryItems the criteria that make up this query.
 */
data class ItemsQuery(
    val queryItems: List<QueryItem>,
) : Query

/**
 * A single selection criterion of an [ItemsQuery].
 *
 * @property types event type names to select, or `null` for no type criterion.
 * @property tags event tags to select, or `null` for no tag criterion.
 * @throws IllegalArgumentException if neither [types] nor [tags] carries at least one value.
 */
data class QueryItem(
    val types: List<String>?,
    val tags: List<String>?
) {

    init {
        // The previous check (`types == null && tags == null`) was inverted: it only accepted
        // items without any criterion and rejected every valid one. An empty list carries no
        // criterion either, so it is treated the same as `null` here.
        require(!types.isNullOrEmpty() || !tags.isNullOrEmpty()) { "At least types or tags must be defined!" }
    }
}

/** The [Query] that selects every event, without any filtering. */
data object AllQuery : Query




// Helper data structures

/**
 * Wraps a domain event together with the metadata used for querying.
 *
 * @param E the type of the wrapped domain event.
 * @property type the event type name.
 * @property data the wrapped domain event.
 * @property tags the tags attached to the event; empty by default.
 */
data class EventEnvelope<E>(
    val type: String,
    val data: E,
    val tags: List<String> = listOf(),
)

/**
 * An [EventEnvelope] paired with the position assigned to it by the event store.
 *
 * @param E the type of the wrapped domain event.
 * @param P the type of the position.
 * @property event the stored envelope.
 * @property position the position of [event].
 */
data class SequencedEvent<E, P>(
    val event: EventEnvelope<E>,
    val position: P,
)


/**
 * The condition that guards an append to the event store.
 *
 * @param Q the query type.
 * @param P the position type.
 * @property failIfEventsMatch the query identifying events that would conflict with the append.
 * @property after the position after which conflicting events are looked for, or `null` if none is given.
 */
data class AppendCondition<Q, P>(
    val failIfEventsMatch: Q,
    val after: P?,
)


// Repository

/**
 * Event repository that reads events by query and appends events under an [AppendCondition].
 *
 * @param C the command type (not used by the members declared here).
 * @param E the domain event type.
 * @param P the position type.
 * @param Q the query type.
 */
interface DcbEventRepository<C, E, P, Q> {

    /** Returns the events selected by [query], each with its position. */
    fun fetchEvents(query: Q): Flow<SequencedEvent<E, P>>

    /**
     * Appends the envelopes of the receiver flow, guarded by [condition].
     *
     * @return a flow of envelopes resulting from the append.
     */
    fun Flow<EventEnvelope<E>>.save(condition: AppendCondition<Q, P>): Flow<EventEnvelope<E>>
}

/**
 * Combines FModel's [EventComputation] with a [DcbEventRepository], so that new events
 * can be computed and persisted through a single receiver.
 */
interface EventSourcingDecisionModel<C, S, E, P, Q> :
    EventComputation<C, S, E>,
    DcbEventRepository<C, E, P, Q>


/**
 * Factory for an [EventSourcingDecisionModel] that delegates the decision logic to [decider]
 * and the persistence to [eventRepository].
 *
 * @param decider the decider to delegate to.
 * @param eventRepository the repository to delegate to.
 * @return a decision model backed by the two delegates.
 */
fun <C, S, E, P, Q> EventSourcingDecisionModel(
    decider: IDecider<C, S, E>,
    eventRepository: DcbEventRepository<C, E, P, Q>
): EventSourcingDecisionModel<C, S, E, P, Q> {
    // The anonymous object has no members of its own; everything is forwarded to the delegates.
    return object :
        EventSourcingDecisionModel<C, S, E, P, Q>,
        IDecider<C, S, E> by decider,
        DcbEventRepository<C, E, P, Q> by eventRepository {}
}


/**
 * Handles [command] against the events selected by [query] and appends the resulting events.
 *
 * The events matching [query] are loaded, new events are computed from them, each new event is
 * wrapped by [tagger], and the envelopes are saved with an [AppendCondition] built from the same
 * [query] and the position of the last loaded event (`null` when nothing was loaded).
 *
 * @param command the command to handle.
 * @param query the query selecting the events the decision is based on.
 * @param tagger wraps each newly computed event into an [EventEnvelope].
 * @return a flow of the domain events returned by the save operation.
 */
fun <C, S, E, P, Q> EventSourcingDecisionModel<C, S, E, P, Q>.handleOptimistically(
    command: C,
    query: Q,
    tagger: (E) -> EventEnvelope<E>
): Flow<E> = flow {
    val history = fetchEvents(query).toList()

    // The append is rejected if events matching the same query appear after this position.
    val condition = AppendCondition(
        failIfEventsMatch = query,
        after = history.lastOrNull()?.position
    )

    val decidedEnvelopes = history
        .map { sequenced -> sequenced.event.data }
        .asFlow()
        .computeNewEvents(command)
        .map { event -> tagger(event) }

    emitAll(decidedEnvelopes.save(condition).map { envelope -> envelope.data })
}

/** Decider used by the demo: commands, state and events are all plain strings. */
typealias TestDecider = Decider<String, String, String>

/**
 * Demo entry point: handles two commands against an in-memory repository, printing the
 * events returned for each, and finally prints everything stored in the repository.
 */
suspend fun main() {
    val repo = InMemoryDcbEventRepository<String>()

    val decider = TestDecider(
        decide = { c, _ -> flowOf("EVENT:${c} ") },
        evolve = { s, e -> s.plus(e) },
        initialState = ""
    )
    val decisionModel = EventSourcingDecisionModel(decider = decider, eventRepository = repo)

    // Every event gets the same type and tag in this demo.
    val tagger: (String) -> EventEnvelope<String> = { data ->
        EventEnvelope(type = "MyType", data = data, tags = listOf("s:123"))
    }

    for (command in listOf("test1", "test2")) {
        val handled = decisionModel.handleOptimistically(
            command = command,
            query = Query.all(),
            tagger = tagger
        ).toList()
        handled.forEach { event -> println(event) }
    }

    val stored = repo.fetchEvents(Query.all()).toList()
    println(stored)
}

A simple in-memory event repository used by the example above:

/**
 * In-memory [DcbEventRepository] keeping all events in a list, with `Long` positions taken
 * from an incrementing counter.
 *
 * @param E the domain event type.
 */
class InMemoryDcbEventRepository<E> : DcbEventRepository<String, E, Long, Query> {

    /** All stored events, in append order. */
    val events = CopyOnWriteArrayList<SequencedEvent<E, Long>>()

    /** Source of positions; the first appended event gets position 1. */
    val sequence = AtomicLong(0)

    /**
     * Returns the stored events selected by [query].
     *
     * [AllQuery] selects everything. For an [ItemsQuery], an event is selected when it matches
     * at least one of the query items (see [matches]).
     */
    override fun fetchEvents(query: Query): Flow<SequencedEvent<E, Long>> = flow {
        val result = when (query) {
            is AllQuery -> events
            is ItemsQuery -> events.filter { sequenced ->
                query.queryItems.any { item -> item.matches(sequenced.event) }
            }
        }
        result.forEach { emit(it) }
    }

    /**
     * Appends the envelopes of the receiver flow unless events matching
     * `condition.failIfEventsMatch` exist after `condition.after` (or exist at all when
     * `after` is `null`).
     *
     * @return a flow re-emitting each envelope after it has been stored.
     * @throws IllegalStateException if conflicting events are found.
     */
    override fun Flow<EventEnvelope<E>>.save(condition: AppendCondition<Query, Long>): Flow<EventEnvelope<E>> = flow {
        val after = condition.after
        val conflictingEvents = fetchEvents(condition.failIfEventsMatch)
            .filter { after == null || it.position > after }
            .toList()

        check(conflictingEvents.isEmpty()) { "DCB violation: conflicting events detected" }

        // NOTE(review): the conflict check above and the appends below are not performed as one
        // atomic step, and envelopes are appended one at a time as the upstream emits them.
        // Concurrent writers could interleave here — confirm whether that matters for this PoC.
        this@save.collect { envelope ->
            val pos = sequence.incrementAndGet()
            events.add(SequencedEvent(envelope, pos))
            emit(envelope)
        }
    }

    /**
     * Tells whether [event] satisfies this query item, following the DCB specification: the
     * event type must be one of the item's types AND the event must carry ALL of the item's
     * tags. A `null` or empty criterion places no constraint on the event.
     *
     * The earlier implementation combined both criteria with OR and accepted any single tag,
     * which selected (and conflict-checked) far more events than the query asked for.
     */
    private fun QueryItem.matches(event: EventEnvelope<E>): Boolean {
        val wantedTypes = types
        val wantedTags = tags
        val typeMatches = wantedTypes.isNullOrEmpty() || event.type in wantedTypes
        val tagsMatch = wantedTags.isNullOrEmpty() || event.tags.containsAll(wantedTags)
        return typeMatches && tagsMatch
    }
}

What I noticed is that this proof of concept introduces some new concepts (Query, EventEnvelope) that could be modeled differently.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions