Skip to content

Commit 1d053b5

Browse files
authored
Add EventRelayService (#11)
1 parent be9bf14 commit 1d053b5

43 files changed

Lines changed: 1083 additions & 210 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

shared/src/commonMain/kotlin/link/socket/ampere/agents/events/EnvironmentOrchestrator.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package link.socket.ampere.agents.events
22

33
import link.socket.ampere.agents.events.api.AgentEventApi
4-
import link.socket.ampere.agents.events.bus.EventBus
4+
import link.socket.ampere.agents.events.bus.EventSerialBus
55
import link.socket.ampere.agents.events.meetings.Meeting
66
import link.socket.ampere.agents.events.meetings.MeetingOrchestrator
77
import link.socket.ampere.agents.events.meetings.MeetingRepository
@@ -28,7 +28,7 @@ import link.socket.ampere.agents.events.meetings.MeetingSchedulingService
2828
class EnvironmentOrchestrator(
2929
private val meetingRepository: MeetingRepository,
3030
private val ticketRepository: TicketRepository,
31-
private val eventBus: EventBus,
31+
private val eventSerialBus: EventSerialBus,
3232
private val messageApi: AgentMessageApi,
3333
private val eventApi: AgentEventApi,
3434
private val logger: EventLogger = ConsoleEventLogger(),
@@ -38,7 +38,7 @@ class EnvironmentOrchestrator(
3838
*/
3939
val meetingOrchestrator: MeetingOrchestrator = MeetingOrchestrator(
4040
repository = meetingRepository,
41-
eventBus = eventBus,
41+
eventSerialBus = eventSerialBus,
4242
messageApi = messageApi,
4343
logger = logger,
4444
)
@@ -51,7 +51,7 @@ class EnvironmentOrchestrator(
5151
*/
5252
val ticketOrchestrator: TicketOrchestrator = TicketOrchestrator(
5353
ticketRepository = ticketRepository,
54-
eventBus = eventBus,
54+
eventSerialBus = eventSerialBus,
5555
messageApi = messageApi,
5656
meetingSchedulingService = createMeetingSchedulingService(),
5757
logger = logger,
@@ -62,7 +62,7 @@ class EnvironmentOrchestrator(
6262
*/
6363
val eventRouter: EventRouter = EventRouter(
6464
eventApi = eventApi,
65-
eventBus = eventBus,
65+
eventSerialBus = eventSerialBus,
6666
)
6767

6868
/**

shared/src/commonMain/kotlin/link/socket/ampere/agents/events/Event.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ sealed class EventSource {
2525
data class Agent(val agentId: AgentId) : EventSource()
2626

2727
@Serializable
28-
data object Human : EventSource()
28+
data object Human : EventSource() {
29+
const val ID = "human"
30+
}
2931

3032
fun getIdentifier(): String = when (this) {
3133
is Agent -> agentId
32-
is Human -> "human"
34+
is Human -> Human.ID
3335
}
3436
}
3537

shared/src/commonMain/kotlin/link/socket/ampere/agents/events/EventRepository.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,25 @@ class EventRepository(
117117
}
118118
}
119119

120+
/**
121+
* Retrieve events between [fromTime] and [toTime] (inclusive), ascending by time.
122+
*/
123+
suspend fun getEventsBetween(fromTime: Instant, toTime: Instant): Result<List<Event>> =
124+
withContext(Dispatchers.IO) {
125+
runCatching {
126+
queries
127+
.getEventsBetween(
128+
fromTime.toEpochMilliseconds(),
129+
toTime.toEpochMilliseconds()
130+
)
131+
.executeAsList()
132+
}.map { rows ->
133+
rows.map { row ->
134+
decode(row.payload)
135+
}
136+
}
137+
}
138+
120139
private fun encode(event: Event): String = try {
121140
json.encodeToString(
122141
serializer = Event.serializer(),

shared/src/commonMain/kotlin/link/socket/ampere/agents/events/EventRouter.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ package link.socket.ampere.agents.events
22

33
import link.socket.ampere.agents.core.AgentId
44
import link.socket.ampere.agents.events.api.AgentEventApi
5-
import link.socket.ampere.agents.events.bus.EventBus
5+
import link.socket.ampere.agents.events.bus.EventSerialBus
66
import link.socket.ampere.agents.events.subscription.EventSubscription
77

88
class EventRouter(
99
private val eventApi: AgentEventApi,
10-
private val eventBus: EventBus,
10+
private val eventSerialBus: EventSerialBus,
1111
) {
1212
private val eventsByEventClassTypeSubscriptions = mutableMapOf<AgentId, EventSubscription.ByEventClassType>()
1313

@@ -18,7 +18,7 @@ class EventRouter(
1818
agentId = agentId,
1919
event = event,
2020
eventSubscription = subscription,
21-
).let { notificationEvent -> eventBus.publish(notificationEvent) }
21+
).let { notificationEvent -> eventSerialBus.publish(notificationEvent) }
2222
}
2323
}
2424

@@ -28,7 +28,7 @@ class EventRouter(
2828
agentId = agentId,
2929
event = event,
3030
eventSubscription = subscription,
31-
).let { notificationEvent -> eventBus.publish(notificationEvent) }
31+
).let { notificationEvent -> eventSerialBus.publish(notificationEvent) }
3232
}
3333
}
3434

@@ -38,7 +38,7 @@ class EventRouter(
3838
agentId = agentId,
3939
event = event,
4040
eventSubscription = subscription,
41-
).let { notificationEvent -> eventBus.publish(notificationEvent) }
41+
).let { notificationEvent -> eventSerialBus.publish(notificationEvent) }
4242
}
4343
}
4444
}

