Skip to content

Commit 4f94911

Browse files
qqeasonchenxwm1992claude
authored
[ISSUE #5247] Add A2A Agent Card Registry to EventMesh (#5246)
* update architecture * update architecture image * Test: Add unit test for SendAsyncEventProcessor to verify V1 and V2 logic integration * Test & Docs: Add unit tests for core engines and documentation for plugin configuration * 1.12.0-prepare (#5222) * Update copyright year to 2025 * update release version * feat: add A2A Agent Card Registry based on EMQX reference implementation - Add Agent Card Java model classes matching A2A spec (AgentCard, AgentInterface, AgentProvider, AgentCapabilities, AgentSkill, SecurityScheme, etc.) - Add Agent Card JSON Schema from EMQX for validation - Add AgentCardValidator with JSON Schema validation support - Add AgentIdentity with hierarchical ID (org_id/unit_id/agent_id) and discovery topic construction/parsing - Update A2AProtocolConstants with Agent Card operations, status constants, CE extension keys, and ID validation pattern - Update EnhancedA2AProtocolAdaptor for Agent Card operation routing and discovery topic support - Implement A2APublishSubscribeService with full Card Registry (CRUD, status tracking, event metadata augmentation) - Add A2ACardHttpHandler REST API for card management - Add AgentCardDemo example - Add json-schema-validator and protocol-a2a dependencies * fix: remove test files that reference refactoring code moved to separate PR Remove RouterEngineTest.java and SendAsyncEventProcessorTest.java which test RouterEngine/IngressProcessor pipeline code that was moved to the refactor/unified-runtime-pipeline branch. These files were added as part of the refactoring commits that have been separated into their own PR. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: resolve checkstyle violations blocking CI - Remove unused import java.nio.charset.StandardCharsets in A2APublishSubscribeService - Use try-with-resources for EventMeshTCPClient in example files to fix resource leak checkstyle warnings (AsyncPublish, SyncRequest, cloudevents AsyncPublish) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: resolve checkstyle import ordering and PMD violations - Remove unused java.util.Collections import and reorder imports to match checkstyle groups (org.apache.eventmesh, java, io, com, lombok) in A2APublishSubscribeService - Reorder java.* imports before io.* in A2ACardHttpHandler to match checkstyle ImportOrder rule - Inline EventMeshTCPClientConfig.builder() into factory calls in example files to avoid PMD DU anomaly warnings from standalone config variables Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: remove redundant and unused imports in AgentCardDemo - Remove redundant import for A2AAbstractDemo (same package) - Remove unused import for AgentIdentity Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: reorder imports in AgentCardValidator to match checkstyle ImportOrder Move java.* imports before com.* and lombok.* to comply with the project's import group ordering (org.apache.eventmesh, java, com, lombok). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: update FilterEngineTest and TransformerEngineTest for new constructors FilterEngine and TransformerEngine now require ProducerManager and ConsumerManager in addition to MetaStorage. Update test files to mock these additional dependencies. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: mike_xwm <mike_xwm@126.com> Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 1e66c64 commit 4f94911

26 files changed

Lines changed: 3009 additions & 125 deletions

File tree

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
# EventMesh Core Engines Configuration Guide
2+
3+
EventMesh provides powerful core engines (`Filter`, `Transformer`, `Router`) to dynamically process messages. These engines are configured via **MetaStorage** (Governance Center, e.g., Nacos, Etcd), supporting on-demand loading and hot-reloading.
4+
5+
## 0. Core Concepts
6+
7+
Before configuration, it is important to understand the specific role of each engine in the message flow:
8+
9+
* **Filter (The Gatekeeper)**: Decides **"Whether to pass"**.
10+
* It inspects the message (CloudEvent) attributes. If the message matches the rules, it passes; otherwise, it is dropped.
11+
* *Use Case*: Block debug logs from production traffic; Only subscribe to specific event types.
12+
13+
* **Transformer (The Translator)**: Decides **"What it looks like"**.
14+
* It modifies the message content (Payload or Metadata) according to templates or scripts.
15+
* *Use Case*: Convert XML to JSON; Mask sensitive data (PII); Adapt legacy protocols to new standards.
16+
17+
* **Router (The Dispatcher)**: Decides **"Where to go"**.
18+
* It dynamically changes the destination (Topic) of the message.
19+
* *Use Case*: Route traffic to a Canary/Gray release topic; Route high-priority orders to a dedicated queue.
20+
21+
---
22+
23+
## 1. Overview
24+
25+
The configuration is not in local property files but distributed via the MetaStorage. EventMesh listens to specific **Keys** based on client Groups.
26+
27+
- **Data Source**: Configured via `eventMesh.metaStorage.plugin.type`.
28+
- **Loading Mechanism**: Lazy loading & Hot-reloading.
29+
- **Key Format**: `{EnginePrefix}-{GroupName}`.
30+
- **Value Format**: JSON Array.
31+
32+
| Engine | Prefix | Scope | Description |
33+
| :--- | :--- | :--- | :--- |
34+
| **Router** | `router-` | Pub Only | Routes messages to different topics. |
35+
| **Filter** | `filter-` | Pub & Sub | Filters messages based on CloudEvent attributes. |
36+
| **Transformer** | `transformer-` | Pub & Sub | Transforms message content (Payload/Header). |
37+
38+
---
39+
40+
## 2. Router (Routing)
41+
42+
**Scope**: Publish Only (Upstream)
43+
**Key**: `router-{producerGroup}`
44+
45+
Decides the target storage topic for a message sent by a producer.
46+
47+
### Configuration Example (JSON)
48+
49+
```json
50+
[
51+
{
52+
"topic": "original-topic",
53+
"routerConfig": {
54+
"targetTopic": "redirect-topic",
55+
"expression": "data.type == 'urgent'"
56+
}
57+
}
58+
]
59+
```
60+
61+
* **topic**: The original topic the producer sends to.
62+
* **targetTopic**: The actual topic to write to Storage.
63+
* **expression**: Condition to trigger routing (e.g., SpEL).
64+
65+
---
66+
67+
## 3. Filter (Filtering)
68+
69+
**Scope**: Both Publish (Upstream) & Subscribe (Downstream)
70+
71+
### A. Publish Side (Upstream)
72+
**Key**: `filter-{producerGroup}`
73+
**Effect**: Intercepts messages **before** they are sent to Storage.
74+
75+
### B. Subscribe Side (Downstream)
76+
**Key**: `filter-{consumerGroup}`
77+
**Effect**: Intercepts messages **before** they are pushed to the Consumer.
78+
79+
### Configuration Example (JSON)
80+
81+
```json
82+
[
83+
{
84+
"topic": "test-topic",
85+
"filterPattern": {
86+
"source": ["app-a", "app-b"],
87+
"type": [{"prefix": "com.example"}]
88+
}
89+
}
90+
]
91+
```
92+
93+
* **filterPattern**: Rules matching CloudEvent attributes. If a message doesn't match, it is dropped.
94+
95+
---
96+
97+
## 4. Transformer (Transformation)
98+
99+
**Scope**: Both Publish (Upstream) & Subscribe (Downstream)
100+
101+
### A. Publish Side (Upstream)
102+
**Key**: `transformer-{producerGroup}`
103+
**Effect**: Modifies message content **before** sending to Storage.
104+
105+
### B. Subscribe Side (Downstream)
106+
**Key**: `transformer-{consumerGroup}`
107+
**Effect**: Modifies message content **before** pushing to the Consumer.
108+
109+
### Configuration Example (JSON)
110+
111+
```json
112+
[
113+
{
114+
"topic": "raw-topic",
115+
"transformerConfig": {
116+
"transformerType": "template",
117+
"template": "{\"id\": \"${id}\", \"new_content\": \"${data.content}\"}"
118+
}
119+
}
120+
]
121+
```
122+
123+
* **transformerType**: e.g., `original`, `template`.
124+
* **template**: The transformation template definition.
125+
126+
---
127+
128+
## 5. Verification
129+
130+
1. **Publish Config**: Add the JSON config to your Governance Center (e.g., Nacos) with the Data ID `router-MyGroup`.
131+
2. **Send Message**: Use EventMesh SDK to send a message from `MyGroup`.
132+
3. **Observe**:
133+
* For **Router**: Check if the message appears in the `targetTopic` in your MQ.
134+
* For **Filter**: Check if blocked messages are skipped.
135+
* For **Transformer**: Check if the message body in MQ (for Pub) or Consumer (for Sub) is modified.

eventmesh-examples/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies {
2121
implementation project(":eventmesh-sdks:eventmesh-sdk-java")
2222
implementation project(":eventmesh-common")
2323
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
24+
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-a2a")
2425
implementation project(":eventmesh-connectors:eventmesh-connector-spring")
2526
implementation('org.springframework.boot:spring-boot-starter-web') {
2627
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.a2a.demo;
19+
20+
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
21+
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
22+
import org.apache.eventmesh.common.Constants;
23+
import org.apache.eventmesh.common.ExampleConstants;
24+
import org.apache.eventmesh.common.utils.JsonUtils;
25+
import org.apache.eventmesh.protocol.a2a.A2AProtocolConstants;
26+
import org.apache.eventmesh.protocol.a2a.model.AgentCapabilities;
27+
import org.apache.eventmesh.protocol.a2a.model.AgentCard;
28+
import org.apache.eventmesh.protocol.a2a.model.AgentInterface;
29+
import org.apache.eventmesh.protocol.a2a.model.AgentProvider;
30+
import org.apache.eventmesh.protocol.a2a.model.AgentSkill;
31+
32+
import java.net.URI;
33+
import java.nio.charset.StandardCharsets;
34+
import java.util.Arrays;
35+
import java.util.Collections;
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
import java.util.UUID;
39+
40+
import io.cloudevents.CloudEvent;
41+
import io.cloudevents.core.builder.CloudEventBuilder;
42+
43+
import lombok.extern.slf4j.Slf4j;
44+
45+
/**
46+
* Demo showing A2A Agent Card registration, discovery, and deletion via EventMesh.
47+
*/
48+
@Slf4j
49+
public class AgentCardDemo extends A2AAbstractDemo {
50+
51+
public static void main(String[] args) throws Exception {
52+
EventMeshHttpClientConfig config = initEventMeshHttpClientConfig("a2a-agent-card-demo");
53+
try (EventMeshHttpProducer producer = new EventMeshHttpProducer(config)) {
54+
55+
// 1. Register an Agent Card
56+
registerAgentCard(producer, "my.org", "my.unit", "weather-agent");
57+
58+
// 2. List Agent Cards
59+
listAgentCards(producer);
60+
61+
// 3. Get specific Agent Card
62+
getAgentCard(producer, "my.org", "my.unit", "weather-agent");
63+
64+
// 4. Delete Agent Card
65+
deleteAgentCard(producer, "my.org", "my.unit", "weather-agent");
66+
67+
log.info("AgentCardDemo completed.");
68+
}
69+
}
70+
71+
private static void registerAgentCard(EventMeshHttpProducer producer,
72+
String orgId, String unitId, String agentId) throws Exception {
73+
AgentCard card = buildSampleCard(agentId);
74+
75+
Map<String, Object> params = new HashMap<>();
76+
params.put("org_id", orgId);
77+
params.put("unit_id", unitId);
78+
params.put("agent_id", agentId);
79+
params.put("card", card);
80+
81+
Map<String, Object> request = new HashMap<>();
82+
request.put("jsonrpc", "2.0");
83+
request.put("method", A2AProtocolConstants.OP_REGISTER_AGENT_CARD);
84+
request.put("params", params);
85+
request.put("id", UUID.randomUUID().toString());
86+
87+
CloudEvent event = buildA2ACardEvent(request);
88+
producer.publish(event);
89+
log.info("Registered agent card: {}/{}/{}", orgId, unitId, agentId);
90+
}
91+
92+
private static void listAgentCards(EventMeshHttpProducer producer) throws Exception {
93+
Map<String, Object> params = new HashMap<>();
94+
params.put("org_id", "my.org");
95+
96+
Map<String, Object> request = new HashMap<>();
97+
request.put("jsonrpc", "2.0");
98+
request.put("method", A2AProtocolConstants.OP_LIST_AGENT_CARDS);
99+
request.put("params", params);
100+
request.put("id", UUID.randomUUID().toString());
101+
102+
CloudEvent event = buildA2ACardEvent(request);
103+
producer.publish(event);
104+
log.info("Listed agent cards for org: my.org");
105+
}
106+
107+
private static void getAgentCard(EventMeshHttpProducer producer,
108+
String orgId, String unitId, String agentId) throws Exception {
109+
Map<String, Object> params = new HashMap<>();
110+
params.put("org_id", orgId);
111+
params.put("unit_id", unitId);
112+
params.put("agent_id", agentId);
113+
114+
Map<String, Object> request = new HashMap<>();
115+
request.put("jsonrpc", "2.0");
116+
request.put("method", A2AProtocolConstants.OP_GET_AGENT_CARD);
117+
request.put("params", params);
118+
request.put("id", UUID.randomUUID().toString());
119+
120+
CloudEvent event = buildA2ACardEvent(request);
121+
producer.publish(event);
122+
log.info("Got agent card: {}/{}/{}", orgId, unitId, agentId);
123+
}
124+
125+
private static void deleteAgentCard(EventMeshHttpProducer producer,
126+
String orgId, String unitId, String agentId) throws Exception {
127+
Map<String, Object> params = new HashMap<>();
128+
params.put("org_id", orgId);
129+
params.put("unit_id", unitId);
130+
params.put("agent_id", agentId);
131+
132+
Map<String, Object> request = new HashMap<>();
133+
request.put("jsonrpc", "2.0");
134+
request.put("method", A2AProtocolConstants.OP_DELETE_AGENT_CARD);
135+
request.put("params", params);
136+
request.put("id", UUID.randomUUID().toString());
137+
138+
CloudEvent event = buildA2ACardEvent(request);
139+
producer.publish(event);
140+
log.info("Deleted agent card: {}/{}/{}", orgId, unitId, agentId);
141+
}
142+
143+
private static AgentCard buildSampleCard(String agentId) {
144+
AgentInterface iface = AgentInterface.builder()
145+
.url("http://localhost:8080/a2a")
146+
.protocolBinding("JSONRPC")
147+
.protocolVersion(A2AProtocolConstants.PROTOCOL_VERSION)
148+
.build();
149+
150+
AgentProvider provider = AgentProvider.builder()
151+
.url("https://example.org")
152+
.organization("Example Org")
153+
.build();
154+
155+
AgentCapabilities capabilities = AgentCapabilities.builder()
156+
.streaming(true)
157+
.pushNotifications(true)
158+
.build();
159+
160+
AgentSkill skill = AgentSkill.builder()
161+
.id("weather-query")
162+
.name("Weather Query")
163+
.description("Queries weather information for a given location")
164+
.tags(Arrays.asList("weather", "query"))
165+
.examples(Arrays.asList("What's the weather in Beijing?"))
166+
.build();
167+
168+
return AgentCard.builder()
169+
.name(agentId)
170+
.description("A weather query agent")
171+
.version("1.0.0")
172+
.supportedInterfaces(Collections.singletonList(iface))
173+
.provider(provider)
174+
.capabilities(capabilities)
175+
.skills(Collections.singletonList(skill))
176+
.defaultInputModes(Collections.singletonList("text/plain"))
177+
.defaultOutputModes(Collections.singletonList("text/plain"))
178+
.build();
179+
}
180+
181+
private static CloudEvent buildA2ACardEvent(Map<String, Object> jsonRpcBody) {
182+
String content = JsonUtils.toJSONString(jsonRpcBody);
183+
String method = (String) jsonRpcBody.get("method");
184+
String ceType = A2AProtocolConstants.CE_TYPE_PREFIX + method.replace("/", ".") + ".req";
185+
186+
return CloudEventBuilder.v1()
187+
.withId(UUID.randomUUID().toString())
188+
.withSource(URI.create("a2a-agent-card-demo"))
189+
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
190+
.withType(ceType)
191+
.withData(content.getBytes(StandardCharsets.UTF_8))
192+
.withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL, "A2A")
193+
.withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL_VERSION, "2.0")
194+
.withExtension(A2AProtocolConstants.CE_EXTENSION_A2A_METHOD, method)
195+
.withExtension(A2AProtocolConstants.CE_EXTENSION_MCP_TYPE, "request")
196+
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4_000))
197+
.build();
198+
}
199+
}

eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,26 @@ public static void main(String[] args) throws Exception {
4242
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
4343
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty(ExampleConstants.EVENTMESH_TCP_PORT));
4444
try {
45-
UserAgent userAgent = EventMeshTestUtils.generateClient1();
46-
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
47-
.host(eventMeshIp)
48-
.port(eventMeshTcpPort)
49-
.userAgent(userAgent)
50-
.build();
51-
final EventMeshTCPClient<CloudEvent> client =
52-
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
53-
client.init();
45+
final UserAgent userAgent = EventMeshTestUtils.generateClient1();
46+
try (final EventMeshTCPClient<CloudEvent> client =
47+
EventMeshTCPClientFactory.createEventMeshTCPClient(
48+
EventMeshTCPClientConfig.builder()
49+
.host(eventMeshIp)
50+
.port(eventMeshTcpPort)
51+
.userAgent(userAgent)
52+
.build(),
53+
CloudEvent.class)) {
54+
client.init();
5455

55-
for (int i = 0; i < 2; i++) {
56-
CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async();
57-
log.info("begin send async msg[{}]: {}", i, event);
58-
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
56+
for (int i = 0; i < 2; i++) {
57+
final CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async();
58+
log.info("begin send async msg[{}]: {}", i, event);
59+
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
5960

60-
ThreadUtils.sleep(1, TimeUnit.SECONDS);
61+
ThreadUtils.sleep(1, TimeUnit.SECONDS);
62+
}
63+
ThreadUtils.sleep(2, TimeUnit.SECONDS);
6164
}
62-
ThreadUtils.sleep(2, TimeUnit.SECONDS);
6365
} catch (Exception e) {
6466
log.error("AsyncPublish failed", e);
6567
}

0 commit comments

Comments
 (0)