Skip to content

Commit bf87bc4

Browse files
committed
test: tighten rust a2a coverage checks
Signed-off-by: lucarlig <luca.carlig@ibm.com>
1 parent 3837de3 commit bf87bc4

File tree

4 files changed

+230
-5
lines changed

4 files changed

+230
-5
lines changed

tools_rust/a2a_runtime/src/event_store.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,79 @@ impl EventStore {
335335
let meta_key = format!("{KEY_PREFIX}:{task_id}:meta");
336336
let _ = self.storage.hset(&meta_key, "stream_active", "0").await;
337337
}
338+
339+
#[doc(hidden)]
340+
pub fn seeded_for_test(events: Vec<StoredEvent>, stream_active: bool) -> Self {
341+
#[derive(Clone)]
342+
struct SeededEventStoreStorage {
343+
events: Arc<Vec<StoredEvent>>,
344+
active: bool,
345+
}
346+
347+
#[async_trait]
348+
impl EventStoreStorage for SeededEventStoreStorage {
349+
async fn store_event(
350+
&self,
351+
_meta_key: &str,
352+
_events_key: &str,
353+
_messages_key: &str,
354+
_event_id: &str,
355+
_payload_json: &str,
356+
_ttl_secs: u64,
357+
_max_events: usize,
358+
) -> Result<i64, String> {
359+
Err("store not supported in seeded test store".to_string())
360+
}
361+
362+
async fn replay_entries(
363+
&self,
364+
_events_key: &str,
365+
after_sequence: i64,
366+
) -> Result<Vec<(String, f64)>, String> {
367+
Ok(self
368+
.events
369+
.iter()
370+
.filter(|event| event.sequence > after_sequence)
371+
.map(|event| (event.event_id.clone(), event.sequence as f64))
372+
.collect())
373+
}
374+
375+
async fn payloads(
376+
&self,
377+
_messages_key: &str,
378+
event_ids: &[String],
379+
) -> Result<Vec<Option<String>>, String> {
380+
Ok(event_ids
381+
.iter()
382+
.map(|event_id| {
383+
self.events
384+
.iter()
385+
.find(|event| &event.event_id == event_id)
386+
.map(|event| event.payload.clone())
387+
})
388+
.collect())
389+
}
390+
391+
async fn hget(&self, _key: &str, _field: &str) -> Result<Option<String>, String> {
392+
Ok(Some(if self.active { "1" } else { "0" }.to_string()))
393+
}
394+
395+
async fn hset(&self, _key: &str, _field: &str, _value: &str) -> Result<(), String> {
396+
Ok(())
397+
}
398+
}
399+
400+
let (flush_tx, _flush_rx) = mpsc::channel(1);
401+
Self::new_with_storage(
402+
Arc::new(SeededEventStoreStorage {
403+
events: Arc::new(events),
404+
active: stream_active,
405+
}),
406+
256,
407+
60,
408+
flush_tx,
409+
)
410+
}
338411
}
339412

340413
// ---------------------------------------------------------------------------

