Skip to content

sona server 系统介绍

qinwei edited this page Dec 14, 2022 · 16 revisions

sona 介绍

sona-server 一共包含7个模块

  • sona-common 是基础模块,包含全链路日志、dubbo 的filter 等功能,被其他模块依赖
  • sona-gateway 是长连接网关,基于 Netty 开发,使用 mercury 二进制协议
  • sona-service 是sona的核心处理服务,包括房间管理、房间IM、实时音视频等
  • sona-session 是在线状态服务,提供用户的长连在线状态信息
  • sona-web 是给客户端提供的web服务,以及即构、腾讯云等云商回调接口
  • sona-console 是控制台页面的后台服务,包含消息全链路查询、长连事件查询、云商热切等功能
  • sona-demo-web 是一个简单的聊天室 demo,可以基于它快速体验聊天室基本的语音、消息、打赏等功能

因为sona-server是一套完整的语音房系统,依赖比较多,包括: Springboot/Dubbo/Zookeeper/Apollo/Cat/Redis/RocketMQ/Kafka/MySql/ElasticSearch/HBase

每个模块都可单独打包,可以在 pom.xml 文件中配置自己公司的maven仓库,方便服务部署。

Architecture

Apollo 配置

sona-gateway

key Description value
dubbo.registry.address dubbo注册地址 zookeeper://127.0.0.1:2181
spring.kafka.bootstrap-servers kafka server配置 127.0.0.1:9092
rocketmq.name-server rocketmq server配置 127.0.0.1:9876
close.msg.throttling gateway需要重启时,对发送的close消息进行平滑限流,每秒最多发出X条close消息 750
close.msg.max.wait.seconds 开始发送close消息后,server会等待连接全部被关闭。但此过程如果超过X秒,则不再等待,直接调用shutdownGracefully 40
handshake.wait.seconds 连接建立后X秒内不登录认证的连接,会被强制断开 5
channel.idle.seconds X秒内没有读到过新数据的连接,认为是僵死连接,会被强制断开 280
probe.idle.seconds X秒内没有读到过新数据,且在前台状态的连接,服务端主动发送探测消息 130
probe.wait.seconds 发送探测消息后X秒内没有收到回复的连接,会被强制断开 4
long.lasting.close.hours 持续时间超过X小时的连接,定期断开,以免redis数据过期失效,导致其他错误。设为-1关闭此功能 144
room.message.async 是否异步发送房间消息 false

sona-service

key Description value
dubbo.registry.address dubbo注册地址 zookeeper://127.0.0.1:2181
spring.kafka.bootstrap-servers kafka server配置 127.0.0.1:9092
rocketmq.name-server rocketmq server配置 127.0.0.1:9876
spring.redis.host redis 地址配置 127.0.0.1
spring.redis.password redis密码 qinwei
spring.datasource.url mysql连接地址 jdbc:mysql://127.0.0.1:3306/sona?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false
spring.datasource.username mysql用户名 root
spring.datasource.password mysql密码 qinwei123
zego.app.id 申请zego的 appid
zego.app.secret 申请zego 的app秘钥
zego.start.mix.url 发起zego混流的地址
zego.stop.mix.url 发起zego停止混流的地址
zego.token.url 获取zego token的地址
zego.server.secret 申请zego的server 秘钥
zego.mix.retry.count 发起zego混流的失败重试次数 0
zego.prefix.url 播放zego cdn混流的域名
zego.appsign zego 的appsign
tencent.appid 申请trtc的 appid
tencent.region 申请trtc的 region
tencent.endpoint 申请trtc的endpoint
tencent.secret.id 申请trtc的secret id
tencent.secret.key 申请trtc的 secret key
tencent.secretkey 申请trtc的secretkey
tencent.prefix.url trtc 拉流的域名
tencent.rtmp.url trtc 的rtmp域名
spi.msg.productCode 客户端发送消息回调业务方(多个业务方用 逗号 隔开,例如 "chatroom,living",配置了业务方后,需要实现 MessageCallbackRemoteService接口,在回调方法中可以对消息进行 风控或者 消息内容的组装)
message.flow.config 消息频控配置(基于消息优先级配置) {"HIGH":{"capacity":0,"highCapacity":30,"request":1,"deduct":1},"MEDIUM_HIGH":{"capacity":60,"highCapacity":30,"request":30,"deduct":1},"MEDIUM":{"capacity":60,"highCapacity":30,"request":30,"deduct":1},"LOW":{"capacity":60,"highCapacity":30,"request":30,"deduct":1}}
message.delay.config 消息最大延迟配置(单位毫秒,超过最大延迟时间还未发送的消息会丢弃) {"MEDIUM_HIGH":30000,"MEDIUM":5000,"LOW":1000}

sona-session

key Description value
dubbo.registry.address dubbo注册地址 zookeeper://127.0.0.1:2181
rocketmq.name-server rocketmq server配置 127.0.0.1:9876
spring.redis.host redis 地址配置 127.0.0.1
spring.redis.password redis密码 qinwei
server.stat.outdate.seconds 长时间没有更新stat信息的server,视为无效。server信息过期时间 30
server.stat.outdate.keep 长时间没有更新stat信息的server,视为无效。是否继续保留server信息过期的连接 false

