在 Framework 分布式消息抽象层,抽象了通信模型使用到的 3 个主要接口,分别是:消费者接口 Conusmer、生产者接口 Producer 和监听接口Notifier,接口按消息类型定义了字符串消息、MQ 原始消息、Framework 封装消息对应的消费/发送方法,用于满足 MQ 通用场景。
在需要使用到各 MQ 系统的消费者/生产者高阶用例场景中,如自定义消费逻辑、消费重试等,接口定义了获取底层的委托消费者/生产者对象方法,通过它调用对应 MQ 系统原生 SDK 来进行处理。
在使用 spring stream 的方式工作的场景中,可以通过配置 Framework 关闭 Framework 的消息驱动。关闭后Framework 的 messaging 机制将完全遵循原生的消息驱动的实现。
分布式消息抽象设计如下:
@startuml
package "cloudapp-base-api" {
interface Consumer<UnderlyingConsumer, Message> {
UnderlyingConsumer getDelegatingConsumer()
MQMessage<? extends Message> pull(Destination destination)
Collection<MQMessage<? extends Message>> pull(Destination destination, int count)
Message pull(String topic)
Collection<Message> pull(String topic, int count)
MQMessage<? extends Message> pull(final Destination destination,TimeUnit timeout)
Message pull(final String topic, TimeUnit timeout)
Collection<MQMessage<? extends Message>> pull(final Destination destination, int count,TimeUnit timeout)
Collection<Message> pull(final String topic, int count,TimeUnit timeout)
void subscribe(final Destination destination, final Notifier<? extends Message> notifier)
void subscribe(String topic, final Notifier<? extends Message> notifier)
void unsubscribe(final Destination destination)
void unsubscribe(String topic)
void unsubscribe(final Destination destination, final Notifier<? extends Message> notifier)
void unsubscribe(String topic, final Notifier<? extends Message> notifier)
}
interface Producer<UnderlyingProducer, Message> {
UnderlyingProducer getDelegatingProducer()
void send(MQMessage<? extends Message> message)
void send(Destination destination, Message message)
void send(String topic, String message)
void send(Destination destination, String message)
CompletableFuture<?> sendAsync(MQMessage<? extends Message> message)
CompletableFuture<?> sendAsync(Destination destination, Message message)
CompletableFuture<?> sendAsync(String topic, String message)
CompletableFuture<?> sendAsync(Destination destination, String message)
}
interface Notifier<T> {
void onMessageNotified(MQMessage<T> message)
}
interface Destination
class Location {
private String host;
private int pid;
private String threadName;
private int threadId;
private String traceId;
private String spanId;
}
abstract class MQMessage<Message> {
private String messageID;
private long sentTimestamp;
private long deliveredTimestamp;
private long receivedTimestamp;
private Message messageBody;
private Location sender;
private Location receiver;
private Destination destination;
private Destination replyTo;
}
Location -[hidden]- Destination
MQMessage <-- Notifier : use
Location <-- MQMessage : use
Destination <-- MQMessage : use
Destination <-- Consumer : use
MQMessage <-- Consumer : use
Notifier <-- Consumer : use
Destination <-- Producer : use
MQMessage <-- Producer : use
}
@enduml-
定义消息消费者接口 Consumer ,主要方法有:
-
getDelegatingConsumer() 获取底层的委托消费者对象,通过它调用对应 MQ 系统原生 SDK 来处理更底层或高级的消费者逻辑;
-
pull(Destination destination) 从 Destination 消费消息;
-
pull(Destination destination, int count) 从Destination 消费一组消息;
-
pull(String topic) 从 topic 中消费消息;
-
pull(String topic, int count) 从 topic 中消费一组消息;
-
pull(final Destination destination,TimeUnit timeout) 从Destination 消费消息,并设置超时;
-
pull(final String topic, TimeUnit timeout) 从topic 消费消息,并设置超时;
-
pull(final Destination destination, int count,TimeUnit timeout) 从Destination 消费一组消息,并设置超时;
-
pull(final String topic, int count,TimeUnit timeout) 从 topic 消费一组消息,并设置超时
-
subscribe(final Destination destination, final Notifier<? extends Message> notifier) 从Destination订阅消息主题,并设置监听;
-
subscribe(String topic, final Notifier<? extends Message> notifier) 从 topic 订阅消息主题,并设置监听;
-
unsubscribe(final Destination destination) 从Destination 取消订阅;
-
unsubscribe(final Destination destination, final Notifier<? extends Message> notifier) 从Destination 取消订阅,并设置监听;
-
unsubscribe(String topic) 从 topic 取消订阅
-
unsubscribe(String topic, final Notifier<? extends Message> notifier) 从 topic 取消订阅并设置监听;
-
-
定义消息生产者接口 Producer ,主要方法有:
-
getDelegatingProducer() 获取底层的委托生产者对象,通过它调用对应 MQ 系统原生 SDK 来处理更底层或高级的生产者逻辑;
-
send(MQMessage<? extends Message> message) 发送MQMessage封装消息;
-
send(Destination destination, Message message) 向Destination发送Message 原始消息;
-
send(String topic, String message) 向topic 发送字符串消息;
-
send(Destination destination, String message) 向Destination 发送字符串消息;
-
sendAsync(MQMessage<? extends Message> message) 异步发送MQMessage封装消息;
-
sendAsync(Destination destination, Message message) 异步向Destination发送Message 原始消息;
-
sendAsync(String topic, String message) 异步向topic 发送字符串消息;
-
sendAsync(Destination destination, String message) 异步向Destination 发送字符串消息;
-
-
定义监听接口 Notifier ,主要方法有:onMessageNotified(MQMessage message)当监听事件触发时进行处理;
-
定义接口 Destination ,用于设置消息目标;
-
定义类 Location ,用于设置消息发送或接受位置,包括主机、进程 id、线程名、线程 id 等;
-
定义抽象类 MQMessage ,用于封装消息;
分布式消息实现的配置如下:
- 定义 Kafka 实现的配置参数类 CloudAppKafkaProperties ,使用注解 @ConfigurationProperties("com.alibaba.cloudapp.messaging.kafka") ,配置参数类字段如下:
| 字段名 | 数据类型 | 默认值 | 备注 true |
|---|---|---|---|
| enabled | boolean | true | 是否启用 |
| servers | String | - | 服务器地址 |
| ssl | org.springframework.boot.autoconfigure.kafka.KafkaProperties.Ssl | new KafkaProperties.Ssl() | SSL 设置 |
| username | String | - | 用户名 |
| password | String | - | 密码 |
| mechanism | String | - | 客户端连接的身份验证机制,非必填,示例值:PLAIN、SCRAM-SHA-256 |
| securityProtocol | String | - | 通信的协议,非必填,示例值:SASL_SSL、SASL_PLAINTEXT |
| identificationAlgorithm | String | - | 使用服务器证书验证服务器主机名的识别算法 |
| inputs | List<KafkaConsumerProperties> | - | kafka 消费者列表 |
| outputs | List<KafkaProducerProperties> | - | kafka 生产者列表 |
kafka客户端需要的SSL证书(trust-store-location)默认存放在kafka服务端
/home/admin/kafka-pkgs/kafka.client.truststore.jks路径中。
定义 Kafka 消费者参数类 KafkaConsumerProperties ,字段如下:
| 字段名 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| group | String | - | 消费者分组 |
| name | String | - | 名称 |
| topic | String | - | 消息主题 |
| bootstrapServers | String | - | Kafka 集群访问地址,格式为:host:port[,host2:port][,host3:port] |
| autoOffsetReset | String | - | 当 Kafka 中没有初始偏移量或服务器上当前偏移量不再存在时的处理方式 |
| maxFetchBytes | int | - | 服务器给获取请求返回的最大数据量 |
| sessionTimeout | int | - | Kafka 的组管理工具时用于检测客户端故障的超时时间,单位:毫秒 |
| keyDeserializer | Class | StringDeserializer.class | 值的反序列化类 |
| valueDeserializer | Class | StringDeserializer.class | 键 的反序列化类 |
| isolationLevel | Byte | 0 | 读取已事务性写入的消息的隔离级别,可选值有: IsolationLevel.READ_UNCOMMITTED(0)、IsolationLevel.READ_COMMITTED(1) |
| properties | Map<String, Object> | - | 用于配置客户端的附加消费者特定属性 |
定义 Kafka 生产者参数类 KafkaProducerProperties ,字段如下:
| 字段名 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| group | String | - | 分组 |
| name | String | - | 名称 |
| topic | String | - | 消息主题 |
| partition | int | - | 分区 |
| bootstrapServers | String | - | Kafka 集群访问地址,格式为:host:port[,host2:port][,host3:port] |
| reconnectBackoff | int | 3000 | 设置客户端在尝试重新连接到 Broker 之前的等待时间,单位毫秒 |
- 定义 RocketMQ 实现的配置参数类 CloudAppRocketProperties ,使用注解 @ConfigurationProperties(prefix = "com.alibaba.cloudapp.messaging.rocketmq",ignoreUnknownFields = false,ignoreInvalidFields = true) ,配置参数类字段如下:
| 字段名 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| enabled | boolean | - | 是否启用 |
| nameServer | String | - | Name Server 的地址 |
| username | String | - | 用户名 |
| password | String | - | 密码 |
| useTLS | boolean | - | 启用 TLS |
| enableMsgTrace | boolean | - | 启用消息追踪 |
| traceTopic | String | - | 消息追踪主题,消息追踪数据将存储在此主题中 |
| accessChannel | String | LOCAL | 访问通道,可选值:CLOUD、LOCAL |
| inputs | List<RocketConsumerProperties> | - | 消费者列表 |
| outputs | List<RocketProducerProperties> | - | 生产者列表 |
定义 RocketMQ 消费者参数类 RocketConsumerProperties ,字段如下:
| 字段名 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| accessChannel | String | - | 访问通道,可选值:CLOUD、LOCAL |
| group | String | - | 消费者分组 |
| messageModel | MessageModel | MessageModel.CLUSTERING | 消息模型,可选值:BROADCASTING(广播模式)、CLUSTERING(集群模式) |
| pullBatchSize | int | 10 | 消费者在一次拉取操作中从消息队列请求的消息数量上限 |
| namespace | String | - | - |
| name | String | - | 名称 |
| topic | String | - | 主题 |
| tags | List | - | 标签列表 |
| nameServer | String | - | Name Server 的地址 |
| username | String | - | 用户名 |
| password | String | - | 密码 |
| useTLS | boolean | false | 启用 TLS |
| enableMsgTrace | boolean | - | 启用消息追踪 |
| traceTopic | String | - | 消息追踪主题,消息追踪数据将存储在此主题中 |
| type | String | - | 消费类型,可选值:PUSH、PULL |
| isDefault | boolean | false | 是否为默认消费者。在应用配置文件中可配置多个消费者,其中默认消费者将作为自动注入 RocketMQTemplate 的配置 |
定义 RocketMQ 生产者参数类 RocketProducerProperties ,字段如下:
| 字段名 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| group | String | - | 生产者分组 |
| name | String | - | 名称 |
| namespace | String | - | - |
| sendTimeout | int | 3000 | 消息发送超时时间,单位超时 |
| retryNextServer | boolean | false | 消息发送失败时是否重新发送到另一个 Broker |
| compressMsgBodyOverHowMuch | int | 1024 * 4 | 消息体进行压缩的阈值,单位 Byte |
| maxMessageSize | int | 1024 * 1024 * 4 | 消息的最大大小,单位 Byte |
| retryTimesWhenSendFailed | int | 2 | 消息发送失败时,重试次数 |
| nameServer | String | - | Name Server 的地址 |
| username | String | - | 用户名 |
| password | String | - | 密码 |
| useTLS | boolean | false | 启用 TLS |
| enableMsgTrace | boolean | - | 启用消息追踪 |
| traceTopic | String | - | 消息追踪主题,消息追踪数据将存储在此主题中 |
| isDefault | boolean | false | 是否为默认生产者。在应用配置文件中可配置多个生产者,其中默认生产者将作为自动注入 RocketMQTemplate 的配置 |
使用场景按 MQ 系统分为两大类:RocketMQ 和 Kafka。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-framework-dependencies</artifactId>
<version>1.0.0</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>spring-boot-starter-cloudapp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-spring-messaging-rocketmq</artifactId>
</dependency>
</dependencies>spring:
application:
name: ConsumerDemo
io:
cloudapp:
messaging:
rocketmq:
enabled: true
nameServer: ${ROCKET_HOST}
username: ${ROCKET_USERNAME}
password: ${ROCKET_PASSWORD}
inputs:
- group: test-group
name: rocketConsumer
topic: test-topic
outputs:
- name: rocketProducer
group: test-group
server:
port: 8099@RestController
public class RocketConsumerDemoController {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketConsumerDemoController.class);
@Autowired
@Qualifier("rocketConsumer")
@Lazy
CloudAppRocketConsumer cloudAppRocketConsumer;
@Autowired
@Qualifier("rocketProducer")
@Lazy
CloudAppRocketProducer cloudAppRocketProducer;
@RequestMapping("/testRocketConsumer")
public void testRocketConsumer() {
MessageExt message = cloudAppRocketConsumer.pull("test-topic");
LOGGER.info(message.toString());
}
@RequestMapping("/testRocketProducer")
public void testRocketProducer() {
try {
cloudAppRocketProducer.send("test-topic", "hello world!" );
LOGGER.info("send message success!");
} catch (CloudAppException e) {
LOGGER.error("send message failed!", e);
e.printStackTrace();
}
}
} <dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-framework-dependencies</artifactId>
<version>1.0.0</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>spring-boot-starter-cloudapp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-spring-messaging-rocketmq</artifactId>
</dependency>
</dependencies>spring:
application:
name: ConsumerDemo
io:
cloudapp:
messaging:
rocketmq:
enabled: true
nameServer: ${ROCKET_HOST}
username: ${ROCKET_USERNAME}
password: ${ROCKET_PASSWORD}
inputs:
- group: test-group
name: rocketConsumer
topic: test-topic
outputs:
- name: rocketProducer
group: test-group
server:
port: 8099 @RestController
public class RocketConsumerDemoController {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketConsumerDemoController.class);
@Autowired
@Qualifier("rocketConsumer")
@Lazy
CloudAppRocketConsumer cloudAppRocketConsumer;
@Autowired
@Qualifier("rocketProducer")
@Lazy
CloudAppRocketProducer cloudAppRocketProducer;
@RequestMapping("/testDelegationRocketConsumer")
public void testDelegationRocketConsumer() {
List<MessageExt> messages = cloudAppRocketConsumer.getDelegatingConsumer().poll(1000);
LOGGER.info(messages.toString());
}
@RequestMapping("/testDelegationRocketProducer")
public void testDelegationRocketProducer() {
Message message = new Message("test-topic", new byte[1024]);
SendResult result = null;
try {
result = cloudAppRocketProducer.getDelegatingProducer().send(message);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.info(result.toString());
}
}<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-framework-dependencies</artifactId>
<version>${cloudapp.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>spring-boot-starter-cloudapp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-spring-messaging-rocketmq-streaming</artifactId>
</dependency>
</dependencies>io:
cloudapp:
messaging:
rocketmq:
enabled: false # 与 spring stream 的方式工作时,关闭 cloudapp 的驱动
当我们需要与 spring stream 的方式工作时,在以上的配置中,将关闭 cloudapp 的消息驱动。此时将 cloudapp 的 messaging 机制将完全遵循原生的消息驱动的实现。关于 RocketMQ 的 streaming 使用方式,请参考:https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-framework-dependencies</artifactId>
<version>${cloudapp.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>spring-boot-starter-cloudapp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-spring-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>spring:
application:
name: KafkaDemo
io:
cloudapp:
messaging:
kafka:
servers: ${KAFKA_SERVER}
username: ${KAFKA_USERNAME}
password: ${KAFKA_PASSWORD}
mechanism: PLAIN
security-protocol: SASL_SSL
ssl:
trust-store-location: classpath:/sasl/mix.4096.client.truststore.jks
inputs:
- name: testKafkaConsumer
topic: test-topic
group: test-group
outputs:
- name: testKafkaProducer
group: test-group
topic: test-topic
server:
port: 8099《mix.4096.client.truststore.jks》
@RestController
public class KafkaDemoController {
@Autowired
@Qualifier("testKafkaConsumerConfig")
KafkaConsumerProperties kafkaConsumerProperties;
@Autowired
@Qualifier("testKafkaProducer")
CloudAppKafkaProducer cloudAppKafkaProducer;
@RequestMapping("/testCloudAppKafkaConsumer")
public void testKafkaProducerProperties() {
kafkaConsumerProperties.setName("test-consumer" + RandomStringGenerator.generate(3));
CloudAppKafkaConsumer consumer = new CloudAppKafkaConsumer(kafkaConsumerProperties);
consumer.poll(1000);
}
@RequestMapping("/testCloudAppKafkaProducer")
public void testCloudAppKafkaProducer() {
cloudAppKafkaProducer.send("test-topic", "hello world!");
}
}<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-framework-dependencies</artifactId>
<version>${cloudapp.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>spring-boot-starter-cloudapp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-spring-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>spring:
application:
name: KafkaDemo
io:
cloudapp:
messaging:
kafka:
servers: ${KAFKA_SERVER}
username: ${KAFKA_USERNAME}
password: ${KAFKA_PASSWORD}
mechanism: PLAIN
security-protocol: SASL_SSL
ssl:
trust-store-location: classpath:/sasl/mix.4096.client.truststore.jks
inputs:
- name: testKafkaConsumer
topic: test-topic
group: test-group
outputs:
- name: testKafkaProducer
group: test-group
topic: test-topic
server:
port: 8099《mix.4096.client.truststore.jks》
@RestController
public class KafkaDemoController {
@Autowired
@Qualifier("testKafkaConsumerConfig")
KafkaConsumerProperties kafkaConsumerProperties;
@Autowired
@Qualifier("testKafkaProducer")
CloudAppKafkaProducer cloudAppKafkaProducer;
@RequestMapping("/testDelegatingKafkaConsumer")
public void testDelegatingKafkaConsumer() {
kafkaConsumerProperties.setName("test-consumer");
CloudAppKafkaConsumer consumer = new CloudAppKafkaConsumer(kafkaConsumerProperties);
consumer.getDelegatingConsumer().poll(1000);
}
@RequestMapping("/testDelegateKafkaProducer")
public void testDelegateKafkaProducer() {
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "messageKey", "hello world!");
cloudAppKafkaProducer.getDelegatingProducer().send(record);
}
}<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-framework-dependencies</artifactId>
<version>${cloudapp.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>spring-boot-starter-cloudapp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloudapp</groupId>
<artifactId>cloudapp-spring-messaging-kafka-streaming</artifactId>
</dependency>
</dependencies>io:
cloudapp:
messaging:
kafka:
enabled: false # 与 spring stream 的方式工作时,关闭 cloudapp 的驱动
当我们需要与 spring stream 的方式工作时,在以上的配置中,将关闭 cloudapp 的消息驱动。此时将 cloudapp 的 messaging 机制将完全遵循原生的消息驱动的实现。关于 Kafka 的 streaming 使用方式,请参考:https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