tools_rust/a2a_runtime/src/lib.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,14 @@ pub mod test_support {
211211

212212
/// Build the Axum app without starting a listener.
213213
pub fn build_app(config: RuntimeConfig) -> axum::Router {
214+
build_app_with_event_store(config, None)
215+
}
216+
217+
/// Build the Axum app with a test-supplied event store.
218+
pub fn build_app_with_event_store(
219+
config: RuntimeConfig,
220+
event_store: Option<Arc<crate::event_store::EventStore>>,
221+
) -> axum::Router {
214222
let client = build_http_client(&config).expect("failed to build reqwest client");
215223
let config_arc = Arc::new(config);
216224

@@ -247,7 +255,7 @@ pub mod test_support {
247255
redis_pool: None,
248256
agent_cache,
249257
session_manager: None,
250-
event_store: None,
258+
event_store,
251259
};
252260

253261
server::router(state)

tools_rust/a2a_runtime/src/server.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,7 @@ async fn proxy_to_backend(
11121112
.client
11131113
.request(reqwest_method, &url)
11141114
.headers(forwarded)
1115-
.body(body.to_vec())
1115+
.body(body)
11161116
.send()
11171117
.await
11181118
.map_err(|e| {
@@ -1484,4 +1484,51 @@ mod tests {
14841484
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
14851485
assert_eq!(&body[..], b"backend-body");
14861486
}
1487+
1488+
#[tokio::test]
1489+
async fn handle_streaming_method_replays_from_store_when_last_event_id_is_present() {
1490+
let mut state = test_state("http://127.0.0.1:1".to_string());
1491+
state.event_store = Some(Arc::new(crate::event_store::EventStore::seeded_for_test(
1492+
vec![
1493+
crate::event_store::StoredEvent {
1494+
event_id: "evt-1".to_string(),
1495+
sequence: 1,
1496+
event_type: "unknown".to_string(),
1497+
payload: r#"{"status":"queued"}"#.to_string(),
1498+
},
1499+
crate::event_store::StoredEvent {
1500+
event_id: "evt-2".to_string(),
1501+
sequence: 2,
1502+
event_type: "status".to_string(),
1503+
payload: r#"{"status":"working"}"#.to_string(),
1504+
},
1505+
],
1506+
false,
1507+
)));
1508+
1509+
let response = handle_streaming_method(
1510+
&state,
1511+
"test-agent",
1512+
&json!({"params": {"id": "task-123"}}),
1513+
&HashMap::from([("last-event-id".to_string(), "task-123:0".to_string())]),
1514+
&json!({"sub": "user"}),
1515+
)
1516+
.await
1517+
.unwrap();
1518+
1519+
assert_eq!(response.status(), StatusCode::OK);
1520+
assert_eq!(
1521+
response
1522+
.headers()
1523+
.get("content-type")
1524+
.and_then(|v| v.to_str().ok()),
1525+
Some("text/event-stream")
1526+
);
1527+
1528+
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
1529+
let text = String::from_utf8_lossy(&body);
1530+
assert!(text.contains("id: evt-1:1"), "expected first replayed id, body: {text}");
1531+
assert!(text.contains("data: {\"status\":\"queued\"}"), "expected first payload, body: {text}");
1532+
assert!(text.contains("id: evt-2:2"), "expected second replayed id, body: {text}");
1533+
}
14871534
}

tools_rust/a2a_runtime/tests/integration.rs

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ use axum::http::{Request, StatusCode};
1515
use contextforge_a2a_runtime::config::RuntimeConfig;
1616
use http_body_util::BodyExt;
1717
use serde_json::{Value, json};
18+
use std::sync::Arc;
1819
use tower::ServiceExt;
19-
use wiremock::matchers::{method, path};
20+
use wiremock::matchers::{body_json, method, path};
2021
use wiremock::{Mock, MockServer, ResponseTemplate};
2122
use test_helpers::*;
2223

@@ -1652,6 +1653,12 @@ async fn test_a2a_invoke_create_push_config_routes_to_python() {
16521653

16531654
Mock::given(method("POST"))
16541655
.and(path_regex(".*push/create$"))
1656+
.and(body_json(json!({
1657+
"a2a_agent_id": "agent-001",
1658+
"task_id": "task-1",
1659+
"webhook_url": "https://example.com/hook",
1660+
"enabled": true
1661+
})))
16551662
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
16561663
"config_id": "cfg-1",
16571664
"enabled": true
@@ -1672,7 +1679,12 @@ async fn test_a2a_invoke_create_push_config_routes_to_python() {
16721679
"jsonrpc": "2.0",
16731680
"method": "CreateTaskPushNotificationConfig",
16741681
"id": 8,
1675-
"params": {"task_id": "task-1", "webhook_url": "https://example.com/hook", "enabled": true}
1682+
"params": {
1683+
"a2a_agent_id": "agent-001",
1684+
"task_id": "task-1",
1685+
"webhook_url": "https://example.com/hook",
1686+
"enabled": true
1687+
}
16761688
}),
16771689
)
16781690
.await;
@@ -1706,6 +1718,9 @@ async fn test_a2a_invoke_get_push_config_routes_to_python() {
17061718

17071719
Mock::given(method("POST"))
17081720
.and(path_regex(".*push/get$"))
1721+
.and(body_json(json!({
1722+
"task_id": "task-1"
1723+
})))
17091724
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
17101725
"config_id": "cfg-1",
17111726
"enabled": true
@@ -1726,7 +1741,7 @@ async fn test_a2a_invoke_get_push_config_routes_to_python() {
17261741
"jsonrpc": "2.0",
17271742
"method": "GetTaskPushNotificationConfig",
17281743
"id": 9,
1729-
"params": {"config_id": "cfg-1"}
1744+
"params": {"task_id": "task-1"}
17301745
}),
17311746
)
17321747
.await;
@@ -1760,6 +1775,9 @@ async fn test_a2a_invoke_list_push_configs_routes_to_python() {
17601775

17611776
Mock::given(method("POST"))
17621777
.and(path_regex(".*push/list$"))
1778+
.and(body_json(json!({
1779+
"task_id": "task-1"
1780+
})))
17631781
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
17641782
"configs": [{"config_id": "cfg-1"}, {"config_id": "cfg-2"}]
17651783
})))
@@ -1813,6 +1831,9 @@ async fn test_a2a_invoke_delete_push_config_routes_to_python() {
18131831

18141832
Mock::given(method("POST"))
18151833
.and(path_regex(".*push/delete$"))
1834+
.and(body_json(json!({
1835+
"config_id": "cfg-1"
1836+
})))
18161837
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
18171838
"deleted": true
18181839
})))
@@ -2309,6 +2330,13 @@ async fn test_streaming_method_forwards_sse_stream() {
23092330

23102331
let response = app.oneshot(request).await.unwrap();
23112332
assert_eq!(response.status(), StatusCode::OK);
2333+
assert_eq!(
2334+
response
2335+
.headers()
2336+
.get("content-type")
2337+
.and_then(|v| v.to_str().ok()),
2338+
Some("text/event-stream")
2339+
);
23122340

23132341
let body = String::from_utf8_lossy(
23142342
&response.into_body().collect().await.unwrap().to_bytes(),
@@ -2975,3 +3003,72 @@ async fn test_streaming_method_with_last_event_id_without_redis() {
29753003

29763004
mock_server.verify().await;
29773005
}
3006+
3007+
#[tokio::test]
3008+
async fn test_streaming_method_replays_from_store_when_last_event_id_present() {
3009+
let mock_server = MockServer::start().await;
3010+
setup_auth_mock(&mock_server, 1).await;
3011+
setup_authz_mock(&mock_server, 1).await;
3012+
3013+
let mut config = default_test_config();
3014+
config.backend_base_url = mock_server.uri();
3015+
config.auth_secret = Some("test-secret".to_string());
3016+
let event_store = Arc::new(contextforge_a2a_runtime::event_store::EventStore::seeded_for_test(
3017+
vec![
3018+
contextforge_a2a_runtime::event_store::StoredEvent {
3019+
event_id: "evt-1".to_string(),
3020+
sequence: 1,
3021+
event_type: "unknown".to_string(),
3022+
payload: r#"{"status":"queued"}"#.to_string(),
3023+
},
3024+
contextforge_a2a_runtime::event_store::StoredEvent {
3025+
event_id: "evt-2".to_string(),
3026+
sequence: 2,
3027+
event_type: "unknown".to_string(),
3028+
payload: r#"{"status":"working"}"#.to_string(),
3029+
},
3030+
],
3031+
false,
3032+
));
3033+
let app = contextforge_a2a_runtime::test_support::build_app_with_event_store(
3034+
config,
3035+
Some(event_store),
3036+
);
3037+
3038+
let request = Request::builder()
3039+
.method("POST")
3040+
.uri("/a2a/test-agent/invoke")
3041+
.header("content-type", "application/json")
3042+
.header("last-event-id", "task-123:0")
3043+
.body(Body::from(
3044+
serde_json::to_vec(&json!({
3045+
"jsonrpc": "2.0",
3046+
"method": "SendStreamingMessage",
3047+
"id": 1,
3048+
"params": {"id": "task-123", "message": {"role": "ROLE_USER", "parts": [{"text": "hello"}]}}
3049+
}))
3050+
.unwrap(),
3051+
))
3052+
.unwrap();
3053+
3054+
let response = app.oneshot(request).await.unwrap();
3055+
assert_eq!(response.status(), StatusCode::OK);
3056+
assert_eq!(
3057+
response
3058+
.headers()
3059+
.get("content-type")
3060+
.and_then(|v| v.to_str().ok()),
3061+
Some("text/event-stream")
3062+
);
3063+
3064+
let body = String::from_utf8_lossy(
3065+
&response.into_body().collect().await.unwrap().to_bytes(),
3066+
)
3067+
.to_string();
3068+
assert!(body.contains("id: evt-1:1"), "expected first replayed event, body: {body}");
3069+
assert!(body.contains("id: evt-2:2"), "expected second replayed event, body: {body}");
3070+
assert!(body.contains("data: {\"status\":\"queued\"}"), "expected queued payload, body: {body}");
3071+
assert!(body.contains("data: {\"status\":\"working\"}"), "expected working payload, body: {body}");
3072+
3073+
mock_server.verify().await;
3074+
}

0 commit comments

Comments
 (0)