sona-web

key Description value
dubbo.registry.address dubbo注册地址 zookeeper://127.0.0.1:2181
spring.kafka.bootstrap-servers kafka server配置 127.0.0.1:9092
spring.redis.host redis 地址配置 127.0.0.1
spring.redis.password redis密码 qinwei
spring.datasource.url mysql连接地址 jdbc:mysql://127.0.0.1:3306/sona?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false
spring.datasource.username mysql用户名 root
spring.datasource.password mysql密码 qinwei123

sona-console

key Description value
dubbo.registry.address dubbo注册地址 zookeeper://127.0.0.1:2181
spring.kafka.bootstrap-servers kafka server配置 127.0.0.1:9092
spring.redis.host redis 地址配置 127.0.0.1
spring.redis.password redis密码 qinwei
spring.datasource.url mysql连接地址 jdbc:mysql://127.0.0.1:3306/sona?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false
spring.datasource.username mysql用户名 root
spring.datasource.password mysql密码 qinwei123
spring.elasticsearch.rest.uris es 地址 http://127.0.0.1:9200
spring.elasticsearch.rest.username es 用户名 elastic
spring.elasticsearch.rest.password es 密码 qinwei

数据库表结构

建表 sql 语句

MQ 消息格式

Kafka

  • 房间消息全链路日志

topic: TOPIC-ROOM-IM-MESSAGE-LOG

{
    "messageId": "消息id",
    "roomId": "房间id",
    "msgType": "消息类型",
    "productCode": "业务方",
    "priority": "消息优先级",
    "uid": "发送uid",
    "content": "消息内容",
    "toUid": "点对点消息指定的uids",
    "sendTime": "发送时间(毫秒时间戳)"  
}
  • 聊天室和群组消息保存

topic: TOPIC-ROOM-MESSAGE-RECORDER

{
    "messageId": "消息id",
    "roomId": "房间id",
    "productCode": "业务方",
    "uid": "发送uid",
    "content": "消息内容",
    "sendTime": "发送时间"
}
  • 聊天室中高、中、低等级消息处理

topic: TOPIC-ROOM-IM-MESSAGE

{
    "messageId": "消息id",
    "roomId": "房间id",
    "msgType": "消息类型",
    "productCode": "业务方",
    "priority": "消息优先级",
    "uid": "发送uid",
    "content": "消息内容",
    "needToSave": "消息是否保存",
    "ackUids":[],// ack用户
    "sendTime": "发送时间"
}
  • 聊天室需要 ack 的消息

topic: TOPIC_CHATROOM_MESSAGE_ACK

{
    "messageId": "消息id",
    "roomId": "房间id",
    "msgType": "消息类型",
    "productCode": "业务方",
    "priority": "消息优先级",
    "uid": "发送uid",
    "content": "消息内容",
    "needToSave": "消息是否保存",
    "ackUids":[],// ack用户
    "sendTime": "发送时间"
}
  • 处理客户端回复的 ack 消息

topic: TOPIC_ROOM_ACK_MESSAGE

{
    "messageId": "消息id",
    "roomId": "房间id",
    "messageType":"消息类型 106:ack",
    "uid": "发送uid"
}
  • 长链事件日志

topic: TOPIC-MERCURY_EVENT_LOG

{
    "uid": "uid",
    "server": "网关机器ip",
    "addr": "连接 ip:port",
    "type": "连接类型,2 聊天室",
    "device": "设备id",
    "event": "事件",
    "content": "内容",
    "cmd": "command",
    "header":"header 内容",
    "sendTime": "发送时间"
}
  • 客户端上报长链日志

topic: TOPIC-MERCURY_CLIENT_LOG

{
  "id": "",
  "sendTime": 0,  //毫秒时间戳
  "common": "",
  "desc": "",
  "details": "",
  "ip": "",
  "model": "",
  "network": "",
  "osVer": "",
  "platform": "",
  "type": "",
  "uid": ""
}

RocketMQ

  • 处理客户端通过长链发送的房间消息

topic: TOPIC_ROOM_MESSAGE

{
    "messageId": "消息id",
    "roomId": "房间id",
    "messageType":"消息类型 100:业务 101:文本 102:图片 103:表情 104:语音 105:视频 106:ack",
    "msgType": "消息类型",
    "productCode": "业务方",
    "priority": "消息优先级",
    "uid": "发送uid",
    "content": "消息内容",
    "needToSave": "消息是否保存"
}
  • 网关处理业务发送的房间消息

topic:TOPIC_CHATROOM_MESSAGE_SEND

{
    "data": "消息体",
    "room": "房间id",
    "cmd":"网关cmd命令",
    "messageId": "消息id",
    "highPriority": "是否高等级消息",
    "ackUids": "ack 用户"
}
  • 网关处理业务发送的群组消息

topic:TOPIC_GROUP_MESSAGE_SEND

{
    "data": "消息体",
    "room": "房间id",
    "cmd":"网关cmd命令",
    "messageId": "消息id",
    "channels": "需要发送的连接"
}
  • 房间连接在线状态

