16
16
*/
17
17
package org .apache .seata .integration .rocketmq ;
18
18
19
+ import org .apache .rocketmq .remoting .RPCHook ;
19
20
import org .apache .seata .common .exception .NotSupportYetException ;
21
+
20
22
import java .util .Map ;
21
23
import java .util .concurrent .ConcurrentHashMap ;
22
24
@@ -30,12 +32,18 @@ public class SeataMQProducerFactory {
30
32
private static Map <String , SeataMQProducer > PRODUCER_MAP = new ConcurrentHashMap <>();
31
33
private volatile static String SINGLE_PRODUCER_ID ;
32
34
33
- public SeataMQProducer create (String groupName , String producerId ) {
35
+ public static SeataMQProducer create (String producerId , String nameServer , String producerGroup ) {
36
+ return create (producerId , nameServer , null , producerGroup , null );
37
+ }
38
+
39
+ public static SeataMQProducer create (String producerId , String nameServer , String namespace ,
40
+ String groupName , RPCHook rpcHook ) {
34
41
if (SINGLE_PRODUCER_ID == null ) {
35
42
synchronized (SeataMQProducerFactory .class ) {
36
43
if (SINGLE_PRODUCER_ID == null ) {
37
44
SINGLE_PRODUCER_ID = producerId ;
38
- SeataMQProducer producer = new SeataMQProducer (groupName );
45
+ SeataMQProducer producer = new SeataMQProducer (namespace , groupName , rpcHook );
46
+ producer .setNamesrvAddr (nameServer );
39
47
tccRocketMQ .setProducer (producer );
40
48
PRODUCER_MAP .put (producerId , producer );
41
49
}
@@ -48,7 +56,7 @@ public SeataMQProducer create(String groupName, String producerId) {
48
56
return getProducer ();
49
57
}
50
58
51
- public SeataMQProducer getProducer () {
59
+ public static SeataMQProducer getProducer () {
52
60
return PRODUCER_MAP .get (SINGLE_PRODUCER_ID );
53
61
}
54
62
0 commit comments