Skip to content

Commit 5b698a8

Browse files
authored
Merge pull request #26 from crossoverJie/cim-1.0.2
cim 1.0.2
2 parents 61b008a + d30eeec commit 5b698a8

File tree

33 files changed

+599
-212
lines changed

33 files changed

+599
-212
lines changed

cim-client/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,6 @@
7272
<artifactId>logback-classic</artifactId>
7373
</dependency>
7474

75-
<dependency>
76-
<groupId>io.netty</groupId>
77-
<artifactId>netty-all</artifactId>
78-
<version>${netty.version}</version>
79-
</dependency>
80-
8175
<dependency>
8276
<groupId>junit</groupId>
8377
<artifactId>junit</artifactId>

cim-client/src/main/java/com/crossoverjie/cim/client/CIMClientApplication.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.crossoverjie.cim.client;
22

33
import com.crossoverjie.cim.client.scanner.Scan;
4+
import com.crossoverjie.cim.client.service.impl.ClientInfo;
45
import org.slf4j.Logger;
56
import org.slf4j.LoggerFactory;
7+
import org.springframework.beans.factory.annotation.Autowired;
68
import org.springframework.boot.CommandLineRunner;
79
import org.springframework.boot.SpringApplication;
810
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -15,6 +17,8 @@ public class CIMClientApplication implements CommandLineRunner{
1517

1618
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClientApplication.class);
1719

20+
@Autowired
21+
private ClientInfo clientInfo ;
1822
public static void main(String[] args) {
1923
SpringApplication.run(CIMClientApplication.class, args);
2024
LOGGER.info("启动 Client 服务成功");
@@ -26,5 +30,6 @@ public void run(String... args) throws Exception {
2630
Thread thread = new Thread(scan);
2731
thread.setName("scan-thread");
2832
thread.start();
33+
clientInfo.saveStartDate();
2934
}
3035
}

cim-client/src/main/java/com/crossoverjie/cim/client/client/CIMClient.java

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package com.crossoverjie.cim.client.client;
22