shared/src/commonMain/kotlin/link/socket/ampere/agents/events/api/AgentEventApi.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import link.socket.ampere.agents.events.EventClassType
88
import link.socket.ampere.agents.events.EventRepository
99
import link.socket.ampere.agents.events.EventSource
1010
import link.socket.ampere.agents.events.Urgency
11-
import link.socket.ampere.agents.events.bus.EventBus
11+
import link.socket.ampere.agents.events.bus.EventSerialBus
1212
import link.socket.ampere.agents.events.bus.subscribe
1313
import link.socket.ampere.agents.events.subscription.EventSubscription
1414
import link.socket.ampere.agents.events.subscription.Subscription
@@ -44,15 +44,15 @@ class EventFilter<E : Event>(
4444
class AgentEventApi(
4545
val agentId: AgentId,
4646
private val eventRepository: EventRepository,
47-
private val eventBus: EventBus,
47+
private val eventSerialBus: EventSerialBus,
4848
private val logger: EventLogger = ConsoleEventLogger(),
4949
) {
5050

5151
/** Persist and publish a pre-constructed event. */
5252
suspend fun publish(event: Event) {
5353
eventRepository.saveEvent(event)
5454
.onSuccess {
55-
eventBus.publish(event)
55+
eventSerialBus.publish(event)
5656
}
5757
.onFailure { throwable ->
5858
logger.logError(
@@ -127,7 +127,7 @@ class AgentEventApi(
127127
filter: EventFilter<Event.TaskCreated> = EventFilter.noFilter(),
128128
handler: suspend (Event.TaskCreated, Subscription?) -> Unit,
129129
): Subscription =
130-
eventBus.subscribe<Event.TaskCreated, EventSubscription.ByEventClassType>(
130+
eventSerialBus.subscribe<Event.TaskCreated, EventSubscription.ByEventClassType>(
131131
agentId = agentId,
132132
eventClassType = Event.TaskCreated.EVENT_CLASS_TYPE,
133133
) { event, subscription ->
@@ -141,7 +141,7 @@ class AgentEventApi(
141141
filter: EventFilter<Event.QuestionRaised> = EventFilter.noFilter(),
142142
handler: suspend (Event.QuestionRaised, Subscription?) -> Unit,
143143
): Subscription =
144-
eventBus.subscribe<Event.QuestionRaised, EventSubscription.ByEventClassType>(
144+
eventSerialBus.subscribe<Event.QuestionRaised, EventSubscription.ByEventClassType>(
145145
agentId = agentId,
146146
eventClassType = Event.QuestionRaised.EVENT_CLASS_TYPE,
147147
) { event, subscription ->
@@ -155,7 +155,7 @@ class AgentEventApi(
155155
filter: EventFilter<Event.CodeSubmitted> = EventFilter.noFilter(),
156156
handler: suspend (Event.CodeSubmitted, Subscription?) -> Unit,
157157
): Subscription =
158-
eventBus.subscribe<Event.CodeSubmitted, EventSubscription.ByEventClassType>(
158+
eventSerialBus.subscribe<Event.CodeSubmitted, EventSubscription.ByEventClassType>(
159159
agentId = agentId,
160160
eventClassType = Event.CodeSubmitted.EVENT_CLASS_TYPE,
161161
) { event, subscription ->
@@ -224,7 +224,7 @@ class AgentEventApi(
224224
) {
225225
val events = getRecentEvents(since, eventClassType)
226226
for (event in events) {
227-
eventBus.publish(event)
227+
eventSerialBus.publish(event)
228228
}
229229
}
230230
}

shared/src/commonMain/kotlin/link/socket/ampere/agents/events/api/AgentEventApiFactory.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package link.socket.ampere.agents.events.api
22

33
import link.socket.ampere.agents.core.AgentId
44
import link.socket.ampere.agents.events.EventRepository
5-
import link.socket.ampere.agents.events.bus.EventBus
5+
import link.socket.ampere.agents.events.bus.EventSerialBus
66
import link.socket.ampere.agents.events.utils.ConsoleEventLogger
77
import link.socket.ampere.agents.events.utils.EventLogger
88

99
/** Factory to create [AgentEventApi] instances wired to a persistent EventBus. */
1010
class AgentEventApiFactory(
1111
private val eventRepository: EventRepository,
12-
private val eventBus: EventBus,
12+
private val eventSerialBus: EventSerialBus,
1313
private val logger: EventLogger = ConsoleEventLogger(),
1414
) {
1515
/**
@@ -19,7 +19,7 @@ class AgentEventApiFactory(
1919
AgentEventApi(
2020
agentId = agentId,
2121
eventRepository = eventRepository,
22-
eventBus = eventBus,
22+
eventSerialBus = eventSerialBus,
2323
logger = logger,
2424
)
2525
}

shared/src/commonMain/kotlin/link/socket/ampere/agents/events/bus/EventBus.kt renamed to shared/src/commonMain/kotlin/link/socket/ampere/agents/events/bus/EventSerialBus.kt

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,23 @@ typealias HandlerMap = MutableMap<EventClassType, List<EventHandler<Event, Subsc
1818
typealias SubscriptionMap = MutableMap<EventClassType, Subscription>
1919

2020
/**
21-
* Type-safe event bus for publish-subscribe communication between agents with optional persistence.
21+
* EventSerialBus (ESB) is a thread-safe, Kotlin Multiplatform-compatible event bus.
22+
* Uses serialized event subscriptions to allow asynchronous two-way communication, which is then used
23+
* to enable publish-subscribe communication between Agents and Humans.
2224
*
25+
* Features:
2326
* - Thread-safe and Kotlin Multiplatform compatible
2427
* - Handlers are invoked asynchronously using the provided [CoroutineScope]
2528
* - Persistence is handled by higher-level APIs; EventBus only dispatches events to subscribers
2629
*/
27-
class EventBus(
30+
class EventSerialBus(
2831
private val scope: CoroutineScope,
2932
private val logger: EventLogger = ConsoleEventLogger(),
3033
) {
31-
// Map event from EventClassType -> (subscriptionId, eventHandler)
34+
/** Map event from EventClassType -> (subscriptionId, eventHandler) */
3235
private val handlerMap: HandlerMap = mutableMapOf()
3336

34-
// Map from subscriptionId -> EventClassType (to efficiently locate the handler on unsubscribe)
37+
/** Map from subscriptionId -> EventClassType (to efficiently locate the handler on unsubscribe) */
3538
private val subscriptionMap: SubscriptionMap = mutableMapOf()
3639

3740
private val mutex = Mutex()
@@ -91,7 +94,13 @@ class EventBus(
9194
// Register handler under lock
9295
runBlockingLock {
9396
val existing = handlerMap[eventClassType]
94-
val updated = if (existing == null) listOf(eventHandler) else existing + eventHandler
97+
98+
val updated = if (existing == null) {
99+
listOf(eventHandler)
100+
} else {
101+
existing + eventHandler
102+
}
103+
95104
handlerMap[eventClassType] = updated
96105
subscriptionMap.getOrPut(eventClassType) { subscription }
97106
}
@@ -115,7 +124,7 @@ class EventBus(
115124
}
116125
}
117126

118-
// Helper to reuse the same locking pattern in non-suspending API without exposing Mutex.
127+
/** Helper to reuse the same locking pattern in non-suspending API without exposing Mutex */
119128
private inline fun <R> runBlockingLock(
120129
crossinline block: () -> R,
121130
): R = runBlocking {
@@ -127,7 +136,7 @@ class EventBus(
127136
/**
128137
* Inline reified helper for ergonomic subscriptions.
129138
*/
130-
inline fun <reified E : Event, reified S : Subscription> EventBus.subscribe(
139+
inline fun <reified E : Event, reified S : Subscription> EventSerialBus.subscribe(
131140
agentId: AgentId,
132141
eventClassType: EventClassType,
133142
noinline handler: suspend (E, S?) -> Unit,

shared/src/commonMain/kotlin/link/socket/ampere/agents/events/bus/EventBusFactory.kt renamed to shared/src/commonMain/kotlin/link/socket/ampere/agents/events/bus/EventSerialBusFactory.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@ import link.socket.ampere.agents.events.utils.ConsoleEventLogger
55
import link.socket.ampere.agents.events.utils.EventLogger
66

77
/**
8-
* Factory for creating an [EventBus]. Persistence is handled by higher-level APIs.
8+
* Factory for creating an [EventSerialBus]. Persistence is handled by higher-level APIs.
99
*/
10-
class EventBusFactory(
10+
class EventSerialBusFactory(
1111
private val scope: CoroutineScope,
1212
private val logger: EventLogger = ConsoleEventLogger(),
1313
) {
1414
/**
15-
* Create a new [EventBus].
15+
* Create a new [EventSerialBus].
1616
*/
17-
fun create(): EventBus = EventBus(
18-
scope = scope,
19-
logger = logger,
20-
)
17+
fun create(): EventSerialBus =
18+
EventSerialBus(
19+
scope = scope,
20+
logger = logger,
21+
)
2122
}

shared/src/commonMain/kotlin/link/socket/ampere/agents/events/meetings/MeetingOrchestrator.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import link.socket.ampere.agents.core.status.TaskStatus
88
import link.socket.ampere.agents.core.tasks.MeetingTask
99
import link.socket.ampere.agents.events.EventSource
1010
import link.socket.ampere.agents.events.MeetingEvent
11-
import link.socket.ampere.agents.events.bus.EventBus
11+
import link.socket.ampere.agents.events.bus.EventSerialBus
1212
import link.socket.ampere.agents.events.messages.AgentMessageApi
1313
import link.socket.ampere.agents.events.messages.MessageChannel
1414
import link.socket.ampere.agents.events.utils.ConsoleEventLogger
@@ -22,7 +22,7 @@ import link.socket.ampere.util.randomUUID
2222
*/
2323
class MeetingOrchestrator(
2424
private val repository: MeetingRepository,
25-
private val eventBus: EventBus,
25+
private val eventSerialBus: EventSerialBus,
2626
private val messageApi: AgentMessageApi,
2727
private val logger: EventLogger = ConsoleEventLogger(),
2828
) {
@@ -114,7 +114,7 @@ class MeetingOrchestrator(
114114
requireNotNull(createdMeeting) { "Failed to persist meeting ${meeting.id}" }
115115

116116
// Publish MeetingScheduled event
117-
eventBus.publish(
117+
eventSerialBus.publish(
118118
MeetingEvent.MeetingScheduled(
119119
eventId = generateUUID(createdMeeting.id),
120120
meeting = createdMeeting,
@@ -186,7 +186,7 @@ class MeetingOrchestrator(
186186
}
187187

188188
// Publish MeetingStarted event
189-
eventBus.publish(
189+
eventSerialBus.publish(
190190
MeetingEvent.MeetingStarted(
191191
eventId = randomUUID(),
192192
meetingId = meetingId,
@@ -249,7 +249,7 @@ class MeetingOrchestrator(
249249
val updatedItem = nextItem.copy(status = TaskStatus.InProgress)
250250

251251
// Publish AgendaItemStarted event
252-
eventBus.publish(
252+
eventSerialBus.publish(
253253
MeetingEvent.AgendaItemStarted(
254254
eventId = randomUUID(),
255255
meetingId = meetingId,
@@ -322,7 +322,7 @@ class MeetingOrchestrator(
322322
}
323323

324324
// Publish MeetingCompleted event
325-
eventBus.publish(
325+
eventSerialBus.publish(
326326
MeetingEvent.MeetingCompleted(
327327
eventId = randomUUID(),
328328
meetingId = meetingId,

0 commit comments

Comments
 (0)