Skip to content

Commit c0e182a

Browse files
authored
Resolve conflicts with storage-api branch
1 parent 0448846 commit c0e182a

39 files changed

+1390
-562
lines changed

Diff for: eventmesh-connector-plugin/eventmesh-connector-api/build.gradle

+6
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,16 @@ dependencies {
2323
api "io.dropwizard.metrics:metrics-healthchecks"
2424
api "io.dropwizard.metrics:metrics-annotation"
2525
api "io.dropwizard.metrics:metrics-json"
26+
27+
implementation 'io.cloudevents:cloudevents-json-jackson:2.4.0'
2628

2729
compileOnly 'org.projectlombok:lombok'
2830
annotationProcessor 'org.projectlombok:lombok'
2931

32+
implementation "org.mockito:mockito-core"
33+
implementation "org.powermock:powermock-module-junit4"
34+
implementation "org.powermock:powermock-api-mockito2"
35+
3036
testCompileOnly 'org.projectlombok:lombok'
3137
testAnnotationProcessor 'org.projectlombok:lombok'
3238

Diff for: eventmesh-connector-plugin/eventmesh-connector-api/gradle.properties

+3-1
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@
1313
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
16-
#
16+
#
17+
pluginType=connector
18+
pluginName=storage

Diff for: eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/CloudEventUtils.java

+39-6
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,31 @@
2020
import org.apache.eventmesh.api.connector.storage.data.CloudEventInfo;
2121

2222
import java.lang.reflect.Field;
23+
import java.nio.charset.Charset;
24+
import java.util.ArrayList;
25+
import java.util.List;
2326
import java.util.Map;
2427
import java.util.Objects;
2528

2629
import io.cloudevents.CloudEvent;
30+
import io.cloudevents.core.format.EventFormat;
2731
import io.cloudevents.core.impl.BaseCloudEvent;
32+
import io.cloudevents.core.provider.EventFormatProvider;
2833
import io.cloudevents.core.v03.CloudEventV03;
2934
import io.cloudevents.core.v1.CloudEventV1;
3035

3136
public class CloudEventUtils {
37+
38+
public static EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat("application/cloudevents+json");
3239

3340
private static Field CLOUD_EVENT_EXTENSIONS_FIELD;
3441

3542
static {
3643
try {
37-
CLOUD_EVENT_EXTENSIONS_FIELD = BaseCloudEvent.class.getField("extensions");
44+
CLOUD_EVENT_EXTENSIONS_FIELD = BaseCloudEvent.class.getDeclaredField("extensions");
3845
CLOUD_EVENT_EXTENSIONS_FIELD.setAccessible(true);
3946
} catch (NoSuchFieldException | SecurityException e) {
40-
e.printStackTrace();
47+
throw new RuntimeException(e.getMessage() , e );
4148
}
4249
}
4350

@@ -50,22 +57,48 @@ public static CloudEvent setValue(CloudEvent cloudEvent, String key, Object valu
5057
extensions.put(key, value);
5158
return cloudEvent;
5259
} catch (Exception e) {
53-
e.printStackTrace();
60+
throw new RuntimeException(e);
5461
}
5562
}
5663
return null;
5764
}
65+
66+
public static String getTableName(CloudEvent cloudEvent) {
67+
return cloudEvent.getSubject();
68+
}
69+
70+
public static String checkConsumerGroupName(String topic) {
71+
return topic.replace("-", "_");
72+
}
73+
74+
public static List<Object> getParameterToCloudEvent(CloudEvent cloudEvent) {
75+
List<Object> parameterList = new ArrayList<>();
76+
String id = (String) cloudEvent.getExtension("cloudeventid");
77+
parameterList.add(Objects.isNull(id)?"1":id);// id
78+
parameterList.add(getTableName(cloudEvent));// topic
79+
parameterList.add("");// cloud_event_storage_node_adress
80+
parameterList.add("");// cloud_event_type
81+
parameterList.add("");// cloud_event_producer_group_name
82+
parameterList.add("");// cloud_event_source
83+
parameterList.add("application/cloudevents+json");// cloud_event_content_type
84+
// parameterList.add("");//cloud_event_tag
85+
parameterList.add("");// cloud_event_extensions
86+
String data = new String(CloudEventUtils.eventFormat.serialize(cloudEvent), Charset.forName("UTF-8"));
87+
parameterList.add(data);// cloud_event_data
88+
parameterList.add("{}");
89+
return parameterList;
90+
}
5891

