Skip to content

Commit db0329a

Browse files
authored
[codex][AMPR-180 #508] Emission protocol foundation (#535)
* AMPR-180 #508 Add emission protocol foundation Written by Codex. * AMPR-180 #508 Fix flaky human escalation flow tests
1 parent 433a18b commit db0329a

11 files changed

Lines changed: 214 additions & 120 deletions

File tree

ampere-core/src/commonMain/kotlin/link/socket/ampere/agents/events/EventRepository.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import kotlinx.coroutines.withContext
55
import kotlinx.datetime.Instant
66
import kotlinx.serialization.SerializationException
77
import kotlinx.serialization.json.Json
8+
import link.socket.ampere.agents.domain.event.EmissionEvent
89
import link.socket.ampere.agents.domain.event.Event
910
import link.socket.ampere.agents.domain.event.EventId
1011
import link.socket.ampere.agents.domain.event.EventType
@@ -255,5 +256,6 @@ private fun Event.runIdOrNull(): String? = when (this) {
255256
is MemoryEvent.MilestoneReached -> runId
256257
is link.socket.ampere.agents.domain.event.TaskEvent.TaskCompleted -> runId
257258
is link.socket.ampere.agents.domain.event.TaskEvent.TaskFailed -> runId
259+
is EmissionEvent.Produced -> emission.provenance.runId
258260
else -> null
259261
}

ampere-core/src/commonMain/kotlin/link/socket/ampere/agents/events/messages/AgentMessageApi.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,17 @@ class AgentMessageApi(
164164
* until the human replies.
165165
* 4. Transition thread back to [EventStatus.Open] and post the reply.
166166
*
167+
* Set [awaitReply] to `false` for legacy fire-and-forget thread escalation
168+
* where the caller only needs the durable status transition and bus events.
169+
*
167170
* Thread state transitions are owned exclusively by this method — handlers must not
168171
* re-transition (CHI cell invariant).
169172
*/
170173
suspend fun escalateToHuman(
171174
threadId: MessageThreadId,
172175
reason: String,
173176
context: Map<String, String> = emptyMap(),
177+
awaitReply: Boolean = true,
174178
) {
175179
val thread = messageRepository
176180
.findThreadById(threadId)
@@ -227,6 +231,10 @@ class AgentMessageApi(
227231
),
228232
)
229233

234+
if (!awaitReply) {
235+
return
236+
}
237+
230238
// Step 3: Produce the Emission and suspend for reply.
231239
val contextString = context.entries.joinToString("\n") { "${it.key}: ${it.value}" }
232240
.ifEmpty { null }

ampere-core/src/commonMain/kotlin/link/socket/ampere/agents/events/tickets/TicketOrchestrator.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ class TicketOrchestrator(
436436
"reportedBy" to reportedByAgentId,
437437
"priority" to updatedTicket.priority.name,
438438
),
439+
awaitReply = false,
439440
)
440441
}
441442

ampere-core/src/jvmTest/kotlin/link/socket/ampere/agents/data/EventRepositoryTest.kt

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,16 @@ import kotlinx.coroutines.test.TestScope
1515
import kotlinx.coroutines.test.UnconfinedTestDispatcher
1616
import kotlinx.datetime.Clock
1717
import kotlinx.datetime.Instant
18+
import kotlinx.serialization.json.JsonPrimitive
1819
import link.socket.ampere.agents.domain.Urgency
20+
import link.socket.ampere.agents.domain.emission.Affordance
21+
import link.socket.ampere.agents.domain.emission.DangerLevel
22+
import link.socket.ampere.agents.domain.emission.Emission
23+
import link.socket.ampere.agents.domain.emission.EmissionKind
24+
import link.socket.ampere.agents.domain.emission.EmissionPayload
25+
import link.socket.ampere.agents.domain.emission.EmissionProvenance
26+
import link.socket.ampere.agents.domain.emission.inputDigest
27+
import link.socket.ampere.agents.domain.event.EmissionEvent
1928
import link.socket.ampere.agents.domain.event.Event
2029
import link.socket.ampere.agents.domain.event.EventSource
2130
import link.socket.ampere.agents.events.EventRepository
@@ -32,13 +41,14 @@ class EventRepositoryTest {
3241
private val stubEventSourceB = EventSource.Agent("agent-B")
3342

3443
private lateinit var driver: JdbcSqliteDriver
44+
private lateinit var database: Database
3545
private lateinit var repo: EventRepository
3646

3747
@BeforeTest
3848
fun setUp() {
3949
driver = JdbcSqliteDriver(JdbcSqliteDriver.IN_MEMORY)
4050
Database.Schema.create(driver)
41-
val database = Database.Companion(driver)
51+
database = Database.Companion(driver)
4252
repo = EventRepository(stubJson, testScope, database)
4353
}
4454

@@ -85,6 +95,52 @@ class EventRepositoryTest {
8595
}
8696
}
8797

