22
33import arthas .grpc .unittest .ArthasUnittest ;
44import arthas .grpc .unittest .ArthasUnittestServiceGrpc ;
5+ import ch .qos .logback .classic .Level ;
6+ import ch .qos .logback .classic .Logger ;
7+ import ch .qos .logback .classic .LoggerContext ;
58import com .taobao .arthas .grpc .server .ArthasGrpcServer ;
69import io .grpc .ManagedChannel ;
710import io .grpc .ManagedChannelBuilder ;
8- import io .grpc .StatusRuntimeException ;
911import io .grpc .stub .StreamObserver ;
1012import org .junit .Assert ;
1113import org .junit .Before ;
1214import org .junit .Test ;
13- import org .junit .jupiter .api .BeforeAll ;
14- import org .junit .jupiter .api .BeforeEach ;
15- import org .junit .jupiter .api .Disabled ;
15+ import org .slf4j .LoggerFactory ;
1616
17+ import java .lang .invoke .MethodHandles ;
1718import java .util .Random ;
1819import java .util .concurrent .CountDownLatch ;
1920import java .util .concurrent .ExecutorService ;
2021import java .util .concurrent .Executors ;
22+ import java .util .concurrent .TimeUnit ;
2123import java .util .concurrent .atomic .AtomicInteger ;
2224
2325/**
2729 */
2830public class GrpcTest {
2931 private static final String HOST = "localhost" ;
30- private static final int PORT = 9090 ;
32+ private static final int PORT = 9092 ;
3133 private static final String HOST_PORT = HOST + ":" + PORT ;
3234 private static final String UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME = "unittest.grpc.service.impl" ;
33- private ArthasUnittestServiceGrpc .ArthasUnittestServiceBlockingStub blockingStub = null ;
35+ private static final Logger log = (Logger ) LoggerFactory .getLogger (GrpcTest .class );
36+ private ManagedChannel clientChannel ;
3437 Random random = new Random ();
3538 ExecutorService threadPool = Executors .newFixedThreadPool (10 );
3639
40+
3741 @ Before
3842 public void startServer () {
43+ LoggerContext loggerContext = (LoggerContext ) LoggerFactory .getILoggerFactory ();
44+ Logger rootLogger = loggerContext .getLogger ("ROOT" );
45+
46+ rootLogger .setLevel (Level .INFO );
47+
3948 Thread grpcWebProxyStart = new Thread (() -> {
4049 ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer (PORT , UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME );
4150 arthasGrpcServer .start ();
4251 });
4352 grpcWebProxyStart .start ();
53+
54+ clientChannel = ManagedChannelBuilder .forTarget (HOST_PORT )
55+ .usePlaintext ()
56+ .build ();
4457 }
4558
4659 @ Test
4760 public void testUnary () {
48- ManagedChannel channel = ManagedChannelBuilder .forTarget (HOST_PORT )
49- .usePlaintext ()
50- .build ();
61+ log .info ("testUnary start!" );
5162
52- ArthasUnittestServiceGrpc .ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc .newBlockingStub (channel );
63+
64+ ArthasUnittestServiceGrpc .ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc .newBlockingStub (clientChannel );
5365
5466 try {
5567 ArthasUnittest .ArthasUnittestRequest request = ArthasUnittest .ArthasUnittestRequest .newBuilder ().setMessage ("unaryInvoke" ).build ();
5668 ArthasUnittest .ArthasUnittestResponse res = stub .unary (request );
5769 System .out .println (res .getMessage ());
5870 } finally {
59- channel .shutdownNow ();
71+ clientChannel .shutdownNow ();
6072 }
73+ log .info ("testUnary success!" );
6174 }
6275
6376 @ Test
6477 public void testUnarySum () throws InterruptedException {
65- ManagedChannel channel = ManagedChannelBuilder .forTarget (HOST_PORT )
66- .usePlaintext ()
67- .build ();
78+ log .info ("testUnarySum start!" );
6879
69- ArthasUnittestServiceGrpc .ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc .newBlockingStub (channel );
80+ ArthasUnittestServiceGrpc .ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc .newBlockingStub (clientChannel );
7081 for (int i = 0 ; i < 10 ; i ++) {
7182 AtomicInteger sum = new AtomicInteger (0 );
7283 int finalId = i ;
73- for (int j = 0 ; j < 100 ; j ++) {
84+ for (int j = 0 ; j < 10 ; j ++) {
7485 int num = random .nextInt (101 );
7586 sum .addAndGet (num );
7687 threadPool .submit (() -> {
@@ -82,17 +93,16 @@ public void testUnarySum() throws InterruptedException {
8293 System .out .println ("id:" + finalId + ",sum:" + sum .get () + ",grpcSum:" + grpcSum );
8394 Assert .assertEquals (sum .get (), grpcSum );
8495 }
85- channel .shutdown ();
96+ clientChannel .shutdown ();
97+ log .info ("testUnarySum success!" );
8698 }
8799
88100 // 用于测试客户端流
89101 @ Test
90102 public void testClientStreamSum () throws Throwable {
91- ManagedChannel channel = ManagedChannelBuilder .forAddress ("localhost" , 9090 )
92- .usePlaintext ()
93- .build ();
103+ log .info ("testClientStreamSum start!" );
94104
95- ArthasUnittestServiceGrpc .ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc .newStub (channel );
105+ ArthasUnittestServiceGrpc .ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc .newStub (clientChannel );
96106
97107 AtomicInteger sum = new AtomicInteger (0 );
98108 CountDownLatch latch = new CountDownLatch (1 );
@@ -115,25 +125,24 @@ public void onCompleted() {
115125 }
116126 });
117127
118- for (int j = 0 ; j < 1000 ; j ++) {
128+ for (int j = 0 ; j < 100 ; j ++) {
119129 int num = random .nextInt (1001 );
120130 sum .addAndGet (num );
121131 clientStreamObserver .onNext (ArthasUnittest .ArthasUnittestRequest .newBuilder ().setNum (num ).build ());
122132 }
123133
124134 clientStreamObserver .onCompleted ();
125- latch .await ();
126- channel .shutdown ();
135+ latch .await (20 ,TimeUnit .SECONDS );
136+ clientChannel .shutdown ();
137+ log .info ("testClientStreamSum success!" );
127138 }
128139
129140 // 用于测试请求数据隔离性
130141 @ Test
131142 public void testDataIsolation () throws InterruptedException {
132- ManagedChannel channel = ManagedChannelBuilder .forAddress ("localhost" , 9090 )
133- .usePlaintext ()
134- .build ();
143+ log .info ("testDataIsolation start!" );
135144
136- ArthasUnittestServiceGrpc .ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc .newStub (channel );
145+ ArthasUnittestServiceGrpc .ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc .newStub (clientChannel );
137146 for (int i = 0 ; i < 10 ; i ++) {
138147 threadPool .submit (() -> {
139148 AtomicInteger sum = new AtomicInteger (0 );
@@ -170,23 +179,22 @@ public void onCompleted() {
170179
171180 clientStreamObserver .onCompleted ();
172181 try {
173- latch .await ();
182+ latch .await (20 , TimeUnit . SECONDS );
174183 } catch (InterruptedException e ) {
175184 throw new RuntimeException (e );
176185 }
177- channel .shutdown ();
186+ clientChannel .shutdown ();
178187 });
179188 }
180- Thread .sleep (7000L );
189+ Thread .sleep (10000L );
190+ log .info ("testDataIsolation success!" );
181191 }
182192
183193 @ Test
184194 public void testServerStream () throws InterruptedException {
185- ManagedChannel channel = ManagedChannelBuilder .forAddress ("localhost" , 9090 )
186- .usePlaintext ()
187- .build ();
195+ log .info ("testServerStream start!" );
188196
189- ArthasUnittestServiceGrpc .ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc .newStub (channel );
197+ ArthasUnittestServiceGrpc .ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc .newStub (clientChannel );
190198
191199 ArthasUnittest .ArthasUnittestRequest request = ArthasUnittest .ArthasUnittestRequest .newBuilder ().setMessage ("serverStream" ).build ();
192200
@@ -211,18 +219,17 @@ public void onCompleted() {
211219 } catch (InterruptedException e ) {
212220 e .printStackTrace ();
213221 } finally {
214- channel .shutdown ();
222+ clientChannel .shutdown ();
215223 }
224+ log .info ("testServerStream success!" );
216225 }
217226
218227 // 用于测试双向流
219228 @ Test
220229 public void testBiStream () throws Throwable {
221- ManagedChannel channel = ManagedChannelBuilder .forAddress ("localhost" , 9090 )
222- .usePlaintext ()
223- .build ();
230+ log .info ("testBiStream start!" );
224231
225- ArthasUnittestServiceGrpc .ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc .newStub (channel );
232+ ArthasUnittestServiceGrpc .ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc .newStub (clientChannel );
226233
227234 CountDownLatch latch = new CountDownLatch (1 );
228235 StreamObserver <ArthasUnittest .ArthasUnittestRequest > biStreamObserver = stub .biStream (new StreamObserver <ArthasUnittest .ArthasUnittestResponse >() {
@@ -251,8 +258,9 @@ public void onCompleted() {
251258
252259 Thread .sleep (2000 );
253260 biStreamObserver .onCompleted ();
254- latch .await ();
255- channel .shutdown ();
261+ latch .await (20 , TimeUnit .SECONDS );
262+ clientChannel .shutdown ();
263+ log .info ("testBiStream success!" );
256264 }
257265
258266 private void addSum (ArthasUnittestServiceGrpc .ArthasUnittestServiceBlockingStub stub , int id , int num ) {
0 commit comments