5992
public static String getNodeAdress(CloudEvent cloudEvent) {
60-
return (String) cloudEvent.getExtension(Constant.NODE_ADDRESS);
93+
return (String) cloudEvent.getExtension(Constant.STORAGE_CONFIG_ADDRESS);
6194
}
6295

6396
public static String getTopic(CloudEvent cloudEvent) {
64-
return null;
97+
return cloudEvent.getSubject();
6598
}
6699

67100
public static String getId(CloudEvent cloudEvent) {
68-
return null;
101+
return "";
69102
}
70103

71104
public static CloudEvent createCloudEvent(CloudEventInfo cloudEventInfo) {

Diff for: eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorInfo.java renamed to eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/ConnectorResourceServiceStorageImpl.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,22 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
1817
package org.apache.eventmesh.api.connector.storage;
1918

20-
import lombok.Data;
19+
import org.apache.eventmesh.api.connector.ConnectorResourceService;
20+
21+
public class ConnectorResourceServiceStorageImpl implements ConnectorResourceService{
22+
23+
@Override
24+
public void init() throws Exception {
25+
// TODO Auto-generated method stub
26+
27+
}
2128

22-
@Data
23-
public class StorageConnectorInfo {
29+
@Override
30+
public void release() throws Exception {
31+
// TODO Auto-generated method stub
32+
33+
}
2434

25-
private boolean distinguishTopic;
2635
}

Diff for: eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/Constant.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,24 @@
1919

2020
public class Constant {
2121

22-
public static final String NODE_ADDRESS = "nodeAddress";
22+
public static final String STORAGE_CONFIG_ADDRESS = "eventMesh.connector.plugin.storage.nodeaddress";
23+
24+
public static final String STORAGE_CONFIG_TYPE = "eventMesh.connector.plugin.storage.type";
25+
26+
public static final String STORAGE_CONFIG_JDBC_PARAMETER = "eventMesh.connector.plugin.storage.jdbc.parameter";
27+
28+
public static final String STORAGE_CONFIG_USER_NAME = "eventMesh.connector.plugin.storage.username";
29+
30+
public static final String STORAGE_CONFIG_PASSWORD = "eventMesh.connector.plugin.storage.password";
31+
32+
public static final String STORAGE_CONFIG_JDBC_TYPE = "eventMesh.connector.plugin.storage.jdbc.dbType";
33+
34+
public static final String STORAGE_CONFIG_JDBC_MAXACTIVE = "eventMesh.connector.plugin.storage.jdbc.maxActive";
35+
36+
public static final String STORAGE_CONFIG_JDBC_MAXWAIT = "eventMesh.connector.plugin.storage.jdbc.maxWait";
37+
38+
public static final String STORAGE_ID = "storageid";
39+
40+
public static final String STORAGE_NODE_ADDRESS = "address";
41+
2342
}

Diff for: eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
@Data
2323
public class StorageConfig {
2424

25-
private long pullInterval;
25+
private long pullInterval = 200;
2626

27-
private long pullThresholdForQueue;
27+
private long pullThresholdForQueue = 5;
2828
}

Diff for: eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnector.java

+4
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ public interface StorageConnector extends LifeCycle {
6161
public List<CloudEvent> pull(PullRequest pullRequest) throws Exception;
6262

6363
void updateOffset(List<CloudEvent> cloudEvents, AbstractContext context);
64+
65+
public default String getTopic(String topic) {
66+
return topic;
67+
}
6468

6569
public default int deleteCloudEvent(CloudEvent cloudEvent) {
6670
return 0;

Diff for: eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorMetedata.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,7 @@ public interface StorageConnectorMetedata {
3535
public int createTopic(TopicInfo topicInfo) throws Exception;
3636

3737
public int createConsumerGroupInfo(ConsumerGroupInfo consumerGroupInfo) throws Exception;
38+
39+
public List<TopicInfo> geTopicInfos(Set<String> topics,String key) throws Exception;
3840

3941
}

Diff for: eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/connector/storage/StorageConnectorProxy.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,21 @@
2424
import org.apache.eventmesh.api.connector.storage.data.TopicInfo;
2525
import org.apache.eventmesh.api.connector.storage.metadata.RouteHandler;
2626
import org.apache.eventmesh.api.connector.storage.metadata.StorageMetaServcie;
27+
import org.apache.eventmesh.api.connector.storage.reply.ReplyOperation;
2728
import org.apache.eventmesh.api.connector.storage.reply.ReplyOperationService;
2829
import org.apache.eventmesh.api.connector.storage.reply.RequestReplyInfo;
2930
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
3031
import org.apache.eventmesh.api.exception.OnExceptionContext;
3132

33+
import java.util.Collection;
3234
import java.util.List;
3335
import java.util.Map;
3436
import java.util.Properties;
3537
import java.util.concurrent.ConcurrentHashMap;
3638
import java.util.concurrent.Executor;
3739

3840
import io.cloudevents.CloudEvent;
41+
import lombok.Setter;
3942

4043
public class StorageConnectorProxy implements StorageConnector {
4144

@@ -45,10 +48,13 @@ public class StorageConnectorProxy implements StorageConnector {
4548

4649
private RouteHandler routeHandler = new RouteHandler();
4750

51+
@Setter
4852
private ReplyOperationService replyService;
4953

54+
@Setter
5055
private StorageMetaServcie storageMetaServcie;
5156

57+
@Setter
5258
private Executor executor;
5359

5460
@Override
@@ -63,7 +69,12 @@ public void shutdown() {
6369
public void init(Properties properties) throws Exception {
6470
}
6571

72+
public Collection<StorageConnector> getStorageConnectorList(){
73+
return this.storageConnectorByKeyMap.values();
74+
}
75+
6676
public void setConnector(StorageConnector storageConnector, String key) {
77+
routeHandler.addStorageConnector(storageConnector);
6778
storageConnectorMap.put(storageConnector, key);
6879
storageConnectorByKeyMap.put(key, storageConnector);
6980
}
@@ -84,6 +95,7 @@ private void doPublish(CloudEvent cloudEvent, SendCallback sendCallback) {
8495
if (storageConnector instanceof StorageConnectorMetedata
8596
&& !storageMetaServcie.isTopic(storageConnector, CloudEventUtils.getTopic(cloudEvent))) {
8697
TopicInfo topicInfo = new TopicInfo();
98+
topicInfo.setTopicName(CloudEventUtils.getTopic(cloudEvent));
8799
StorageConnectorMetedata storageConnectorMetedata = (StorageConnectorMetedata) storageConnector;
88100
storageConnectorMetedata.createTopic(topicInfo);
89101
}
@@ -114,15 +126,18 @@ public void run() {
114126
public void doRequest(CloudEvent cloudEvent, RequestReplyCallback requestReplyCallback, long timeout) {
115127
try {
116128
StorageConnector storageConnector = routeHandler.select();
129+
if(!(storageConnector instanceof ReplyOperation)) {
130+
return;
131+
}
117132
String key = storageConnectorMap.get(storageConnector);
118-
CloudEventUtils.setValue(cloudEvent, "nodeAddress", key);
133+
CloudEventUtils.setValue(cloudEvent, Constant.STORAGE_CONFIG_ADDRESS, key);
119134
storageConnector.request(cloudEvent, requestReplyCallback, timeout);
120-
Long storageId = (Long) cloudEvent.getExtension("storageId");
135+
Long storageId = Long.valueOf( cloudEvent.getExtension(Constant.STORAGE_ID).toString());
121136
RequestReplyInfo requestReplyInfo = new RequestReplyInfo();
122137
requestReplyInfo.setStorageId(storageId);
123138
requestReplyInfo.setTimeOut(System.currentTimeMillis() + timeout);
124139
requestReplyInfo.setRequestReplyCallback(requestReplyCallback);
125-
replyService.setRequestReplyInfo(null, cloudEvent.getType(), storageId, requestReplyInfo);
140+
replyService.setRequestReplyInfo((ReplyOperation)storageConnector, cloudEvent.getType(), storageId, requestReplyInfo);
126141
} catch (Exception e) {
127142
requestReplyCallback.onException(e);
128143
}

0 commit comments

Comments
 (0)