98+
@Test
99+
fun `saveEvent stores emission produced provenance run id`() {
100+
runBlocking {
101+
val payload = EmissionPayload.Confirmation(
102+
action = "deploy production",
103+
preview = "Release v2",
104+
dangerLevel = DangerLevel.HIGH,
105+
)
106+
val digest = inputDigest(payload)
107+
val event = EmissionEvent.BaseProduced(
108+
eventId = "evt-emission-produced",
109+
timestamp = Instant.fromEpochSeconds(1_000),
110+
eventSource = stubEventSourceA,
111+
urgency = Urgency.HIGH,
112+
emission = Emission(
113+
id = "emission-1",
114+
kind = EmissionKind.Confirmation,
115+
payload = payload,
116+
affordances = listOf(
117+
Affordance(
118+
id = "approve",
119+
label = "Approve",
120+
signalPayload = JsonPrimitive("approve"),
121+
),
122+
),
123+
provenance = EmissionProvenance(
124+
runId = "run-emission-1",
125+
workflowId = "workflow-1",
126+
sourceEventId = "source-event-1",
127+
toolInvocationId = "tool-1",
128+
pluginId = "plugin-1",
129+
modelId = "model-1",
130+
inputDigest = digest,
131+
),
132+
dedupKey = digest,
133+
producedAt = Instant.fromEpochSeconds(1_000),
134+
),
135+
)
136+
137+
repo.saveEvent(event).getOrThrow()
138+
139+
val row = database.eventStoreQueries.getEventById("evt-emission-produced").executeAsOne()
140+
assertEquals("run-emission-1", row.run_id)
141+
}
142+
}
143+
88144
@Test
89145
fun `query by type returns only matching`() {
90146
runBlocking {

ampere-core/src/jvmTest/kotlin/link/socket/ampere/agents/events/messages/AgentMessageApiTest.kt

Lines changed: 107 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import kotlin.test.assertEquals
88
import kotlin.test.assertNotNull
99
import kotlin.test.assertTrue
1010
import kotlinx.coroutines.ExperimentalCoroutinesApi
11+
import kotlinx.coroutines.cancelAndJoin
1112
import kotlinx.coroutines.delay
13+
import kotlinx.coroutines.launch
1214
import kotlinx.coroutines.runBlocking
1315
import kotlinx.coroutines.test.TestScope
1416
import kotlinx.coroutines.test.UnconfinedTestDispatcher
@@ -102,42 +104,50 @@ class AgentMessageApiTest {
102104
assertEquals(2, fetchedThread2.messages.size)
103105

104106
// Escalate -> WAITING_FOR_HUMAN
105-
api.escalateToHuman(
106-
threadId = thread.id,
107-
reason = "Need approval",
108-
)
109-
val fetchedThread3 = api.getThread(thread.id).getOrNull()
110-
assertNotNull(fetchedThread3)
111-
assertEquals(EventStatus.WaitingForHuman, fetchedThread3.status)
112-
113-
// Posting now should fail, since the thread is waiting for human
114-
var threw = false
115-
try {
116-
api.postMessage(
107+
val escalationJob = launch {
108+
api.escalateToHuman(
117109
threadId = thread.id,
118-
content = "Should fail",
110+
reason = "Need approval",
119111
)
120-
} catch (e: IllegalArgumentException) {
121-
threw = true
122-
} catch (e: IllegalStateException) {
123-
threw = true
124112
}
125-
assertTrue(threw)
126-
127-
// Resolve
128-
api.resolveThread(thread.id)
129-
val fetched4 = api.getThread(thread.id).getOrNull()
130-
assertNotNull(fetched4)
131-
assertEquals(EventStatus.Resolved, fetched4.status)
132-
133-
// allow async event handlers to run
134-
delay(200)
135113

136-
// Events were published (at least 1 create, 2 posts, 1 escalation, 2 status changes)
137-
assertTrue(received.any { it is MessageEvent.ThreadCreated })
138-
assertTrue(received.count { it is MessageEvent.MessagePosted } >= 2)
139-
assertTrue(received.any { it is MessageEvent.EscalationRequested })
140-
assertTrue(received.count { it is MessageEvent.ThreadStatusChanged } >= 2)
114+
try {
115+
delay(200)
116+
val fetchedThread3 = api.getThread(thread.id).getOrNull()
117+
assertNotNull(fetchedThread3)
118+
assertEquals(EventStatus.WaitingForHuman, fetchedThread3.status)
119+
120+
// Posting now should fail, since the thread is waiting for human
121+
var threw = false
122+
try {
123+
api.postMessage(
124+
threadId = thread.id,
125+
content = "Should fail",
126+
)
127+
} catch (e: IllegalArgumentException) {
128+
threw = true
129+
} catch (e: IllegalStateException) {
130+
threw = true
131+
}
132+
assertTrue(threw)
133+
134+
// Resolve
135+
api.resolveThread(thread.id)
136+
val fetched4 = api.getThread(thread.id).getOrNull()
137+
assertNotNull(fetched4)
138+
assertEquals(EventStatus.Resolved, fetched4.status)
139+
140+
// allow async event handlers to run
141+
delay(200)
142+
143+
// Events were published (at least 1 create, 2 posts, 1 escalation, 2 status changes)
144+
assertTrue(received.any { it is MessageEvent.ThreadCreated })
145+
assertTrue(received.count { it is MessageEvent.MessagePosted } >= 2)
146+
assertTrue(received.any { it is MessageEvent.EscalationRequested })
147+
assertTrue(received.count { it is MessageEvent.ThreadStatusChanged } >= 2)
148+
} finally {
149+
escalationJob.cancelAndJoin()
150+
}
141151

142152
// ** TODO: Test subscriptions can be unsubscribed from. */
143153
}
@@ -166,48 +176,56 @@ class AgentMessageApiTest {
166176
assertEquals(EventStatus.Open, fetchedThread1.status)
167177

168178
// Escalate to WAITING_FOR_HUMAN
169-
api.escalateToHuman(
170-
threadId = thread.id,
171-
reason = "Need human input",
172-
)
173-
174-
val fetchedThread2 = api.getThread(thread.id).getOrNull()
175-
assertNotNull(fetchedThread2)
176-
assertEquals(EventStatus.WaitingForHuman, fetchedThread2.status)
177-
178-
// Verify posting is blocked
179-
var blocked = false
180-
try {
181-
api.postMessage(thread.id, "Should fail")
182-
} catch (e: IllegalArgumentException) {
183-
blocked = true
179+
val escalationJob = launch {
180+
api.escalateToHuman(
181+
threadId = thread.id,
182+
reason = "Need human input",
183+
)
184184
}
185-
assertTrue(blocked, "Posting should be blocked when waiting for human")
186-
187-
// Reopen the thread
188-
api.reopenThread(thread.id)
189-
190-
val fetchedThread3 = api.getThread(thread.id).getOrNull()
191-
assertNotNull(fetchedThread3)
192-
assertEquals(EventStatus.Open, fetchedThread3.status)
193-
194-
// Now posting should succeed
195-
val newMessage = api.postMessage(thread.id, "Human intervention complete")
196-
assertEquals("Human intervention complete", newMessage.content)
197185

198-
// Verify thread has the new message
199-
val fetchedThread4 = api.getThread(thread.id).getOrNull()
200-
assertNotNull(fetchedThread4)
201-
assertEquals(2, fetchedThread4.messages.size)
202-
203-
// Allow async handlers to run
204-
delay(200)
186+
try {
187+
delay(200)
188+
189+
val fetchedThread2 = api.getThread(thread.id).getOrNull()
190+
assertNotNull(fetchedThread2)
191+
assertEquals(EventStatus.WaitingForHuman, fetchedThread2.status)
192+
193+
// Verify posting is blocked
194+
var blocked = false
195+
try {
196+
api.postMessage(thread.id, "Should fail")
197+
} catch (e: IllegalArgumentException) {
198+
blocked = true
199+
}
200+
assertTrue(blocked, "Posting should be blocked when waiting for human")
201+
202+
// Reopen the thread
203+
api.reopenThread(thread.id)
205204

206-
// Verify status change events
207-
val statusChanges = received.filterIsInstance<MessageEvent.ThreadStatusChanged>()
208-
assertTrue(statusChanges.size >= 2)
209-
assertTrue(statusChanges.any { it.newStatus == EventStatus.WaitingForHuman })
210-
assertTrue(statusChanges.any { it.newStatus == EventStatus.Open })
205+
val fetchedThread3 = api.getThread(thread.id).getOrNull()
206+
assertNotNull(fetchedThread3)
207+
assertEquals(EventStatus.Open, fetchedThread3.status)
208+
209+
// Now posting should succeed
210+
val newMessage = api.postMessage(thread.id, "Human intervention complete")
211+
assertEquals("Human intervention complete", newMessage.content)
212+
213+
// Verify thread has the new message
214+
val fetchedThread4 = api.getThread(thread.id).getOrNull()
215+
assertNotNull(fetchedThread4)
216+
assertEquals(2, fetchedThread4.messages.size)
217+
218+
// Allow async handlers to run
219+
delay(200)
220+
221+
// Verify status change events
222+
val statusChanges = received.filterIsInstance<MessageEvent.ThreadStatusChanged>()
223+
assertTrue(statusChanges.size >= 2)
224+
assertTrue(statusChanges.any { it.newStatus == EventStatus.WaitingForHuman })
225+
assertTrue(statusChanges.any { it.newStatus == EventStatus.Open })
226+
} finally {
227+
escalationJob.cancelAndJoin()
228+
}
211229
}
212230
}
213231

@@ -227,19 +245,25 @@ class AgentMessageApiTest {
227245
initialMessageContent = "Need a decision",
228246
)
229247

230-
api.escalateToHuman(
231-
threadId = thread.id,
232-
reason = "Need human input on release timing",
233-
context = mapOf("release" to "v1"),
234-
)
235-
236-
delay(200)
248+
val escalationJob = launch {
249+
api.escalateToHuman(
250+
threadId = thread.id,
251+
reason = "Need human input on release timing",
252+
context = mapOf("release" to "v1"),
253+
)
254+
}
237255

238-
assertEquals(1, received.size)
239-
val event = received.single()
240-
assertEquals(thread.id, event.threadId)
241-
assertEquals("Need human input on release timing", event.reason)
242-
assertEquals(mapOf("release" to "v1"), event.context)
256+
try {
257+
delay(200)
258+
259+
assertEquals(1, received.size)
260+
val event = received.single()
261+
assertEquals(thread.id, event.threadId)
262+
assertEquals("Need human input on release timing", event.reason)
263+
assertEquals(mapOf("release" to "v1"), event.context)
264+
} finally {
265+
escalationJob.cancelAndJoin()
266+
}
243267
}
244268
}
245269

0 commit comments

Comments
 (0)