diff --git a/README.md b/README.md
index 1ce6f93..29f4957 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,7 @@
-内容整理自[《Java并发编程实战》](https://time.geekbang.org/column/intro/159)
+内容整理自[《Java并发编程实战》](https://time.geekbang.org/column/intro/159)[《java-concurrency-patterns》](https://github.com/LeonardoZ/java-concurrency-patterns)
相关工具类[《vjtools》](https://github.com/vipshop/vjtools)
-WeChat:Zed-RD [](https://travis-ci.org/Fadezed/concurrency)
+
+ [](https://travis-ci.org/Fadezed/concurrency)
@@ -39,20 +40,18 @@ WeChat:Zed-RD [
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/visibility/Visibility.java)


## 线程切换带来的原子性问题
* 一个或者多个操作在 CPU 执行的过程中不被中断的特性称为**原子性**
-
* **时间片**概念
* 线程切换 ---〉提升cpu利用率。 tips:Unix系统因支持多进程分时复用而出名。
-* 线程切换[代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/contentSwitch/ContentSwitchTest.java)。
-* 原子问题[代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/atomic/AtomicCounter.java)
+* 线程切换[代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/contentswitch/ContentSwitchTest.java)。
+* 原子问题[代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/atomic/AtomicCounter.java)

* count+=1 操作分析
@@ -94,7 +93,7 @@ public class Singleton {
## 按需禁用缓存以及编译优化 [代码来源](http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html)
* ## volatile
- * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/volatileExample/VolatileExample.java)
+ * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/volatilecase/VolatileExample.java)
* ## synchronized
@@ -134,9 +133,9 @@ public class Singleton {
* 偏向锁
* 轻量级锁
* 重量级锁
-
- * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/synchronizedEx/SynchronizedExample.java)
+
+ * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedExample.java)
```
class X {
@@ -158,7 +157,7 @@ public class Singleton {
## final
-* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/finalEx/FinalExample.java)
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/finalcase/FinalExample.java)
* 修饰变量时,初衷是告诉编译器:这个变量生而不变,非immutable,即只能表示对象引用不能被赋值(例如List);
* 修饰方法则方法不能被重写
* 修饰类则不能被扩展继承。
@@ -238,7 +237,7 @@ public class Singleton {
-------
# 4.JAVA线程的生命周期
-* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/threadState/ThreadState.java)
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/threadstate/ThreadState.java)
## 通用的线程生命周期
* 初始状态
@@ -484,7 +483,7 @@ class Semaphore{
### 使用方法
#### 实现互斥
#### 实现限流器(Semaphore 可以允许多个线程访问一个临界区)
-* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/semaphore/SemaphoreEx.java)
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/semaphore/SemaphoreEx.java)
-------
@@ -495,16 +494,16 @@ class Semaphore{
* 只允许一个线程写共享变量
* 如果一个写线程正在执行写操作,此时禁止读线程读共享变量
-* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/readWriteLock/CacheByReadWriteLock.java)
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/readwritelock/CacheByReadWriteLock.java)
## StampedLock 加上乐观读(无锁)
-* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/readWriteLock/StampedLockEx.java)
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/readwritelock/StampedLockEx.java)
## CountDownLatch
-* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/countDownLatchEx/CountDownLatchEx.java)
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java)
## CyclicBarrier
-* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/cyclicBarrierEx/CyclicBarrierEx.java)
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierEx.java)
@@ -581,7 +580,7 @@ CopyOnWriteArraySet、ConcurrentSkipListSet
# 11. 原子类
* 无锁方案实现原理(Compare And Swap)
- * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/atomic/SimulatedCAS.java)
+ * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/atomic/SimulatedCompareAndSwap.java)
* 概览图

@@ -640,7 +639,7 @@ accumulateAndGet(x,func)
* 而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了。
## 线程池是一种生产者-消费者模式(非一般意义池化资源)
-* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/threadPool/MyThreadPool.java)
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/threadPool/MyThreadPool.java)
* Java ThreadPoolExecutor
```
@@ -713,7 +712,7 @@ get(long timeout, TimeUnit unit);
## FutureTask工具类(实现了RunnableFuture而它继承了Runnable和Future接口)
-* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/futureTask/FutureTaskEx.java)
+* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/futuretask/FutureTaskEx.java)
* 构造函数类似线程池submit
```
diff --git a/src/main/java/com/example/concurrency/features/atomic/SimulatedCAS.java b/src/main/java/com/example/concurrency/features/atomic/SimulatedCompareAndSwap.java
similarity index 91%
rename from src/main/java/com/example/concurrency/features/atomic/SimulatedCAS.java
rename to src/main/java/com/example/concurrency/features/atomic/SimulatedCompareAndSwap.java
index 41afaae..a3345c6 100644
--- a/src/main/java/com/example/concurrency/features/atomic/SimulatedCAS.java
+++ b/src/main/java/com/example/concurrency/features/atomic/SimulatedCompareAndSwap.java
@@ -2,12 +2,12 @@
/**
* 描述:
- * SimulatedCAS 模拟cas操作
+ * SimulatedCompareAndSwap 模拟cas操作
*
* @author zed
* @since 2019-06-18 2:09 PM
*/
-public class SimulatedCAS{
+public class SimulatedCompareAndSwap {
private int count;
/**
diff --git a/src/main/java/com/example/concurrency/features/completablefuture/CompletableFutureExample.java b/src/main/java/com/example/concurrency/features/completablefuture/CompletableFutureExample.java
new file mode 100644
index 0000000..0bc1db6
--- /dev/null
+++ b/src/main/java/com/example/concurrency/features/completablefuture/CompletableFutureExample.java
@@ -0,0 +1,116 @@
+package com.example.concurrency.features.completablefuture;
+
+import com.example.concurrency.util.ThreadUtil;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 描述:
+ * CompletableFutureExample Java 8 异步编程可描述串行关系、AND汇聚关系和OR汇聚关系以及异常处理
+ *
+ * @author zed
+ * @since 2019-07-03 1:57 PM
+ */
+public class CompletableFutureExample {
+ /**
+ * 任务 1:洗水壶 -> 烧开水
+ */
+ private CompletableFuture f1 = CompletableFuture.runAsync(()->{
+ System.out.println("T1: 洗水壶...");
+ ThreadUtil.sleep(1, TimeUnit.SECONDS);
+ System.out.println("T1: 烧开水...");
+ ThreadUtil.sleep(15, TimeUnit.SECONDS);
+ });
+ /**
+ * 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
+ */
+ private CompletableFuture f2 = CompletableFuture.supplyAsync(()->{
+ System.out.println("T2: 洗茶壶...");
+ ThreadUtil.sleep(1, TimeUnit.SECONDS);
+
+ System.out.println("T2: 洗茶杯...");
+ ThreadUtil.sleep(2, TimeUnit.SECONDS);
+
+ System.out.println("T2: 拿茶叶...");
+ ThreadUtil.sleep(1, TimeUnit.SECONDS);
+ return " 龙井 ";
+ });
+ /**
+ * 任务 3:任务 1 和任务 2 完成后执行:泡茶
+ */
+ private CompletableFuture f3 = f1.thenCombine(f2, (e, tf)->{
+ System.out.println("T1: 拿到茶叶:" + tf);
+ System.out.println("T1: 泡茶...");
+ return " 上茶:" + tf;
+ });
+
+
+ public static void main(String[] args) {
+ CompletableFutureExample example = new CompletableFutureExample();
+ System.out.println(example.f3.join());
+ }
+
+ /**
+ * 描述串行关系 thenApply
+ */
+ static class SerialRelation{
+ private static CompletableFuture f0 =
+ CompletableFuture.supplyAsync(
+ () -> "Hello World") //①
+ .thenApply(s -> s + " QQ") //②
+ .thenApply(String::toUpperCase);//③
+
+ public static void main(String[] args) {
+ System.out.println(SerialRelation.f0.join());
+ }
+ }
+ /**
+ * 描述汇聚Or关系 thenApply
+ */
+ static class ConvergeRelation{
+ static CompletableFuture f1 =
+ CompletableFuture.supplyAsync(()->{
+ int t = getRandom(5, 10);
+ ThreadUtil.sleep(t, TimeUnit.SECONDS);
+ return String.valueOf(t);
+ });
+
+ static CompletableFuture f2 =
+ CompletableFuture.supplyAsync(()->{
+ int t = getRandom(5, 10);
+ ThreadUtil.sleep(t, TimeUnit.SECONDS);
+ return String.valueOf(t);
+ });
+
+ static CompletableFuture f3 =
+ f1.applyToEither(f2,s -> s);
+
+
+ private static int getRandom(int i,int j){
+ return (int) (Math.random() * (j - i)) +i;
+ }
+ public static void main(String[] args) {
+
+ System.out.println(ConvergeRelation.f3.join());
+ }
+ }
+
+ /**
+ * 处理异常 此例为发生异常默认为0
+ */
+ static class ExceptionHandler{
+ private static CompletableFuture
+ f0 = CompletableFuture
+ .supplyAsync(()->7/0)
+ .thenApply(r->r*10)
+ .exceptionally(e->0);
+
+ public static void main(String[] args) {
+ System.out.println(ExceptionHandler.f0.join());
+ }
+
+ }
+
+}
+
diff --git a/src/main/java/com/example/concurrency/features/completionservice/ExecutorCompletionServiceExample.java b/src/main/java/com/example/concurrency/features/completionservice/ExecutorCompletionServiceExample.java
new file mode 100644
index 0000000..a468032
--- /dev/null
+++ b/src/main/java/com/example/concurrency/features/completionservice/ExecutorCompletionServiceExample.java
@@ -0,0 +1,72 @@
+package com.example.concurrency.features.completionservice;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * 描述:
+ * CompletionService BlockingQueue+Executor
+ *
+ * 批量执行异步任务 并且可以支持Forking Cluster模式即并行查询多个服务其中一个返回则结束
+ * @author zed
+ * @since 2019-07-03 10:07 AM
+ */
+public class ExecutorCompletionServiceExample {
+
+ private Integer exe()throws InterruptedException{
+ // 创建线程池
+ ThreadPoolExecutor pool = ThreadPoolBuilder.fixedPool().setPoolSize(3).build();
+ // 创建 CompletionService
+ CompletionService completionService = new ExecutorCompletionService<>(pool);
+ // 用于保存 Future 对象
+ List> futures = new ArrayList<>(3);
+ // 提交异步任务,并保存 future 到 futures
+ futures.add(completionService.submit(this::getCoderByS1));
+ futures.add(completionService.submit(this::getCoderByS2));
+ futures.add(completionService.submit(this::getCoderByS3));
+ // 获取最快返回的任务执行结果
+ Integer r = 0;
+ try {
+ // 只要有一个成功返回,则 break 这里因为要判断是否是正确的结果 所以会循环三次,如果不考虑结果正确性,第一次拿出来的就是最先执行完的结果
+ for (int i = 0; i < 3; ++i) {
+ try {
+ r = completionService.take().get();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ // 简单地通过判空来检查是否成功返回
+ if (r != null ) {
+ break;
+ }
+ }
+ } finally {
+ // 取消所有任务
+ for(Future f : futures){
+ f.cancel(true);
+
+ }
+ }
+ pool.shutdown();
+ // 返回结果
+ return r;
+
+ }
+ private Integer getCoderByS1(){
+ return null;
+ }
+ private Integer getCoderByS2(){
+ return 2;
+ }
+ private Integer getCoderByS3(){
+ return 3;
+ }
+
+ public static void main(String[] args) throws InterruptedException{
+ System.out.println(new ExecutorCompletionServiceExample().exe());
+ }
+
+}
+
diff --git a/src/main/java/com/example/concurrency/features/concurrentHashMap/ConcurrentHashMapTest.java b/src/main/java/com/example/concurrency/features/concurrenthashmap/ConcurrentHashMapTest.java
similarity index 96%
rename from src/main/java/com/example/concurrency/features/concurrentHashMap/ConcurrentHashMapTest.java
rename to src/main/java/com/example/concurrency/features/concurrenthashmap/ConcurrentHashMapTest.java
index 322df5b..f2207df 100644
--- a/src/main/java/com/example/concurrency/features/concurrentHashMap/ConcurrentHashMapTest.java
+++ b/src/main/java/com/example/concurrency/features/concurrenthashmap/ConcurrentHashMapTest.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.concurrentHashMap;
+package com.example.concurrency.features.concurrenthashmap;
/**
* 描述:
diff --git a/src/main/java/com/example/concurrency/features/contentSwitch/ContentSwitchTest.java b/src/main/java/com/example/concurrency/features/contentswitch/ContentSwitchTest.java
similarity index 97%
rename from src/main/java/com/example/concurrency/features/contentSwitch/ContentSwitchTest.java
rename to src/main/java/com/example/concurrency/features/contentswitch/ContentSwitchTest.java
index 0640829..7ba0e71 100644
--- a/src/main/java/com/example/concurrency/features/contentSwitch/ContentSwitchTest.java
+++ b/src/main/java/com/example/concurrency/features/contentswitch/ContentSwitchTest.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.contentSwitch;
+package com.example.concurrency.features.contentswitch;
import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
diff --git a/src/main/java/com/example/concurrency/features/countDownLatchEx/CountDownLatchEx.java b/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java
similarity index 59%
rename from src/main/java/com/example/concurrency/features/countDownLatchEx/CountDownLatchEx.java
rename to src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java
index 37aee80..54c4ce8 100644
--- a/src/main/java/com/example/concurrency/features/countDownLatchEx/CountDownLatchEx.java
+++ b/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java
@@ -1,9 +1,11 @@
-package com.example.concurrency.features.countDownLatchEx;
+package com.example.concurrency.features.countdownlatch;
import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* 描述:
@@ -15,21 +17,21 @@
public class CountDownLatchEx {
public static void main(String[] args) throws InterruptedException{
// 创建 2 个线程的线程池
- Executor executor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build();
+ ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build();
// 计数器初始化为 2
CountDownLatch latch = new CountDownLatch(2);
executor.execute(()-> {
- System.out.println("T1");
+ System.out.println("T1, " + Thread.currentThread().getName());
latch.countDown();
});
executor.execute(()-> {
- System.out.println("T2");
+ System.out.println("T2, " + Thread.currentThread().getName());
latch.countDown();
});
// 等待两个查询操作结束
- latch.await();
-
-
+ latch.await(1, TimeUnit.SECONDS);
+ System.out.println("T3, " + Thread.currentThread().getName());
+ executor.shutdown();
}
}
diff --git a/src/main/java/com/example/concurrency/features/cyclicBarrierEx/CyclicBarrierEx.java b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierEx.java
similarity index 96%
rename from src/main/java/com/example/concurrency/features/cyclicBarrierEx/CyclicBarrierEx.java
rename to src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierEx.java
index e43fe7c..2ce13f4 100644
--- a/src/main/java/com/example/concurrency/features/cyclicBarrierEx/CyclicBarrierEx.java
+++ b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierEx.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.cyclicBarrierEx;
+package com.example.concurrency.features.cyclicbarrier;
import java.util.concurrent.CyclicBarrier;
diff --git a/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierGeekTime.java b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierGeekTime.java
new file mode 100644
index 0000000..86fff1e
--- /dev/null
+++ b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierGeekTime.java
@@ -0,0 +1,58 @@
+package com.example.concurrency.features.cyclicbarrier;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+import java.util.concurrent.*;
+
+public class CyclicBarrierGeekTime {
+ //订单队列
+ public static ArrayBlockingQueue pos = new ArrayBlockingQueue(10);
+ //物流队列
+ public static ArrayBlockingQueue dos = new ArrayBlockingQueue(10);
+ //这里线程设为1,防止多线程并发导致的数据不一致,因为订单和物流是两个队列
+ private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(1).setThreadNamePrefix("测试线程同步").build();
+ public static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
+ //这一步用个线程池是因为要进行异步操作
+ threadPoolExecutor.execute(CyclicBarrierGeekTime::check);
+ });
+ static void check(){
+ System.out.println("对账结果"+ (pos.poll() + "\t" + dos.poll()) +","+Thread.currentThread().getName());
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+ //循环查询订单
+ Thread t1 = new Thread(()->{
+ for(int i = 0;i<4;i++){
+ System.out.println("订单ok,"+Thread.currentThread().getName());
+ pos.offer("订单"+i);
+ try {
+ //仅仅方便观察输出
+ Thread.sleep(1000);
+ //所有阻塞式操作均用超时的方式调用
+ cyclicBarrier.await(1,TimeUnit.SECONDS);
+ } catch (Exception e) {
+ System.out.println("等超时了"+e.getMessage());
+ }
+ }
+
+ });
+ //循环查询物流库
+ Thread t2 = new Thread(()->{
+ //这里设置比订单的循环少,以让订单的cyclicBarrier超时
+ for(int j=0;j<2;j++){
+ System.out.println("物流ok,"+Thread.currentThread().getName());
+ dos.offer("物流"+j);
+ try {
+ Thread.sleep(1000);
+ cyclicBarrier.await(1,TimeUnit.SECONDS);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ }
+
+ });
+ t1.start();
+ t2.start();
+ Thread.sleep(20000);
+ threadPoolExecutor.shutdown();
+ }
+}
diff --git a/src/main/java/com/example/concurrency/features/deadLock/DeadLockTest.java b/src/main/java/com/example/concurrency/features/deadlock/DeadLockTest.java
similarity index 97%
rename from src/main/java/com/example/concurrency/features/deadLock/DeadLockTest.java
rename to src/main/java/com/example/concurrency/features/deadlock/DeadLockTest.java
index bd0a0d2..0993d95 100644
--- a/src/main/java/com/example/concurrency/features/deadLock/DeadLockTest.java
+++ b/src/main/java/com/example/concurrency/features/deadlock/DeadLockTest.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.deadLock;
+package com.example.concurrency.features.deadlock;
import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
import com.example.concurrency.util.ThreadDumpHelper;
diff --git a/src/main/java/com/example/concurrency/features/exchanger/ExchangerExample.java b/src/main/java/com/example/concurrency/features/exchanger/ExchangerExample.java
new file mode 100644
index 0000000..fa614a1
--- /dev/null
+++ b/src/main/java/com/example/concurrency/features/exchanger/ExchangerExample.java
@@ -0,0 +1,40 @@
+package com.example.concurrency.features.exchanger;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * 描述:
+ * 交换者 用于两个线程间的数据交换
+ *
+ * @author zed
+ * @since 2019-07-03 5:39 PM
+ */
+public class ExchangerExample {
+ private static final Exchanger EXCHANGER = new Exchanger<>();
+ private static ThreadPoolExecutor poolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build();
+
+ public static void main(String[] args) {
+ poolExecutor.execute(()->{
+ try{
+ String s ="SomethingAndA";
+ EXCHANGER.exchange(s);
+ }catch (InterruptedException e){
+ e.printStackTrace();
+ }
+ });
+ poolExecutor.execute(()->{
+ try{
+ String s1 = "SomethingAndB";
+ String s = EXCHANGER.exchange("s1");
+ System.out.println("s和s1值是否相等:"+s1.equals(s)+",s:"+s+",s1:"+s1);
+ }catch (InterruptedException e){
+ e.printStackTrace();
+ }
+ });
+ poolExecutor.shutdown();
+ }
+}
+
diff --git a/src/main/java/com/example/concurrency/features/finalEx/FinalExample.java b/src/main/java/com/example/concurrency/features/finalcase/FinalExample.java
similarity index 94%
rename from src/main/java/com/example/concurrency/features/finalEx/FinalExample.java
rename to src/main/java/com/example/concurrency/features/finalcase/FinalExample.java
index 97f2380..97615bb 100644
--- a/src/main/java/com/example/concurrency/features/finalEx/FinalExample.java
+++ b/src/main/java/com/example/concurrency/features/finalcase/FinalExample.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.finalEx;
+package com.example.concurrency.features.finalcase;
/**
* 描述: 构造函数的错误重排导致线程可能看到 final 变量的值会变
diff --git a/src/main/java/com/example/concurrency/features/futureTask/FutureTaskEx.java b/src/main/java/com/example/concurrency/features/futuretask/FutureTaskEx.java
similarity index 98%
rename from src/main/java/com/example/concurrency/features/futureTask/FutureTaskEx.java
rename to src/main/java/com/example/concurrency/features/futuretask/FutureTaskEx.java
index ec8519d..506aa7b 100644
--- a/src/main/java/com/example/concurrency/features/futureTask/FutureTaskEx.java
+++ b/src/main/java/com/example/concurrency/features/futuretask/FutureTaskEx.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.futureTask;
+package com.example.concurrency.features.futuretask;
import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
import com.example.concurrency.features.threadPool.ThreadPoolUtil;
diff --git a/src/main/java/com/example/concurrency/features/readWriteLock/CacheByReadWriteLock.java b/src/main/java/com/example/concurrency/features/readwritelock/CacheByReadWriteLock.java
similarity index 97%
rename from src/main/java/com/example/concurrency/features/readWriteLock/CacheByReadWriteLock.java
rename to src/main/java/com/example/concurrency/features/readwritelock/CacheByReadWriteLock.java
index 3c398b1..a0b7b15 100644
--- a/src/main/java/com/example/concurrency/features/readWriteLock/CacheByReadWriteLock.java
+++ b/src/main/java/com/example/concurrency/features/readwritelock/CacheByReadWriteLock.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.readWriteLock;
+package com.example.concurrency.features.readwritelock;
import java.util.HashMap;
import java.util.Map;
diff --git a/src/main/java/com/example/concurrency/features/readWriteLock/StampedLockEx.java b/src/main/java/com/example/concurrency/features/readwritelock/StampedLockEx.java
similarity index 97%
rename from src/main/java/com/example/concurrency/features/readWriteLock/StampedLockEx.java
rename to src/main/java/com/example/concurrency/features/readwritelock/StampedLockEx.java
index 07b17e8..35517a4 100644
--- a/src/main/java/com/example/concurrency/features/readWriteLock/StampedLockEx.java
+++ b/src/main/java/com/example/concurrency/features/readwritelock/StampedLockEx.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.readWriteLock;
+package com.example.concurrency.features.readwritelock;
import java.util.concurrent.locks.StampedLock;
diff --git a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedConnection.java b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedConnection.java
similarity index 99%
rename from src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedConnection.java
rename to src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedConnection.java
index 6d5c7ac..8bb96ad 100644
--- a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedConnection.java
+++ b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedConnection.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.synchronizedEx;
+package com.example.concurrency.features.synchronizedcase;
import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
import com.example.concurrency.util.ThreadDumpHelper;
diff --git a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedExample.java b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedExample.java
similarity index 97%
rename from src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedExample.java
rename to src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedExample.java
index 1d3c697..28a48b6 100644
--- a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedExample.java
+++ b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedExample.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.synchronizedEx;
+package com.example.concurrency.features.synchronizedcase;
import java.util.ArrayList;
import java.util.List;
diff --git a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedNoConnection.java b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedNoConnection.java
similarity index 96%
rename from src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedNoConnection.java
rename to src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedNoConnection.java
index bb7ecb1..1e51c16 100644
--- a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedNoConnection.java
+++ b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedNoConnection.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.synchronizedEx;
+package com.example.concurrency.features.synchronizedcase;
/**
* 描述:
diff --git a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedResolveDeadLock.java b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedResolveDeadLock.java
similarity index 98%
rename from src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedResolveDeadLock.java
rename to src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedResolveDeadLock.java
index 45fc07c..965b401 100644
--- a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedResolveDeadLock.java
+++ b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedResolveDeadLock.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.synchronizedEx;
+package com.example.concurrency.features.synchronizedcase;
import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
diff --git a/src/main/java/com/example/concurrency/features/threadState/ThreadState.java b/src/main/java/com/example/concurrency/features/threadstate/ThreadState.java
similarity index 96%
rename from src/main/java/com/example/concurrency/features/threadState/ThreadState.java
rename to src/main/java/com/example/concurrency/features/threadstate/ThreadState.java
index efb2166..39fff00 100644
--- a/src/main/java/com/example/concurrency/features/threadState/ThreadState.java
+++ b/src/main/java/com/example/concurrency/features/threadstate/ThreadState.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.threadState;
+package com.example.concurrency.features.threadstate;
import com.example.concurrency.util.ThreadDumpHelper;
import com.example.concurrency.util.ThreadUtil;
diff --git a/src/main/java/com/example/concurrency/features/visibility/Visibility.java b/src/main/java/com/example/concurrency/features/visibility/Visibility.java
index 9e9fd0d..50142fb 100644
--- a/src/main/java/com/example/concurrency/features/visibility/Visibility.java
+++ b/src/main/java/com/example/concurrency/features/visibility/Visibility.java
@@ -14,7 +14,7 @@
*/
public class Visibility {
private static long count = 0;
- private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().build();
+ private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build();
private void add10k() {
int idx = 0;
diff --git a/src/main/java/com/example/concurrency/features/volatileExample/VolatileExample.java b/src/main/java/com/example/concurrency/features/volatilecase/VolatileExample.java
similarity index 95%
rename from src/main/java/com/example/concurrency/features/volatileExample/VolatileExample.java
rename to src/main/java/com/example/concurrency/features/volatilecase/VolatileExample.java
index 7463ddd..ba66955 100644
--- a/src/main/java/com/example/concurrency/features/volatileExample/VolatileExample.java
+++ b/src/main/java/com/example/concurrency/features/volatilecase/VolatileExample.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.volatileExample;
+package com.example.concurrency.features.volatilecase;
import java.util.Scanner;
@@ -40,7 +40,7 @@ public static void main(String[] args) {
while(sc.hasNext()){
String value = sc.next();
if("1".equals(value)){
- new Thread(() -> aVolatile.stopThread()).start();
+ new Thread(aVolatile::stopThread).start();
break ;
}
}
diff --git a/src/main/java/com/example/concurrency/features/blockingQueue/BlockedQueue.java b/src/main/java/com/example/concurrency/patterns/blockingQueue/BlockedQueue.java
similarity index 96%
rename from src/main/java/com/example/concurrency/features/blockingQueue/BlockedQueue.java
rename to src/main/java/com/example/concurrency/patterns/blockingQueue/BlockedQueue.java
index 8878b9a..cd62c48 100644
--- a/src/main/java/com/example/concurrency/features/blockingQueue/BlockedQueue.java
+++ b/src/main/java/com/example/concurrency/patterns/blockingQueue/BlockedQueue.java
@@ -1,4 +1,4 @@
-package com.example.concurrency.features.blockingQueue;
+package com.example.concurrency.patterns.blockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
diff --git a/src/main/java/com/example/concurrency/patterns/conditionqueues/ConditionQueue.java b/src/main/java/com/example/concurrency/patterns/conditionqueues/ConditionQueue.java
new file mode 100644
index 0000000..40fc024
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/conditionqueues/ConditionQueue.java
@@ -0,0 +1,89 @@
+package com.example.concurrency.patterns.conditionqueues;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+
+import java.util.UUID;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * 描述:
+ * 条件队列
+ * 通过ReentrantLock 实现打印和停止打印的条件队列,每次容量为5
+ *
+ * @author zed
+ * @since 2019-07-01 10:59 AM
+ */
+public class ConditionQueue {
+ private static final int LIMIT = 5;
+ private int messageCount = 0;
+ private Lock lock = new ReentrantLock();
+ private Condition limitReachedCondition = lock.newCondition();
+ private Condition limitUnreachedCondition = lock.newCondition();
+ private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(2).setThreadNamePrefix("Condition Queue").build();
+
+ /**
+ * 停止打印
+ * @throws InterruptedException e
+ */
+ private void stopMessages() throws InterruptedException {
+ lock.lock();
+ try {
+ while (messageCount < LIMIT) {
+ limitReachedCondition.await();
+ }
+ System.err.println("Limit reached. Wait 2s");
+ Thread.sleep(2000);
+ messageCount = 0;
+ limitUnreachedCondition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * 打印
+ * @param message mes
+ * @throws InterruptedException e
+ */
+ private void printMessages(String message) throws InterruptedException {
+ lock.lock();
+ try {
+ while (messageCount == LIMIT) {
+ limitUnreachedCondition.await();
+ }
+ System.out.println(message);
+ messageCount++;
+ limitReachedCondition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public static void main(String[] args) {
+ ConditionQueue queue = new ConditionQueue();
+ // Will run indefinitely
+ threadPoolExecutor.execute(() -> {
+ while (true) {
+ String uuidMessage = UUID.randomUUID().toString();
+ try {
+ queue.printMessages(uuidMessage);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ threadPoolExecutor.execute(() -> {
+ while (true) {
+ try {
+ queue.stopMessages();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+}
+
diff --git a/src/main/java/com/example/concurrency/patterns/conditionqueues/WaitNotifyQueue.java b/src/main/java/com/example/concurrency/patterns/conditionqueues/WaitNotifyQueue.java
new file mode 100644
index 0000000..5090002
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/conditionqueues/WaitNotifyQueue.java
@@ -0,0 +1,89 @@
+package com.example.concurrency.patterns.conditionqueues;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * 描述:
+ * 基于Object等待通知队列
+ * wait() and notify
+ *
+ * @author zed
+ * @since 2019-07-01 11:12 AM
+ */
+public class WaitNotifyQueue {
+ private boolean continueToNotify;
+ private BlockingQueue messages;
+ private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(2).setThreadNamePrefix("WaitNotifyQueue").build();
+
+
+ private WaitNotifyQueue(List messages) {
+ this.messages = new LinkedBlockingQueue<>(messages);
+ this.continueToNotify = true;
+ }
+
+ private synchronized void stopsMessaging() {
+ continueToNotify = false;
+ notifyAll();
+ }
+
+ private synchronized void message() throws InterruptedException {
+ while (!continueToNotify){
+ wait();
+ }
+ String message = messages.take();
+ System.out.println(message);
+ }
+ @SuppressWarnings("unchecked")
+ public static void main(String[] args) {
+ List messages = new LinkedList();
+ for (int i = 0; i < 130; i++) {
+ messages.add(UUID.randomUUID().toString());
+ }
+ WaitNotifyQueue waitNotifyQueue = new WaitNotifyQueue(messages);
+ threadPoolExecutor.execute(() -> {
+ try {
+ while (true) {
+ waitNotifyQueue.message();
+ Thread.sleep(300);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
+ });
+
+ Random random = new Random();
+ threadPoolExecutor.execute(() -> {
+ while (true) {
+ int val = random.nextInt(100);
+ System.out.println(val);
+ if (val == 99) {
+ break;
+ }
+ try {
+ Thread.sleep(400);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
+ }
+ waitNotifyQueue.stopsMessaging();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
+ });
+
+ }
+}
+
diff --git a/src/main/java/com/example/concurrency/patterns/divideconquer/ParallelDivideAndConquer.java b/src/main/java/com/example/concurrency/patterns/divideconquer/ParallelDivideAndConquer.java
new file mode 100644
index 0000000..fae3566
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/divideconquer/ParallelDivideAndConquer.java
@@ -0,0 +1,117 @@
+package com.example.concurrency.patterns.divideconquer;
+
+import org.springframework.util.StopWatch;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+/**
+ * 描述:
+ * 平行分治 fork/join
+ *
+ * @author zed
+ * @since 2019-07-01 11:59 AM
+ */
+public class ParallelDivideAndConquer {
+ /**
+ * 分治执行
+ */
+ public static class ParallelSum extends RecursiveTask {
+
+ private static final long serialVersionUID = 1L;
+ /**
+ * 选择一个数来分割计算
+ */
+ private final static int THRESHOLD = 10_000;
+ private List bigIntegerList;
+
+ public ParallelSum(List bigIntegerList) {
+ this.bigIntegerList = bigIntegerList;
+
+ }
+ @Override
+ protected BigInteger compute() {
+ int size = bigIntegerList.size();
+ if (size < THRESHOLD) {
+ return sequentialSum(bigIntegerList);
+ } else {
+ ParallelSum x = new ParallelSum(bigIntegerList.subList(0, size / 2));
+ ParallelSum y = new ParallelSum(bigIntegerList.subList(size / 2, size));
+ x.fork();
+ y.fork();
+ BigInteger xResult = x.join();
+ BigInteger yResult = y.join();
+ return yResult.add(xResult);
+ }
+ }
+ }
+ /**
+ * 顺序执行
+ * @param list list
+ * @return sum
+ */
+ private static BigInteger sequentialSum(List list) {
+ BigInteger acc = BigInteger.ZERO;
+ for (BigInteger value : list) {
+ acc = acc.add(value);
+ }
+ return acc;
+ }
+
+ /**
+ * 通过两种方式验证一千万个数据的累加执行速度
+ * @param args args
+ * @throws InterruptedException e
+ */
+ public static void main(String[] args) throws InterruptedException{
+ List list = LongStream.range(0, 10_000_000L)
+ .mapToObj(BigInteger::valueOf)
+ .collect(Collectors.toList());
+ /*
+ * Fork/Join 分值累加
+ */
+ Runnable parallel = () -> {
+ ForkJoinPool commonPool = ForkJoinPool.commonPool();
+ BigInteger result = commonPool.invoke(new ParallelSum(list));
+
+ System.out.println("Parallel Result is: " + result);
+ };
+ /*
+ * 串型累加
+ */
+ Runnable sequential = () -> {
+ BigInteger acc = sequentialSum(list);
+
+ System.out.println("Sequential Result is: " + acc);
+ };
+
+ System.out.println("first time 耗时 \n\n");
+ dummyBenchmark(sequential);
+ dummyBenchmark(parallel);
+
+ Thread.sleep(2000);
+ System.out.println("some JIT 之后\n\n");
+ dummyBenchmark(sequential);
+ dummyBenchmark(parallel);
+
+ Thread.sleep(2000);
+ System.out.println("more JIT之后 \n\n");
+ dummyBenchmark(sequential);
+ dummyBenchmark(parallel);
+
+
+ }
+ private static void dummyBenchmark(Runnable runnable) {
+ StopWatch stopWatch = new StopWatch("耗时情况");
+ stopWatch.start();
+ runnable.run();
+ stopWatch.stop();
+ System.out.println("Executed in: " + stopWatch.prettyPrint());
+ System.out.println("######\n");
+ }
+}
+
diff --git a/src/main/java/com/example/concurrency/patterns/lockordering/FixedLockOrdering.java b/src/main/java/com/example/concurrency/patterns/lockordering/FixedLockOrdering.java
new file mode 100755
index 0000000..ef69e7b
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/lockordering/FixedLockOrdering.java
@@ -0,0 +1,61 @@
+package com.example.concurrency.patterns.lockordering;
+
+
+/**
+ * 描述:
+ * 处理锁顺序问题 (防止死锁)@link com.example.concurrency.features.synchronizedcase.SynchronizedResolveDeadLock.SortExeAccount
+ * 适用场景:处理多个锁时
+ * @author zed
+ * @since 2019-06-30 10:59 AM
+ */
+public class FixedLockOrdering {
+
+ private static class LockableObject {
+ /**
+ * id
+ */
+ private int id;
+ /**
+ * another
+ */
+ private String anotherValue;
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public String getAnotherValue() {
+ return anotherValue;
+ }
+
+ public void setAnotherValue(String anotherValue) {
+ this.anotherValue = anotherValue;
+ }
+ }
+
+ /**
+ * 多个对象锁操作 例如银行转账
+ * @param obj1 o1
+ * @param obj2 o2
+ */
+ public void doSomeOperation(LockableObject obj1, LockableObject obj2) {
+ LockableObject left = obj1;
+ LockableObject right = obj2;
+ if (obj1.getId() > obj2.getId()) {
+ left = obj2;
+ right = obj1;
+ }
+ // 锁定序号小的账户
+ synchronized(left){
+ // 锁定序号大的账户
+ synchronized(right){
+ //do something
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/com/example/concurrency/patterns/producerconsumer/ProducerConsumer.java b/src/main/java/com/example/concurrency/patterns/producerconsumer/ProducerConsumer.java
new file mode 100644
index 0000000..8a194bb
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/producerconsumer/ProducerConsumer.java
@@ -0,0 +1,56 @@
+package com.example.concurrency.patterns.producerconsumer;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+
+import java.util.UUID;
+import java.util.concurrent.*;
+
+/**
+ * 描述:
+ * 生产消费
+ * @author zed
+ * @since 2019-07-01 2:12 PM
+ */
+public class ProducerConsumer {
+ /**
+ * 阻塞队列存储数据
+ */
+ private BlockingQueue data = new LinkedBlockingQueue<>();
+ /**
+ * 消费者线程
+ */
+ private Callable consumer = () -> {
+ while (true) {
+ String dataUnit = data.poll(5, TimeUnit.SECONDS);
+ if (dataUnit == null){
+ break;
+ }
+ System.out.println("Consumed " + dataUnit + " from " + Thread.currentThread().getName());
+ }
+ return null;
+ };
+ /**
+ * 生产者线程
+ */
+ private Callable producer = () -> {
+ for (int i = 0; i < 90; i++) {
+ String dataUnit = UUID.randomUUID().toString();
+ data.put(dataUnit);
+ }
+ return null;
+ };
+
+ public static void main(String[] args) throws InterruptedException{
+ ProducerConsumer producerConsumer = new ProducerConsumer();
+ ThreadPoolExecutor pool = ThreadPoolBuilder.cachedPool().setThreadNamePrefix("生产消费线程").build();
+ pool.submit(producerConsumer.producer);
+ pool.submit(producerConsumer.consumer);
+ pool.submit(producerConsumer.consumer);
+ //非阻塞,不允许继续提交
+ pool.shutdown();
+ //阻塞,允许提交,返回终止结果 true/false
+ pool.awaitTermination(5, TimeUnit.SECONDS);
+
+ }
+}
+
diff --git a/src/main/java/com/example/concurrency/patterns/resourcepool/ResourcePool.java b/src/main/java/com/example/concurrency/patterns/resourcepool/ResourcePool.java
new file mode 100644
index 0000000..d6d0d47
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/resourcepool/ResourcePool.java
@@ -0,0 +1,85 @@
+package com.example.concurrency.patterns.resourcepool;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.*;
+
+/**
+ * 描述:
+ * 对象资源池
+ * 适用场景:当要创建某些有限资源的池时使用
+ * @author zed
+ * @since 2019-07-01 3:01 PM
+ */
+public class ResourcePool {
+ private final static TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+ private Semaphore semaphore;
+ private BlockingQueue resources;
+
+ public ResourcePool(int poolSize, List initializedResources) {
+ //fail true 即 FIFO
+ this.semaphore = new Semaphore(poolSize, true);
+ this.resources = new LinkedBlockingQueue<>(poolSize);
+ this.resources.addAll(initializedResources);
+ }
+
+ /**
+ * 获取资源 阻塞等待
+ * @return resource
+ * @throws InterruptedException e
+ */
+ public T get() throws InterruptedException {
+ return get(Integer.MAX_VALUE);
+ }
+
+ /**
+ * 获取资源
+ * @param secondsToTimeout 超时时间
+ * @return source
+ * @throws InterruptedException e
+ */
+ public T get(long secondsToTimeout) throws InterruptedException {
+ semaphore.acquire();
+ try {
+ return resources.poll(secondsToTimeout, TIME_UNIT);
+ } finally {
+ semaphore.release();
+ }
+ }
+
+ /**
+ * 释放资源
+ * @param resource re
+ * @throws InterruptedException e
+ */
+ public void release(T resource) throws InterruptedException {
+ if (resource != null) {
+ resources.put(resource);
+ semaphore.release();
+ }
+ }
+
+ public static void main(String[] args) {
+ ThreadPoolExecutor executor = ThreadPoolBuilder.cachedPool().setThreadNamePrefix("资源池线程").build();
+ ResourcePool pool = new ResourcePool<>(15, Arrays.asList(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14));
+ Random random = new Random();
+ for (int i = 0; i < 30; i++) {
+ executor.execute(() -> {
+ try {
+ Integer value = pool.get(60);
+ System.out.println("Value taken: " + value);
+ Thread.sleep(random.nextInt(5000));
+ pool.release(value);
+ System.out.println("Value released " + value);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+ executor.shutdown();
+ }
+}
+
diff --git a/src/main/java/com/example/concurrency/patterns/resourcesinitialization/ControlledInitialization.java b/src/main/java/com/example/concurrency/patterns/resourcesinitialization/ControlledInitialization.java
new file mode 100644
index 0000000..7260e4c
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/resourcesinitialization/ControlledInitialization.java
@@ -0,0 +1,115 @@
+package com.example.concurrency.patterns.resourcesinitialization;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * 描述:
+ * 基于CountDownLatch实现应用初始化
+ * 适用场景:多个重要资源的初始化
+ *
+ * @author zed
+ * @since 2019-07-01 11:17 AM
+ */
+public class ControlledInitialization {
+ private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool()
+ .setPoolSize(3)
+ .setThreadNamePrefix("ControlledInitialization").build();
+ /**
+ * 资源1
+ */
+ private static class Resource1 {
+ }
+ /**
+ * 资源2
+ */
+ private static class Resource2 {
+ }
+ /**
+ * 资源3
+ */
+ private static class Resource3 {
+ }
+ private Resource1 resource1;
+ private Resource2 resource2;
+ private Resource3 resource3;
+ /**
+ * 声明一个count为3 的CountDownLatch
+ */
+ private CountDownLatch latch = new CountDownLatch(3);
+ private Runnable initResource1 = () -> {
+ try {
+ // simulate wait
+ Thread.sleep(4000);
+ resource1 = new Resource1();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }finally {
+ latch.countDown();
+ }
+ };
+
+ private Runnable initResource2 = () -> {
+ try {
+ // simulate wait
+ Thread.sleep(4000);
+ resource2 = new Resource2();
+
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }finally {
+ latch.countDown();
+ }
+ };
+
+ private Runnable initResource3 = () -> {
+ try {
+ // simulate wait
+ Thread.sleep(4000);
+ resource3 = new Resource3();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }finally {
+ latch.countDown();
+ }
+ };
+
+ /**
+ * 构造函数中实现完成加载过程
+ */
+ private ControlledInitialization() {
+ //初始化操作
+ initialize();
+ //等待资源加载完毕
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ /*
+ * 基于资源加载完毕开始后续操作
+ */
+ doTask();
+ }
+ private void doTask() {
+ System.out.println("=== Resources Initialized ===");
+ System.out.println("Resource 1 instance " + resource1);
+ System.out.println("Resource 2 instance " + resource2);
+ System.out.println("Resource 3 instance " + resource3);
+
+ }
+ private void initialize() {
+ System.out.println("=== Initializing Resources ===");
+ threadPoolExecutor.execute(initResource1);
+ threadPoolExecutor.execute(initResource2);
+ threadPoolExecutor.execute(initResource3);
+ threadPoolExecutor.shutdown();
+ }
+ public static void main(String[] args) {
+ new ControlledInitialization();
+ }
+
+}
+
diff --git a/src/main/java/com/example/concurrency/patterns/serialtask/SerialTask.java b/src/main/java/com/example/concurrency/patterns/serialtask/SerialTask.java
new file mode 100644
index 0000000..86c51af
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/serialtask/SerialTask.java
@@ -0,0 +1,57 @@
+package com.example.concurrency.patterns.serialtask;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+import com.example.concurrency.util.ThreadUtil;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 描述:
+ * 通过锁和volatile 实现的线程共享方案
+ * 适用场景:特定条件下有效阻止并行执行
+ *
+ * @author zed
+ * @since 2019-07-02 3:33 PM
+ */
+public class SerialTask {
+ /**
+ * 导出flag true:有任务在执行 false:可以执行当前任务
+ */
+ private static volatile boolean permits =false;
+
+ /**
+ * 占用任务
+ */
+ public synchronized static boolean setPermits(){
+ if(!permits){
+ permits = true;
+ return true;
+ }
+ return false;
+ }
+ /**
+ * 释放任务
+ */
+ public synchronized static void releasePermits() {
+ permits = false;
+ }
+ public static void main(String[] args) throws InterruptedException{
+ ThreadPoolExecutor pool = ThreadPoolBuilder.cachedPool().setThreadNamePrefix("导出excel").build();
+ for(int i =0;i< 10;i++) {
+ pool.execute(() -> {
+ while (SerialTask.permits || !SerialTask.setPermits()) {
+ //return or retry
+ }
+ SerialTask.setPermits();
+ System.out.println(Thread.currentThread().getName()+"正在执行");
+ ThreadUtil.sleep(1000);
+ System.out.println(Thread.currentThread().getName()+"执行结束");
+ SerialTask.releasePermits();
+ });
+ }
+ pool.awaitTermination(5, TimeUnit.SECONDS);
+
+ }
+}
+
diff --git a/src/main/java/com/example/concurrency/patterns/taskcancel/ConsoleTimePrintTask.java b/src/main/java/com/example/concurrency/patterns/taskcancel/ConsoleTimePrintTask.java
new file mode 100644
index 0000000..470eae6
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/taskcancel/ConsoleTimePrintTask.java
@@ -0,0 +1,58 @@
+package com.example.concurrency.patterns.taskcancel;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * 描述:
+ * 控制台输出时间任务取消 通过interrupt() 注意抛出InterruptedException异常的地方需要重置下打断标识
+ * 适用场景:当后台任务需要取消时
+ * @author zed
+ * @since 2019-06-30 3:18 PM
+ */
+public class ConsoleTimePrintTask {
+ private Thread thread;
+ /**
+ * 执行线程
+ */
+ private Runnable task = () -> {
+ while (!Thread.currentThread().isInterrupted()) {
+ Date date = new Date(System.currentTimeMillis());
+ System.out.println(new SimpleDateFormat().format(date));
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // 注意这里如果没有抛出InterruptedException 则不需要执行interrupt
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+ /**
+ * run
+ */
+ public void run() {
+ thread = new Thread(task);
+ thread.start();
+ }
+
+ /**
+ * cancel
+ */
+ public void cancel() {
+ if (thread != null) {
+ thread.interrupt();
+ }
+ }
+
+ public static void main(String[] args) {
+ ConsoleTimePrintTask self = new ConsoleTimePrintTask();
+ self.run();
+ try {
+ Thread.sleep(4000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ self.cancel();
+ }
+}
+
diff --git a/src/main/java/com/example/concurrency/patterns/taskconvergence/TaskConvergence.java b/src/main/java/com/example/concurrency/patterns/taskconvergence/TaskConvergence.java
new file mode 100644
index 0000000..b1b93dd
--- /dev/null
+++ b/src/main/java/com/example/concurrency/patterns/taskconvergence/TaskConvergence.java
@@ -0,0 +1,82 @@
+package com.example.concurrency.patterns.taskconvergence;
+
+import com.example.concurrency.features.threadPool.ThreadPoolBuilder;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.*;
+
+/**
+ * 描述:
+ * 聚合任务 CyclicBarrier(int parties, Runnable barrierAction) barrierAction为到达屏障后优先执行的任务
+ * 适用场景:当需要确定一组正在运行的任务是否已完成时
+ * @author zed
+ * @since 2019-06-30 3:33 PM
+ */
+public class TaskConvergence {
+
+ private static final int BOUND = 150_000;
+ private static final int SIZE = 400_000;
+ private static final int CORES = Runtime.getRuntime().availableProcessors();
+ private CyclicBarrier barrier;
+ private List synchronizedLinkedList;
+ private ExecutorService executor;
+
+ @SuppressWarnings("unchecked")
+ private Runnable run = () -> {
+ Random random = new Random();
+ List results = new LinkedList();
+ for (int i = 0; i < SIZE; i++) {
+ Long next = (long) random.nextInt(BOUND);
+ results.add(next);
+ }
+ try {
+ synchronizedLinkedList.addAll(results);
+ barrier.await();
+ } catch (InterruptedException | BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+ };
+
+ /**
+ * 构造函数
+ */
+ public TaskConvergence() {
+ /*
+ * 到达屏障执行任务
+ */
+ Runnable onComplete = () -> {
+ System.out.println("=== Random Number Results ===");
+ System.out.println("CPU Cores: " + CORES);
+ System.out.println("Random Bound: " + BOUND);
+ System.out.println("Iterations per Core: " + SIZE);
+ System.out.println("Total Iterations: " + SIZE * CORES);
+ System.out.println("Size: " + synchronizedLinkedList.size());
+ System.out.println("Sum " + synchronizedLinkedList.stream().mapToLong(Long::longValue).sum());
+ };
+ barrier = new CyclicBarrier(CORES, onComplete);
+ synchronizedLinkedList = Collections.synchronizedList(new LinkedList<>());
+ executor = ThreadPoolBuilder.fixedPool().setPoolSize(CORES).build();
+ }
+
+ /**
+ * run
+ */
+ public void run() {
+ for (int i = 0; i < CORES; i++) {
+ executor.execute(run);
+ }
+ try {
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ executor.shutdown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ public static void main(String[] args) {
+ new TaskConvergence().run();
+ }
+}
+