topic:TOPIC_SOCKET_ROOM_SESSION

{
    "tm": "时间戳",
    "channelId": "连接id",
    "cmd":"长链cmd 命令",
    "uid": "用户uid",
    "room": "房间id",
    "session": "session 状态",
    "t": "session type"
}
  • 处理长链房间在线状态

topic:TOPIC_CHATROOM_SESSION_SONA

{
    "timestamp": "时间戳",
    "room": "房间id",
    "cmd":"长链cmd 命令",
    "uid": "用户uid",
    "reason": "离开房间原因"
}
  • sona房间在线状态延迟检测

topic:TOPIC_SONA_MERCURY_OFFLINE

{
    "timestamp": "时间戳",
    "room": "房间id",
    "cmd":"长链cmd 命令",
    "uid": "用户uid",
    "reason": "离开房间原因"
}
  • 网关服务器状态统计

topic:TOPIC_SERVER_STATS

{
    "startTime": "网关启动时间",
    "serverId": "服务器ip",
    "authConn":"有效连接数",
    "unAuthConn": "无效连接数",
    "tm": "时间戳"
}

sona 消息格式

下发的房间消息格式统一如下:

{
    "data": {
      "biz":{},
       "":""
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 901,
    "roomId":""
}

sona 有很多内置的消息,比如加入房间、离开房间等会自动下发消息,msgType 已经固定好,业务方的type不能重复

  • 创建房间
{
    "data": {
      "uid":""
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10000,
    "roomId":""
}
  • 关闭房间
{
    "data": {
      "roomId":"",
      "uid":""
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10001,
    "roomId":""
}
  • 进入房间
{
    "data": {
      "roomId":"",
      "uid":""
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10002,
    "roomId":""
}
  • 离开房间
{
    "data": {
      "roomId":"",
      "uid":""
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10003,
    "roomId":""
}
  • 管理员设置/取消
{
    "data": {
      "roomId":"",
      "uid":"",
      "isAdmin":"0/1"
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10004,
    "roomId":""
}
  • 拉黑设置/取消
{
    "data": {
      "roomId":"",
      "uid":"",
      "isBlock":"0/1"
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10005,
    "roomId":""
}
  • 禁言设置/取消
{
    "data": {
      "roomId":"",
      "uid":"",
      "isMute":"0/1",
      "duration":"分钟"
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10006,
    "roomId":""
}
  • 踢人
{
    "data": {
      "roomId":"",
      "uid":""
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10007,
    "roomId":""
}
  • 云商热切
{
    "data": {
      "roomId":"",
      "pullMode":"",
       "pushMode":"",
       "supplier":"",
       "bitrate":"",
       "playerType":"",
       "streamId":"",
       "streamUrl":""
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10008,
    "roomId":""
}
  • 静音设置/取消
{
    "data": {
      "roomId":"",
      "isMute":"0/1",
      "streamList":""
    },
    "messageId": "79afe835-d55e-4d71-a155-76a1150f9447",
    "msgType": 10009,
    "roomId":""
}

全链路日志

为了方便排查问题,sona 实现了全链路日志,可以根据 traceId 快速查出一次请求所经过的所有服务链路

  • http web请求 ,配置 TraceFilter 即可, sona-web 中已添加无需额外配置
public class TraceFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpServletRequest = (HttpServletRequest) request;
        putParamFromHeader(TracerContext.TRACER_TRACE_ID, httpServletRequest);
        TraceHelper.init();
        try {
            chain.doFilter(request, response);
        } finally {
            TraceHelper.reset();
        }
    }

    private void putParamFromHeader(String key, HttpServletRequest request) {
        TracerContext context = TracerContext.getContext();
        String value = context.get(key);
        if (StringUtils.isEmpty(value)) {
            context.put(key, request.getHeader(key));
        }
    }

}
@Configuration
public class FilterConfig {

    @Bean
    public FilterRegistrationBean<TraceFilter> traceFilterRegistration() {
        FilterRegistrationBean<TraceFilter> registration = new FilterRegistrationBean<>();
        registration.setFilter(new TraceFilter());
        registration.addUrlPatterns("/*");
        registration.setName("traceFilter");
        registration.setOrder(Ordered.HIGHEST_PRECEDENCE);
        return registration;
    }

}
  • dubbo 接口调用自动开启了,无需额外配置

  • kafka 传递 traceid ,需要在 application.properties 文件添加配置,消费消息时统一使用 @KafkaListener 即可自动开启,各模块都已添加

spring.kafka.producer.properties.interceptor.classes=cn.bixin.sona.common.kafka.TraceProducerInterceptor
  • rocketmq 传递 traceid ,自动开启,无需额外配置, 消费消息必须要实现 RocketMQListener
@RocketMQMessageListener(topic = "topic-msg-test", consumerGroup = "topic-msg-test_group", messageModel = MessageModel.BROADCASTING)
@Component
@Slf4j
public class MessageConsumer implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        log.info("rocketmq recieve :{}", message);
    }
}
  • 业务上手动开启
TraceHelper.init();
try {
    // 业务处理
} finally {
    TraceHelper.reset();
}

Clone this wiki locally