Skip to content

Commit 469ea79

Browse files
author
easoncchen(陈广胜)
committed
Merge upstream/master into refactor/unified-runtime-pipeline
Resolve conflicts: - docs/plugins/core-engines-configuration.md: Keep PR changes (Pipeline Key feature) - A2APublishSubscribeService.java: Keep upstream (Agent Card Registry) - FilterEngineTest.java: Keep upstream (ProducerManager/ConsumerManager) - TransformerEngineTest.java: Keep upstream (ProducerManager/ConsumerManager)
2 parents c49a41f + 4f94911 commit 469ea79

24 files changed

Lines changed: 2644 additions & 135 deletions

File tree

eventmesh-examples/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies {
2121
implementation project(":eventmesh-sdks:eventmesh-sdk-java")
2222
implementation project(":eventmesh-common")
2323
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
24+
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-a2a")
2425
implementation project(":eventmesh-connectors:eventmesh-connector-spring")
2526
implementation('org.springframework.boot:spring-boot-starter-web') {
2627
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.a2a.demo;
19+
20+
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
21+
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
22+
import org.apache.eventmesh.common.Constants;
23+
import org.apache.eventmesh.common.ExampleConstants;
24+
import org.apache.eventmesh.common.utils.JsonUtils;
25+
import org.apache.eventmesh.protocol.a2a.A2AProtocolConstants;
26+
import org.apache.eventmesh.protocol.a2a.model.AgentCapabilities;
27+
import org.apache.eventmesh.protocol.a2a.model.AgentCard;
28+
import org.apache.eventmesh.protocol.a2a.model.AgentInterface;
29+
import org.apache.eventmesh.protocol.a2a.model.AgentProvider;
30+
import org.apache.eventmesh.protocol.a2a.model.AgentSkill;
31+
32+
import java.net.URI;
33+
import java.nio.charset.StandardCharsets;
34+
import java.util.Arrays;
35+
import java.util.Collections;
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
import java.util.UUID;
39+
40+
import io.cloudevents.CloudEvent;
41+
import io.cloudevents.core.builder.CloudEventBuilder;
42+
43+
import lombok.extern.slf4j.Slf4j;
44+
45+
/**
46+
* Demo showing A2A Agent Card registration, discovery, and deletion via EventMesh.
47+
*/
48+
@Slf4j
49+
public class AgentCardDemo extends A2AAbstractDemo {
50+
51+
public static void main(String[] args) throws Exception {
52+
EventMeshHttpClientConfig config = initEventMeshHttpClientConfig("a2a-agent-card-demo");
53+
try (EventMeshHttpProducer producer = new EventMeshHttpProducer(config)) {
54+
55+
// 1. Register an Agent Card
56+
registerAgentCard(producer, "my.org", "my.unit", "weather-agent");
57+
58+
// 2. List Agent Cards
59+
listAgentCards(producer);
60+
61+
// 3. Get specific Agent Card
62+
getAgentCard(producer, "my.org", "my.unit", "weather-agent");
63+
64+
// 4. Delete Agent Card
65+
deleteAgentCard(producer, "my.org", "my.unit", "weather-agent");
66+
67+
log.info("AgentCardDemo completed.");
68+
}
69+
}
70+
71+
private static void registerAgentCard(EventMeshHttpProducer producer,
72+
String orgId, String unitId, String agentId) throws Exception {
73+
AgentCard card = buildSampleCard(agentId);
74+
75+
Map<String, Object> params = new HashMap<>();
76+
params.put("org_id", orgId);
77+
params.put("unit_id", unitId);
78+
params.put("agent_id", agentId);
79+
params.put("card", card);
80+
81+
Map<String, Object> request = new HashMap<>();
82+
request.put("jsonrpc", "2.0");
83+
request.put("method", A2AProtocolConstants.OP_REGISTER_AGENT_CARD);
84+
request.put("params", params);
85+
request.put("id", UUID.randomUUID().toString());
86+
87+
CloudEvent event = buildA2ACardEvent(request);
88+
producer.publish(event);
89+
log.info("Registered agent card: {}/{}/{}", orgId, unitId, agentId);
90+
}
91+
92+
private static void listAgentCards(EventMeshHttpProducer producer) throws Exception {
93+
Map<String, Object> params = new HashMap<>();
94+
params.put("org_id", "my.org");
95+
96+
Map<String, Object> request = new HashMap<>();
97+
request.put("jsonrpc", "2.0");
98+
request.put("method", A2AProtocolConstants.OP_LIST_AGENT_CARDS);
99+
request.put("params", params);
100+
request.put("id", UUID.randomUUID().toString());
101+
102+
CloudEvent event = buildA2ACardEvent(request);
103+
producer.publish(event);
104+
log.info("Listed agent cards for org: my.org");
105+
}
106+
107+
private static void getAgentCard(EventMeshHttpProducer producer,
108+
String orgId, String unitId, String agentId) throws Exception {
109+
Map<String, Object> params = new HashMap<>();
110+
params.put("org_id", orgId);
111+
params.put("unit_id", unitId);
112+
params.put("agent_id", agentId);
113+
114+
Map<String, Object> request = new HashMap<>();
115+
request.put("jsonrpc", "2.0");
116+
request.put("method", A2AProtocolConstants.OP_GET_AGENT_CARD);
117+
request.put("params", params);
118+
request.put("id", UUID.randomUUID().toString());
119+
120+
CloudEvent event = buildA2ACardEvent(request);
121+
producer.publish(event);
122+
log.info("Got agent card: {}/{}/{}", orgId, unitId, agentId);
123+
}
124+
125+
private static void deleteAgentCard(EventMeshHttpProducer producer,
126+
String orgId, String unitId, String agentId) throws Exception {
127+
Map<String, Object> params = new HashMap<>();
128+
params.put("org_id", orgId);
129+
params.put("unit_id", unitId);
130+
params.put("agent_id", agentId);
131+
132+
Map<String, Object> request = new HashMap<>();
133+
request.put("jsonrpc", "2.0");
134+
request.put("method", A2AProtocolConstants.OP_DELETE_AGENT_CARD);
135+
request.put("params", params);
136+
request.put("id", UUID.randomUUID().toString());
137+
138+
CloudEvent event = buildA2ACardEvent(request);
139+
producer.publish(event);
140+
log.info("Deleted agent card: {}/{}/{}", orgId, unitId, agentId);
141+
}
142+
143+
private static AgentCard buildSampleCard(String agentId) {
144+
AgentInterface iface = AgentInterface.builder()
145+
.url("http://localhost:8080/a2a")
146+
.protocolBinding("JSONRPC")
147+
.protocolVersion(A2AProtocolConstants.PROTOCOL_VERSION)
148+
.build();
149+
150+
AgentProvider provider = AgentProvider.builder()
151+
.url("https://example.org")
152+
.organization("Example Org")
153+
.build();
154+
155+
AgentCapabilities capabilities = AgentCapabilities.builder()
156+
.streaming(true)
157+
.pushNotifications(true)
158+
.build();
159+
160+
AgentSkill skill = AgentSkill.builder()
161+
.id("weather-query")
162+
.name("Weather Query")
163+
.description("Queries weather information for a given location")
164+
.tags(Arrays.asList("weather", "query"))
165+
.examples(Arrays.asList("What's the weather in Beijing?"))
166+
.build();
167+
168+
return AgentCard.builder()
169+
.name(agentId)
170+
.description("A weather query agent")
171+
.version("1.0.0")
172+
.supportedInterfaces(Collections.singletonList(iface))
173+
.provider(provider)
174+
.capabilities(capabilities)
175+
.skills(Collections.singletonList(skill))
176+
.defaultInputModes(Collections.singletonList("text/plain"))
177+
.defaultOutputModes(Collections.singletonList("text/plain"))
178+
.build();
179+
}
180+
181+
private static CloudEvent buildA2ACardEvent(Map<String, Object> jsonRpcBody) {
182+
String content = JsonUtils.toJSONString(jsonRpcBody);
183+
String method = (String) jsonRpcBody.get("method");
184+
String ceType = A2AProtocolConstants.CE_TYPE_PREFIX + method.replace("/", ".") + ".req";
185+
186+
return CloudEventBuilder.v1()
187+
.withId(UUID.randomUUID().toString())
188+
.withSource(URI.create("a2a-agent-card-demo"))
189+
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
190+
.withType(ceType)
191+
.withData(content.getBytes(StandardCharsets.UTF_8))
192+
.withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL, "A2A")
193+
.withExtension(A2AProtocolConstants.CE_EXTENSION_PROTOCOL_VERSION, "2.0")
194+
.withExtension(A2AProtocolConstants.CE_EXTENSION_A2A_METHOD, method)
195+
.withExtension(A2AProtocolConstants.CE_EXTENSION_MCP_TYPE, "request")
196+
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4_000))
197+
.build();
198+
}
199+
}

eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,26 @@ public static void main(String[] args) throws Exception {
4242
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
4343
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty(ExampleConstants.EVENTMESH_TCP_PORT));
4444
try {
45-
UserAgent userAgent = EventMeshTestUtils.generateClient1();
46-
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
47-
.host(eventMeshIp)
48-
.port(eventMeshTcpPort)
49-
.userAgent(userAgent)
50-
.build();
51-
final EventMeshTCPClient<CloudEvent> client =
52-
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
53-
client.init();
45+
final UserAgent userAgent = EventMeshTestUtils.generateClient1();
46+
try (final EventMeshTCPClient<CloudEvent> client =
47+
EventMeshTCPClientFactory.createEventMeshTCPClient(
48+
EventMeshTCPClientConfig.builder()
49+
.host(eventMeshIp)
50+
.port(eventMeshTcpPort)
51+
.userAgent(userAgent)
52+
.build(),
53+
CloudEvent.class)) {
54+
client.init();
5455

55-
for (int i = 0; i < 2; i++) {
56-
CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async();
57-
log.info("begin send async msg[{}]: {}", i, event);
58-
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
56+
for (int i = 0; i < 2; i++) {
57+
final CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async();
58+
log.info("begin send async msg[{}]: {}", i, event);
59+
client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
5960

60-
ThreadUtils.sleep(1, TimeUnit.SECONDS);
61+
ThreadUtils.sleep(1, TimeUnit.SECONDS);
62+
}
63+
ThreadUtils.sleep(2, TimeUnit.SECONDS);
6164
}
62-
ThreadUtils.sleep(2, TimeUnit.SECONDS);
6365
} catch (Exception e) {
6466
log.error("AsyncPublish failed", e);
6567
}

eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,26 @@ public static void main(String[] args) throws Exception {
4242
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty(ExampleConstants.EVENTMESH_TCP_PORT));
4343
try {
4444
final UserAgent userAgent = EventMeshTestUtils.generateClient1();
45-
final EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
46-
.host(eventMeshIp)
47-
.port(eventMeshTcpPort)
48-
.userAgent(userAgent)
49-
.build();
50-
final EventMeshTCPClient<EventMeshMessage> client =
51-
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, EventMeshMessage.class);
52-
client.init();
45+
try (final EventMeshTCPClient<EventMeshMessage> client =
46+
EventMeshTCPClientFactory.createEventMeshTCPClient(
47+
EventMeshTCPClientConfig.builder()
48+
.host(eventMeshIp)
49+
.port(eventMeshTcpPort)
50+
.userAgent(userAgent)
51+
.build(),
52+
EventMeshMessage.class)) {
53+
client.init();
5354

54-
for (int i = 0; i < 5; i++) {
55-
final EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateAsyncEventMqMsg();
55+
for (int i = 0; i < 5; i++) {
56+
final EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateAsyncEventMqMsg();
5657

57-
log.info("begin send async msg[{}]: {}", i, eventMeshMessage);
58-
client.publish(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
58+
log.info("begin send async msg[{}]: {}", i, eventMeshMessage);
59+
client.publish(eventMeshMessage, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
5960

60-
ThreadUtils.sleep(1, TimeUnit.SECONDS);
61+
ThreadUtils.sleep(1, TimeUnit.SECONDS);
62+
}
63+
ThreadUtils.sleep(2, TimeUnit.SECONDS);
6164
}
62-
ThreadUtils.sleep(2, TimeUnit.SECONDS);
6365
} catch (Exception e) {
6466
log.error("AsyncPublish failed", e);
6567
}

eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ public static void main(String[] args) throws Exception {
4040
final String eventMeshIp = properties.getProperty(ExampleConstants.EVENTMESH_IP);
4141
final int eventMeshTcpPort = Integer.parseInt(properties.getProperty(ExampleConstants.EVENTMESH_TCP_PORT));
4242
final UserAgent userAgent = EventMeshTestUtils.generateClient1();
43-
final EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
44-
.host(eventMeshIp)
45-
.port(eventMeshTcpPort)
46-
.userAgent(userAgent)
47-
.build();
48-
try {
49-
final EventMeshTCPClient<EventMeshMessage> client = EventMeshTCPClientFactory.createEventMeshTCPClient(
50-
eventMeshTcpClientConfig, EventMeshMessage.class);
43+
try (final EventMeshTCPClient<EventMeshMessage> client =
44+
EventMeshTCPClientFactory.createEventMeshTCPClient(
45+
EventMeshTCPClientConfig.builder()
46+
.host(eventMeshIp)
47+
.port(eventMeshTcpPort)
48+
.userAgent(userAgent)
49+
.build(),
50+
EventMeshMessage.class)) {
5151
client.init();
5252

5353
final EventMeshMessage eventMeshMessage = EventMeshTestUtils.generateSyncRRMqMsg();

eventmesh-protocol-plugin/eventmesh-protocol-a2a/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ dependencies {
2424
implementation "io.cloudevents:cloudevents-core"
2525
implementation "io.cloudevents:cloudevents-json-jackson"
2626
implementation "com.fasterxml.jackson.core:jackson-databind"
27+
implementation "com.networknt:json-schema-validator:1.5.6"
2728
implementation "org.slf4j:slf4j-api"
2829

2930
compileOnly 'org.projectlombok:lombok'

0 commit comments

Comments
 (0)