Skip to content

Commit 6c4c8fa

Browse files
committed
✨ Introducing new features. 管道通信
1 parent 0f422d7 commit 6c4c8fa

File tree

2 files changed

+186
-12
lines changed

2 files changed

+186
-12
lines changed

MD/concurrent/thread-communication.md

+116-1
Original file line numberDiff line numberDiff line change
@@ -327,4 +327,119 @@ public class StopThread implements Runnable {
327327

328328
## 线程池 awaitTermination() 方法
329329

330-
## 管道通信
330+
如果是用线程池来管理线程,可以使用以下方式来让主线程等待线程池中所有任务执行完毕:
331+
332+
```java
333+
private static void executorService() throws Exception{
334+
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10) ;
335+
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ;
336+
poolExecutor.execute(new Runnable() {
337+
@Override
338+
public void run() {
339+
LOGGER.info("running");
340+
try {
341+
Thread.sleep(3000);
342+
} catch (InterruptedException e) {
343+
e.printStackTrace();
344+
}
345+
}
346+
});
347+
poolExecutor.execute(new Runnable() {
348+
@Override
349+
public void run() {
350+
LOGGER.info("running2");
351+
try {
352+
Thread.sleep(2000);
353+
} catch (InterruptedException e) {
354+
e.printStackTrace();
355+
}
356+
}
357+
});
358+
359+
poolExecutor.shutdown();
360+
while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
361+
LOGGER.info("线程还在执行。。。");
362+
}
363+
LOGGER.info("main over");
364+
}
365+
```
366+
367+
使用这个 `awaitTermination()` 方法的前提需要关闭线程池,如调用了 `shutdown()` 方法。
368+
369+
调用了 `shutdown()` 之后线程池会停止接受新任务,并且会平滑的关闭线程池中现有的任务。
370+
371+
372+
## 管道通信
373+
374+
```java
375+
public static void piped() throws IOException {
376+
//面向于字符 PipedInputStream 面向于字节
377+
PipedWriter writer = new PipedWriter();
378+
PipedReader reader = new PipedReader();
379+
380+
//输入输出流建立连接
381+
writer.connect(reader);
382+
383+
384+
Thread t1 = new Thread(new Runnable() {
385+
@Override
386+
public void run() {
387+
LOGGER.info("running");
388+
try {
389+
for (int i = 0; i < 10; i++) {
390+
391+
writer.write(i+"");
392+
Thread.sleep(10);
393+
}
394+
} catch (Exception e) {
395+
396+
} finally {
397+
try {
398+
writer.close();
399+
} catch (IOException e) {
400+
e.printStackTrace();
401+
}
402+
}
403+
404+
}
405+
});
406+
Thread t2 = new Thread(new Runnable() {
407+
@Override
408+
public void run() {
409+
LOGGER.info("running2");
410+
int msg = 0;
411+
try {
412+
while ((msg = reader.read()) != -1) {
413+
LOGGER.info("msg={}", (char) msg);
414+
}
415+
416+
} catch (Exception e) {
417+
418+
}
419+
}
420+
});
421+
t1.start();
422+
t2.start();
423+
}
424+
```
425+
426+
输出结果:
427+
428+
```
429+
2018-03-16 19:56:43.014 [Thread-0] INFO c.c.actual.ThreadCommunication - running
430+
2018-03-16 19:56:43.014 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
431+
2018-03-16 19:56:43.130 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=0
432+
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=1
433+
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=2
434+
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=3
435+
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=4
436+
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=5
437+
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=6
438+
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=7
439+
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=8
440+
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=9
441+
```
442+
443+
Java 虽说是基于内存通信的,但也可以使用管道通信。
444+
445+
需要注意的是,输入流和输出流需要首先建立连接。这样线程 B 就可以收到线程 A 发出的消息了。

src/main/java/com/crossoverjie/actual/ThreadCommunication.java

+70-11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
55

6+
import java.io.IOException;
7+
import java.io.PipedInputStream;
8+
import java.io.PipedReader;
9+
import java.io.PipedWriter;
610
import java.util.concurrent.*;
711

812
/**
@@ -14,22 +18,25 @@
1418
*/
1519
public class ThreadCommunication {
1620
private final static Logger LOGGER = LoggerFactory.getLogger(ThreadCommunication.class);
21+
1722
public static void main(String[] args) throws Exception {
1823
//join();
1924
//executorService();
20-
countDownLatch();
25+
//countDownLatch();
26+
piped();
2127

2228
}
2329

2430
/**
2531
* 使用countDownLatch 每执行完一个就减一,最后等待全部完成
32+
*
2633
* @throws Exception
2734
*/
28-
private static void countDownLatch() throws Exception{
29-
int thread = 3 ;
35+
private static void countDownLatch() throws Exception {
36+
int thread = 3;
3037
long start = System.currentTimeMillis();
3138
final CountDownLatch countDown = new CountDownLatch(thread);
32-
for (int i= 0 ;i<thread ; i++){
39+
for (int i = 0; i < thread; i++) {
3340
new Thread(new Runnable() {
3441
@Override
3542
public void run() {
@@ -48,16 +55,17 @@ public void run() {
4855

4956
countDown.await();
5057
long stop = System.currentTimeMillis();
51-
LOGGER.info("main over total time={}",stop-start);
58+
LOGGER.info("main over total time={}", stop - start);
5259
}
5360

5461
/**
5562
* 利用线程池的 awaitTermination 方法,每隔一秒钟检查线程池是否执行完毕
63+
*
5664
* @throws Exception
5765
*/
58-
private static void executorService() throws Exception{
59-
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10) ;
60-
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ;
66+
private static void executorService() throws Exception {
67+
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10);
68+
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MILLISECONDS, queue);
6169
poolExecutor.execute(new Runnable() {
6270
@Override
6371
public void run() {
@@ -82,7 +90,7 @@ public void run() {
8290
});
8391

8492
poolExecutor.shutdown();
85-
while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
93+
while (!poolExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
8694
LOGGER.info("线程还在执行。。。");
8795
}
8896
LOGGER.info("main over");
@@ -91,6 +99,7 @@ public void run() {
9199

92100
/**
93101
* 采用 join 线程间通信
102+
*
94103
* @throws InterruptedException
95104
*/
96105
private static void join() throws InterruptedException {
@@ -104,7 +113,7 @@ public void run() {
104113
e.printStackTrace();
105114
}
106115
}
107-
}) ;
116+
});
108117
Thread t2 = new Thread(new Runnable() {
109118
@Override
110119
public void run() {
@@ -115,7 +124,7 @@ public void run() {
115124
e.printStackTrace();
116125
}
117126
}
118-
}) ;
127+
});
119128

120129
t1.start();
121130
t2.start();
@@ -128,4 +137,54 @@ public void run() {
128137

129138
LOGGER.info("main over");
130139
}
140+
141+
public static void piped() throws IOException {
142+
//面向于字符 PipedInputStream 面向于字节
143+
PipedWriter writer = new PipedWriter();
144+
PipedReader reader = new PipedReader();
145+
146+
//输入输出流建立连接
147+
writer.connect(reader);
148+
149+
150+
Thread t1 = new Thread(new Runnable() {
151+
@Override
152+
public void run() {
153+
LOGGER.info("running");
154+
try {
155+
for (int i = 0; i < 10; i++) {
156+
157+
writer.write(i+"");
158+
Thread.sleep(10);
159+
}
160+
} catch (Exception e) {
161+
162+
} finally {
163+
try {
164+
writer.close();
165+
} catch (IOException e) {
166+
e.printStackTrace();
167+
}
168+
}
169+
170+
}
171+
});
172+
Thread t2 = new Thread(new Runnable() {
173+
@Override
174+
public void run() {
175+
LOGGER.info("running2");
176+
int msg = 0;
177+
try {
178+
while ((msg = reader.read()) != -1) {
179+
LOGGER.info("msg={}", (char) msg);
180+
}
181+
182+
} catch (Exception e) {
183+
184+
}
185+
}
186+
});
187+
t1.start();
188+
t2.start();
189+
}
131190
}

0 commit comments

Comments
 (0)