3+
import com.crossoverjie.cim.client.config.AppConfiguration;
34
import com.crossoverjie.cim.client.init.CIMClientHandleInitializer;
5+
import com.crossoverjie.cim.client.service.MsgHandle;
46
import com.crossoverjie.cim.client.service.RouteRequest;
7+
import com.crossoverjie.cim.client.service.impl.ClientInfo;
58
import com.crossoverjie.cim.client.vo.req.GoogleProtocolVO;
69
import com.crossoverjie.cim.client.vo.req.LoginReqVO;
710
import com.crossoverjie.cim.client.vo.res.CIMServerResVO;
@@ -16,6 +19,7 @@
1619
import io.netty.channel.nio.NioEventLoopGroup;
1720
import io.netty.channel.socket.SocketChannel;
1821
import io.netty.channel.socket.nio.NioSocketChannel;
22+
import io.netty.util.concurrent.DefaultThreadFactory;
1923
import org.slf4j.Logger;
2024
import org.slf4j.LoggerFactory;
2125
import org.springframework.beans.factory.annotation.Autowired;
@@ -28,15 +32,15 @@
2832
* Function:
2933
*
3034
* @author crossoverJie
31-
* Date: 22/05/2018 14:19
35+
* Date: 22/05/2018 14:19
3236
* @since JDK 1.8
3337
*/
3438
@Component
3539
public class CIMClient {
3640

3741
private final static Logger LOGGER = LoggerFactory.getLogger(CIMClient.class);
3842

39-
private EventLoopGroup group = new NioEventLoopGroup();
43+
private EventLoopGroup group = new NioEventLoopGroup(0, new DefaultThreadFactory("cim-work"));
4044

4145
@Value("${cim.user.id}")
4246
private long userId;
@@ -49,6 +53,20 @@ public class CIMClient {
4953
@Autowired
5054
private RouteRequest routeRequest;
5155

56+
@Autowired
57+
private AppConfiguration configuration;
58+
59+
@Autowired
60+
private MsgHandle msgHandle;
61+
62+
@Autowired
63+
private ClientInfo clientInfo;
64+
65+
/**
66+
* 重试次数
67+
*/
68+
private int errorCount;
69+
5270
@PostConstruct
5371
public void start() throws Exception {
5472

@@ -70,14 +88,25 @@ public void start() throws Exception {
7088
* @param cimServer
7189
* @throws InterruptedException
7290
*/
73-
private void startClient(CIMServerResVO.ServerInfo cimServer) throws InterruptedException {
91+
private void startClient(CIMServerResVO.ServerInfo cimServer) {
7492
Bootstrap bootstrap = new Bootstrap();
7593
bootstrap.group(group)
7694
.channel(NioSocketChannel.class)
7795
.handler(new CIMClientHandleInitializer())
7896
;
7997

80-
ChannelFuture future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
98+
ChannelFuture future = null;
99+
try {
100+
future = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
101+
} catch (InterruptedException e) {
102+
errorCount++;
103+
104+
if (errorCount >= configuration.getErrorCount()) {
105+
LOGGER.error("链接失败次数达到上限[{}]次", errorCount);
106+
msgHandle.shutdown();
107+
}
108+
LOGGER.error("连接失败", e);
109+
}
81110
if (future.isSuccess()) {
82111
LOGGER.info("启动 cim client 成功");
83112
}
@@ -90,10 +119,26 @@ private void startClient(CIMServerResVO.ServerInfo cimServer) throws Interrupted
90119
* @return 路由服务器信息
91120
* @throws Exception
92121
*/
93-
private CIMServerResVO.ServerInfo userLogin() throws Exception {
122+
private CIMServerResVO.ServerInfo userLogin() {
94123
LoginReqVO loginReqVO = new LoginReqVO(userId, userName);
95-
CIMServerResVO.ServerInfo cimServer = routeRequest.getCIMServer(loginReqVO);
96-
LOGGER.info("cimServer=[{}]", cimServer.toString());
124+
CIMServerResVO.ServerInfo cimServer = null;
125+
try {
126+
cimServer = routeRequest.getCIMServer(loginReqVO);
127+
128+
//保存系统信息
129+
clientInfo.saveServiceInfo(cimServer.getIp() + ":" + cimServer.getCimServerPort())
130+
.saveUserInfo(userId, userName);
131+
132+
LOGGER.info("cimServer=[{}]", cimServer.toString());
133+
} catch (Exception e) {
134+
errorCount++;
135+
136+
if (errorCount >= configuration.getErrorCount()) {
137+
LOGGER.error("重连次数达到上限[{}]次", errorCount);
138+
msgHandle.shutdown();
139+
}
140+
LOGGER.error("登录失败", e);
141+
}
97142
return cimServer;
98143
}
99144

@@ -145,11 +190,25 @@ public void sendGoogleProtocolMsg(GoogleProtocolVO googleProtocolVO) {
145190

146191
}
147192

193+
194+
public void reconnect() throws Exception {
195+
if (channel != null && channel.isActive()) {
196+
return;
197+
}
198+
//首先清除路由信息,下线
199+
routeRequest.offLine();
200+
201+
LOGGER.info("开始重连。。");
202+
start();
203+
LOGGER.info("重连成功!!");
204+
}
205+
148206
/**
149207
* 关闭
208+
*
150209
* @throws InterruptedException
151210
*/
152211
public void close() throws InterruptedException {
153-
channel.close() ;
212+
channel.close();
154213
}
155214
}

cim-client/src/main/java/com/crossoverjie/cim/client/config/AppConfiguration.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ public class AppConfiguration {
2222
@Value("${cim.msg.logger.path}")
2323
private String msgLoggerPath ;
2424

25+
@Value("${cim.clear.route.request.url}")
26+
private String clearRouteUrl ;
27+
28+
@Value("${cim.heartbeat.time}")
29+
private long heartBeatTime ;
30+
31+
@Value("${cim.reconnect.count}")
32+
private int errorCount ;
33+
2534
public Long getUserId() {
2635
return userId;
2736
}
@@ -45,4 +54,30 @@ public String getMsgLoggerPath() {
4554
public void setMsgLoggerPath(String msgLoggerPath) {
4655
this.msgLoggerPath = msgLoggerPath;
4756
}
57+
58+
59+
public long getHeartBeatTime() {
60+
return heartBeatTime;
61+
}
62+
63+
public void setHeartBeatTime(long heartBeatTime) {
64+
this.heartBeatTime = heartBeatTime;
65+
}
66+
67+
68+
public String getClearRouteUrl() {
69+
return clearRouteUrl;
70+
}
71+
72+
public void setClearRouteUrl(String clearRouteUrl) {
73+
this.clearRouteUrl = clearRouteUrl;
74+
}
75+
76+
public int getErrorCount() {
77+
return errorCount;
78+
}
79+
80+
public void setErrorCount(int errorCount) {
81+
this.errorCount = errorCount;
82+
}
4883
}

cim-client/src/main/java/com/crossoverjie/cim/client/config/BeanConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,17 @@ public ThreadPoolExecutor buildCallerThread(){
8282
return productExecutor ;
8383
}
8484

85+
86+
@Bean("scheduledTask")
87+
public ScheduledExecutorService buildSchedule(){
88+
ThreadFactory sche = new ThreadFactoryBuilder()
89+
.setNameFormat("scheduled-%d")
90+
.setDaemon(true)
91+
.build();
92+
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,sche) ;
93+
return scheduledExecutorService ;
94+
}
95+
8596
/**
8697
* 回调 bean
8798
* @return

cim-client/src/main/java/com/crossoverjie/cim/client/handle/CIMClientHandle.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.crossoverjie.cim.client.handle;
22

3+
import com.crossoverjie.cim.client.thread.ReConnectJob;
34
import com.crossoverjie.cim.client.util.SpringBeanFactory;
45
import com.crossoverjie.cim.common.constant.Constants;
56
import com.crossoverjie.cim.common.protocol.CIMRequestProto;
67
import com.crossoverjie.cim.common.protocol.CIMResponseProto;
8+
import com.crossoverjie.cim.common.util.NettyAttrUtil;
79
import io.netty.channel.ChannelFutureListener;
810
import io.netty.channel.ChannelHandler;
911
import io.netty.channel.ChannelHandlerContext;
@@ -13,7 +15,9 @@
1315
import org.slf4j.Logger;
1416
import org.slf4j.LoggerFactory;
1517

18+
import java.util.concurrent.ScheduledExecutorService;
1619
import java.util.concurrent.ThreadPoolExecutor;
20+
import java.util.concurrent.TimeUnit;
1721

1822
/**
1923
* Function:
@@ -31,20 +35,28 @@ public class CIMClientHandle extends SimpleChannelInboundHandler<CIMResponseProt
3135

3236
private ThreadPoolExecutor threadPoolExecutor ;
3337

38+
private ScheduledExecutorService scheduledExecutorService ;
39+
3440

3541
@Override
3642
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
3743

3844
if (evt instanceof IdleStateEvent){
3945
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
4046

47+
//LOGGER.info("定时检测服务端是否存活");
48+
4149
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
4250
CIMRequestProto.CIMReqProtocol heartBeat = SpringBeanFactory.getBean("heartBeat",
4351
CIMRequestProto.CIMReqProtocol.class);
44-
ctx.writeAndFlush(heartBeat).addListeners(ChannelFutureListener.CLOSE_ON_FAILURE) ;
52+
ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> {
53+
if (!future.isSuccess()) {
54+
LOGGER.error("IO error,close Channel");
55+
future.channel().close();
56+
}
57+
}) ;
4558
}
4659

47-
4860
}
4961

5062
super.userEventTriggered(ctx, evt);
@@ -58,10 +70,24 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
5870
}
5971

6072
@Override
61-
protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResponseProto.CIMResProtocol msg) throws Exception {
73+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
74+
LOGGER.info("客户端断开了,重新连接!");
6275

63-
//从服务端收到消息时被调用
64-
//LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
76+
if (scheduledExecutorService == null){
77+
scheduledExecutorService = SpringBeanFactory.getBean("scheduledTask",ScheduledExecutorService.class) ;
78+
}
79+
// TODO: 2019-01-22 后期可以改为不用定时任务,连上后就关闭任务 节省性能。
80+
scheduledExecutorService.scheduleAtFixedRate(new ReConnectJob(ctx),0,10, TimeUnit.SECONDS) ;
81+
}
82+
83+
@Override
84+
protected void channelRead0(ChannelHandlerContext ctx, CIMResponseProto.CIMResProtocol msg) throws Exception {
85+
86+
//心跳更新时间
87+
if (msg.getType() == Constants.CommandType.PING){
88+
//LOGGER.info("收到服务端心跳!!!");
89+
NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis());
90+
}
6591

6692
if (msg.getType() != Constants.CommandType.PING) {
6793
//回调消息
@@ -70,6 +96,10 @@ protected void channelRead0(ChannelHandlerContext channelHandlerContext, CIMResp
7096
LOGGER.info(msg.getResMsg());
7197
}
7298

99+
100+
101+
102+
73103
}
74104

75105
/**

cim-client/src/main/java/com/crossoverjie/cim/client/init/CIMClientHandleInitializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public class CIMClientHandleInitializer extends ChannelInitializer<Channel> {
2424
@Override
2525
protected void initChannel(Channel ch) throws Exception {
2626
ch.pipeline()
27-
//30 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
28-
.addLast(new IdleStateHandler(0, 30, 0))
27+
//10 秒没发送消息 将IdleStateHandler 添加到 ChannelPipeline 中
28+
.addLast(new IdleStateHandler(0, 10, 0))
2929

3030
//心跳解码
3131
//.addLast(new HeartbeatEncode())

cim-client/src/main/java/com/crossoverjie/cim/client/service/MsgHandle.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,10 @@ public interface MsgHandle {
4848
* @return 是否应当跳过当前消息(包含了":" 就需要跳过)
4949
*/
5050
boolean innerCommand(String msg) ;
51+
52+
53+
/**
54+
* 关闭系统
55+
*/
56+
void shutdown() ;
5157
}

cim-client/src/main/java/com/crossoverjie/cim/client/service/RouteRequest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,13 @@ public interface RouteRequest {
4141
CIMServerResVO.ServerInfo getCIMServer(LoginReqVO loginReqVO) throws Exception;
4242

4343
/**
44-
*
45-
* @return 获取所有在线用户
44+
* 获取所有在线用户
45+
* @return
46+
* @throws Exception
4647
*/
4748
List<OnlineUsersResVO.DataBodyBean> onlineUsers()throws Exception ;
4849

4950

51+
void offLine() ;
52+
5053
}

0 commit comments

Comments
 (0)