1+ package fucking .concurrency .demo ;
2+
3+ import java .util .ArrayList ;
4+ import java .util .List ;
5+ import java .util .concurrent .CompletableFuture ;
6+ import java .util .concurrent .Future ;
7+ import java .util .concurrent .LinkedBlockingQueue ;
8+ import java .util .concurrent .ThreadPoolExecutor ;
9+ import java .util .concurrent .TimeUnit ;
10+
11+ import static java .util .concurrent .CompletableFuture .supplyAsync ;
12+ /**
13+ * @author Eric Lin (linqinghua4 at gmail dot com)
14+ */
15+ public class CyclicThreadPoolDeadLockDemo {
16+ public static void main (String [] args ) throws InterruptedException {
17+ if (args .length > 0 && "good" .equals (args [0 ])) {
18+ goodCase ();
19+ } else {
20+ badCase ();
21+ }
22+ }
23+
24+ static void badCase () throws InterruptedException {
25+ int poolSize = 16 ;
26+ ThreadPoolExecutor pool1 = new ThreadPoolExecutor (poolSize , poolSize ,
27+ 0L , TimeUnit .MILLISECONDS ,
28+ new LinkedBlockingQueue <>());
29+ ThreadPoolExecutor pool2 = new ThreadPoolExecutor (poolSize , poolSize ,
30+ 0L , TimeUnit .MILLISECONDS ,
31+ new LinkedBlockingQueue <>());
32+ List <Future <Integer >> futures = new ArrayList <>();
33+ for (int i = 0 ; i < 100 ; i ++) {
34+ int finalI = i ;
35+ Future <Integer > future = pool1 .submit (() -> {
36+ System .out .println ("step1, i = " + finalI );
37+ return 1 + getUnchecked (pool2 .submit (() -> {
38+ System .out .println ("step2, i = " + finalI );
39+ return 2 + getUnchecked (pool1 .submit (() -> {
40+ System .out .println ("step3, i = " + finalI );
41+ return 3 ;
42+ }));
43+ }));
44+ });
45+ futures .add (future );
46+ }
47+ // 无法计算,死锁
48+ int result = futures .stream ()
49+ .mapToInt (CyclicThreadPoolDeadLockDemo ::getUnchecked )
50+ .sum ();
51+ System .out .println ("result = " + result );
52+ // 无法关闭
53+ pool1 .awaitTermination (20 , TimeUnit .SECONDS );
54+ pool2 .awaitTermination (20 , TimeUnit .SECONDS );
55+ }
56+
57+ static void goodCase () throws InterruptedException {
58+ int poolSize = 16 ;
59+ ThreadPoolExecutor pool1 = new ThreadPoolExecutor (poolSize , poolSize ,
60+ 0L , TimeUnit .MILLISECONDS ,
61+ new LinkedBlockingQueue <>());
62+ ThreadPoolExecutor pool2 = new ThreadPoolExecutor (poolSize , poolSize ,
63+ 0L , TimeUnit .MILLISECONDS ,
64+ new LinkedBlockingQueue <>());
65+ List <Future <Integer >> futures = new ArrayList <>();
66+ for (int i = 0 ; i < 100 ; i ++) {
67+ int finalI = i ;
68+ CompletableFuture <Integer > cf1 = supplyAsync (() -> {
69+ System .out .println ("step1, i = " + finalI );
70+ return 1 ;
71+ }, pool1 );
72+ CompletableFuture <Integer > cf2 = supplyAsync (() -> {
73+ System .out .println ("step2, i = " + finalI );
74+ return 2 ;
75+ }, pool2 );
76+ CompletableFuture <Integer > cf3 = supplyAsync (() -> {
77+ System .out .println ("step3, i = " + finalI );
78+ return 3 ;
79+ }, pool1 );
80+ Future <Integer > future =
81+ cf1 .thenComposeAsync (x ->
82+ cf2 .thenComposeAsync (y ->
83+ cf3 .thenApply (z ->
84+ x + y + z ), pool2 ), pool1 );
85+ futures .add (future );
86+ }
87+ System .out .println ("size1 = " + pool1 .getQueue ().size ());
88+ System .out .println ("size2 = " + pool2 .getQueue ().size ());
89+ int result = futures .stream ()
90+ .mapToInt (CyclicThreadPoolDeadLockDemo ::getUnchecked )
91+ .sum ();
92+ System .out .println ("result = " + result );
93+ pool1 .awaitTermination (20 , TimeUnit .SECONDS );
94+ pool2 .awaitTermination (20 , TimeUnit .SECONDS );
95+ }
96+
97+ static <T > T getUnchecked (Future <T > future ) {
98+ try {
99+ return future .get ();
100+ } catch (Exception e ) {
101+ throw new RuntimeException (e );
102+ }
103+ }
104+ }
0 commit comments