Skip to content

Commit a538864

Browse files
authored
Merge pull request #2092 from jonyangx/master
[ISSUE #2091]add qmq connector plugin close #2091
2 parents 7630ec1 + f1e244c commit a538864

30 files changed

+2840
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
configurations {
19+
implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
20+
implementation.exclude group: 'log4j', module: 'log4j'
21+
}
22+
23+
dependencies {
24+
implementation project(":eventmesh-common")
25+
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
26+
implementation 'com.qunar.qmq:qmq:1.1.41'
27+
28+
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
29+
testImplementation project(":eventmesh-common")
30+
testImplementation 'com.qunar.qmq:qmq:1.1.41'
31+
32+
implementation 'io.cloudevents:cloudevents-json-jackson'
33+
34+
testImplementation 'io.cloudevents:cloudevents-json-jackson'
35+
36+
testImplementation('junit:junit:4.13')
37+
38+
testImplementation "org.mockito:mockito-core"
39+
testImplementation "org.powermock:powermock-module-junit4"
40+
testImplementation "org.powermock:powermock-api-mockito2"
41+
42+
compileOnly 'org.projectlombok:lombok:1.18.22'
43+
annotationProcessor 'org.projectlombok:lombok:1.18.22'
44+
45+
testCompileOnly 'org.projectlombok:lombok:1.18.22'
46+
testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
pluginType=connector
18+
pluginName=qmq
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.qmq.common;
19+
20+
public class EventMeshConstants {
21+
public static final String EVENTMESH_CONF_FILE = "qmq-client.properties";
22+
23+
public static final String QMQ_METASERVER_KEY = "eventMesh.server.qmq.metaserver";
24+
25+
public static final String QMQ_APPCODE_KEY = "eventMesh.server.qmq.appcode";
26+
27+
public static final String QMQ_CONSUMERGROUP_KEY = "eventMesh.server.qmq.consumergroup";
28+
29+
public static final String QMQ_CONSUMER_THREADPOOLSIZE_KEY = "eventMesh.server.qmq.consumer.threadpoolsize";
30+
31+
public static final String QMQ_CONSUMER_THREADPOOLQUEUESIZE_KEY = "eventMesh.server.qmq.consumer.threadpoolqueuesize";
32+
33+
public static final String QMQ_MSG_BODY = "qmqMsgBody";
34+
35+
public static final String QMQ_IDC_KEY = "eventMesh.server.qmq.idc";
36+
37+
public static final String QMQ_PRODUCER_THREADCOUNT_KEY = "eventMesh.server.qmq.producer.threadcount";
38+
39+
public static final String QMQ_PRODUCER_BATCHSIZE_KEY = "30";
40+
41+
42+
public static final String QMQ_PRODUCER_TRYCOUNT_KEY = "eventMesh.server.qmq.producer.trycount";
43+
44+
public static final String QMQ_PRODUCER_MAXQUEUESIZE_KEY = "eventMesh.server.qmq.producer.maxqueuesize";
45+
46+
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.qmq.connector;
19+
20+
import org.apache.eventmesh.api.connector.ConnectorResourceService;
21+
22+
public class ConnectorResourceServiceQMQImpl implements ConnectorResourceService {
23+
@Override
24+
public void init() throws Exception {
25+
26+
}
27+
28+
@Override
29+
public void release() throws Exception {
30+
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.connector.qmq.consumer;
19+
20+
21+
import java.util.concurrent.ArrayBlockingQueue;
22+
import java.util.concurrent.ThreadFactory;
23+
import java.util.concurrent.ThreadPoolExecutor;
24+
import java.util.concurrent.TimeUnit;
25+
26+
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
27+
28+
private static final class BlockingQueue<E> extends ArrayBlockingQueue<E> {
29+
30+
public BlockingQueue(int capacity) {
31+
super(capacity);
32+
}
33+
34+
@Override
35+
public boolean offer(E e) {
36+
try {
37+
put(e);
38+
return true;
39+
} catch (InterruptedException ie) {
40+
Thread.currentThread().interrupt();
41+
}
42+
43+
return false;
44+
}
45+
}
46+
47+
public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
48+
TimeUnit unit, int taskQueueSize, ThreadFactory threadFactory) {
49+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new BlockingQueue<Runnable>(
50+
taskQueueSize), threadFactory);
51+
}
52+
53+
}

0 commit comments

Comments
 (0)