diff --git a/README.md b/README.md index 1ce6f93..29f4957 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ -内容整理自[《Java并发编程实战》](https://time.geekbang.org/column/intro/159) +内容整理自[《Java并发编程实战》](https://time.geekbang.org/column/intro/159)[《java-concurrency-patterns》](https://github.com/LeonardoZ/java-concurrency-patterns) 相关工具类[《vjtools》](https://github.com/vipshop/vjtools) -WeChat:Zed-RD [![Build Status](https://travis-ci.org/Fadezed/concurrency.svg?branch=master)](https://travis-ci.org/Fadezed/concurrency) + + [![Build Status](https://travis-ci.org/Fadezed/concurrency.svg?branch=master)](https://travis-ci.org/Fadezed/concurrency)
@@ -39,20 +40,18 @@ WeChat:Zed-RD [![Build Status](https://travis-ci.org/Fadezed/concurrency.svg?br ## 缓存导致的可见性问题 - * 一个线程对共享变量的修改,另外一个线程能够立刻看到,我们称为**可见性** -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/visibility/Visibility.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/visibility/Visibility.java) ![单核CPU](media/15603316282468/a07e8182819e2b260ce85b2167d446da.png) ![多核CPU](media/15603316282468/e2aa76928b2bc135e08e7590ca36e0ea.png) ## 线程切换带来的原子性问题 * 一个或者多个操作在 CPU 执行的过程中不被中断的特性称为**原子性** - * **时间片**概念 * 线程切换 ---〉提升cpu利用率。 tips:Unix系统因支持多进程分时复用而出名。 -* 线程切换[代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/contentSwitch/ContentSwitchTest.java)。 -* 原子问题[代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/atomic/AtomicCounter.java) +* 线程切换[代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/contentswitch/ContentSwitchTest.java)。 +* 原子问题[代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/atomic/AtomicCounter.java) ![254b129b145d80e9bb74123d6e620efb](media/15603316282468/254b129b145d80e9bb74123d6e620efb.png) * count+=1 操作分析 @@ -94,7 +93,7 @@ public class Singleton { ## 按需禁用缓存以及编译优化 [代码来源](http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html) * ## volatile - * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/volatileExample/VolatileExample.java) + * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/volatilecase/VolatileExample.java) * ## synchronized @@ -134,9 +133,9 @@ public class Singleton { * 偏向锁 * 轻量级锁 * 重量级锁 - - * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/synchronizedEx/SynchronizedExample.java) + + * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedExample.java) ``` class X { @@ -158,7 +157,7 @@ public class Singleton { ## final -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/finalEx/FinalExample.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/finalcase/FinalExample.java) * 修饰变量时,初衷是告诉编译器:这个变量生而不变,非immutable,即只能表示对象引用不能被赋值(例如List); * 修饰方法则方法不能被重写 * 修饰类则不能被扩展继承。 @@ -238,7 +237,7 @@ public class Singleton { ------- # 4.JAVA线程的生命周期 -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/threadState/ThreadState.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/threadstate/ThreadState.java) ## 通用的线程生命周期 * 初始状态 @@ -484,7 +483,7 @@ class Semaphore{ ### 使用方法 #### 实现互斥 #### 实现限流器(Semaphore 可以允许多个线程访问一个临界区) -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/semaphore/SemaphoreEx.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/semaphore/SemaphoreEx.java) ------- @@ -495,16 +494,16 @@ class Semaphore{ * 只允许一个线程写共享变量 * 如果一个写线程正在执行写操作,此时禁止读线程读共享变量 -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/readWriteLock/CacheByReadWriteLock.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/readwritelock/CacheByReadWriteLock.java) ## StampedLock 加上乐观读(无锁) -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/readWriteLock/StampedLockEx.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/readwritelock/StampedLockEx.java) ## CountDownLatch -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/countDownLatchEx/CountDownLatchEx.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java) ## CyclicBarrier -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/cyclicBarrierEx/CyclicBarrierEx.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierEx.java) @@ -581,7 +580,7 @@ CopyOnWriteArraySet、ConcurrentSkipListSet # 11. 原子类 * 无锁方案实现原理(Compare And Swap) - * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/atomic/SimulatedCAS.java) + * [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/atomic/SimulatedCompareAndSwap.java) * 概览图 ![atomi](media/15608379390765/atomic.png) @@ -640,7 +639,7 @@ accumulateAndGet(x,func) * 而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了。 ## 线程池是一种生产者-消费者模式(非一般意义池化资源) -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/threadPool/MyThreadPool.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/threadPool/MyThreadPool.java) * Java ThreadPoolExecutor ``` @@ -713,7 +712,7 @@ get(long timeout, TimeUnit unit); ## FutureTask工具类(实现了RunnableFuture而它继承了Runnable和Future接口) -* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/futureTask/FutureTaskEx.java) +* [代码示例](https://github.com/Fadezed/concurrency/blob/master/src/main/java/com/example/concurrency/features/futuretask/FutureTaskEx.java) * 构造函数类似线程池submit ``` diff --git a/src/main/java/com/example/concurrency/features/atomic/SimulatedCAS.java b/src/main/java/com/example/concurrency/features/atomic/SimulatedCompareAndSwap.java similarity index 91% rename from src/main/java/com/example/concurrency/features/atomic/SimulatedCAS.java rename to src/main/java/com/example/concurrency/features/atomic/SimulatedCompareAndSwap.java index 41afaae..a3345c6 100644 --- a/src/main/java/com/example/concurrency/features/atomic/SimulatedCAS.java +++ b/src/main/java/com/example/concurrency/features/atomic/SimulatedCompareAndSwap.java @@ -2,12 +2,12 @@ /** * 描述: - * SimulatedCAS 模拟cas操作 + * SimulatedCompareAndSwap 模拟cas操作 * * @author zed * @since 2019-06-18 2:09 PM */ -public class SimulatedCAS{ +public class SimulatedCompareAndSwap { private int count; /** diff --git a/src/main/java/com/example/concurrency/features/completablefuture/CompletableFutureExample.java b/src/main/java/com/example/concurrency/features/completablefuture/CompletableFutureExample.java new file mode 100644 index 0000000..0bc1db6 --- /dev/null +++ b/src/main/java/com/example/concurrency/features/completablefuture/CompletableFutureExample.java @@ -0,0 +1,116 @@ +package com.example.concurrency.features.completablefuture; + +import com.example.concurrency.util.ThreadUtil; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * 描述: + * CompletableFutureExample Java 8 异步编程可描述串行关系、AND汇聚关系和OR汇聚关系以及异常处理 + * + * @author zed + * @since 2019-07-03 1:57 PM + */ +public class CompletableFutureExample { + /** + * 任务 1:洗水壶 -> 烧开水 + */ + private CompletableFuture f1 = CompletableFuture.runAsync(()->{ + System.out.println("T1: 洗水壶..."); + ThreadUtil.sleep(1, TimeUnit.SECONDS); + System.out.println("T1: 烧开水..."); + ThreadUtil.sleep(15, TimeUnit.SECONDS); + }); + /** + * 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶 + */ + private CompletableFuture f2 = CompletableFuture.supplyAsync(()->{ + System.out.println("T2: 洗茶壶..."); + ThreadUtil.sleep(1, TimeUnit.SECONDS); + + System.out.println("T2: 洗茶杯..."); + ThreadUtil.sleep(2, TimeUnit.SECONDS); + + System.out.println("T2: 拿茶叶..."); + ThreadUtil.sleep(1, TimeUnit.SECONDS); + return " 龙井 "; + }); + /** + * 任务 3:任务 1 和任务 2 完成后执行:泡茶 + */ + private CompletableFuture f3 = f1.thenCombine(f2, (e, tf)->{ + System.out.println("T1: 拿到茶叶:" + tf); + System.out.println("T1: 泡茶..."); + return " 上茶:" + tf; + }); + + + public static void main(String[] args) { + CompletableFutureExample example = new CompletableFutureExample(); + System.out.println(example.f3.join()); + } + + /** + * 描述串行关系 thenApply + */ + static class SerialRelation{ + private static CompletableFuture f0 = + CompletableFuture.supplyAsync( + () -> "Hello World") //① + .thenApply(s -> s + " QQ") //② + .thenApply(String::toUpperCase);//③ + + public static void main(String[] args) { + System.out.println(SerialRelation.f0.join()); + } + } + /** + * 描述汇聚Or关系 thenApply + */ + static class ConvergeRelation{ + static CompletableFuture f1 = + CompletableFuture.supplyAsync(()->{ + int t = getRandom(5, 10); + ThreadUtil.sleep(t, TimeUnit.SECONDS); + return String.valueOf(t); + }); + + static CompletableFuture f2 = + CompletableFuture.supplyAsync(()->{ + int t = getRandom(5, 10); + ThreadUtil.sleep(t, TimeUnit.SECONDS); + return String.valueOf(t); + }); + + static CompletableFuture f3 = + f1.applyToEither(f2,s -> s); + + + private static int getRandom(int i,int j){ + return (int) (Math.random() * (j - i)) +i; + } + public static void main(String[] args) { + + System.out.println(ConvergeRelation.f3.join()); + } + } + + /** + * 处理异常 此例为发生异常默认为0 + */ + static class ExceptionHandler{ + private static CompletableFuture + f0 = CompletableFuture + .supplyAsync(()->7/0) + .thenApply(r->r*10) + .exceptionally(e->0); + + public static void main(String[] args) { + System.out.println(ExceptionHandler.f0.join()); + } + + } + +} + diff --git a/src/main/java/com/example/concurrency/features/completionservice/ExecutorCompletionServiceExample.java b/src/main/java/com/example/concurrency/features/completionservice/ExecutorCompletionServiceExample.java new file mode 100644 index 0000000..a468032 --- /dev/null +++ b/src/main/java/com/example/concurrency/features/completionservice/ExecutorCompletionServiceExample.java @@ -0,0 +1,72 @@ +package com.example.concurrency.features.completionservice; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +/** + * 描述: + * CompletionService BlockingQueue+Executor + * + * 批量执行异步任务 并且可以支持Forking Cluster模式即并行查询多个服务其中一个返回则结束 + * @author zed + * @since 2019-07-03 10:07 AM + */ +public class ExecutorCompletionServiceExample { + + private Integer exe()throws InterruptedException{ + // 创建线程池 + ThreadPoolExecutor pool = ThreadPoolBuilder.fixedPool().setPoolSize(3).build(); + // 创建 CompletionService + CompletionService completionService = new ExecutorCompletionService<>(pool); + // 用于保存 Future 对象 + List> futures = new ArrayList<>(3); + // 提交异步任务,并保存 future 到 futures + futures.add(completionService.submit(this::getCoderByS1)); + futures.add(completionService.submit(this::getCoderByS2)); + futures.add(completionService.submit(this::getCoderByS3)); + // 获取最快返回的任务执行结果 + Integer r = 0; + try { + // 只要有一个成功返回,则 break 这里因为要判断是否是正确的结果 所以会循环三次,如果不考虑结果正确性,第一次拿出来的就是最先执行完的结果 + for (int i = 0; i < 3; ++i) { + try { + r = completionService.take().get(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + // 简单地通过判空来检查是否成功返回 + if (r != null ) { + break; + } + } + } finally { + // 取消所有任务 + for(Future f : futures){ + f.cancel(true); + + } + } + pool.shutdown(); + // 返回结果 + return r; + + } + private Integer getCoderByS1(){ + return null; + } + private Integer getCoderByS2(){ + return 2; + } + private Integer getCoderByS3(){ + return 3; + } + + public static void main(String[] args) throws InterruptedException{ + System.out.println(new ExecutorCompletionServiceExample().exe()); + } + +} + diff --git a/src/main/java/com/example/concurrency/features/concurrentHashMap/ConcurrentHashMapTest.java b/src/main/java/com/example/concurrency/features/concurrenthashmap/ConcurrentHashMapTest.java similarity index 96% rename from src/main/java/com/example/concurrency/features/concurrentHashMap/ConcurrentHashMapTest.java rename to src/main/java/com/example/concurrency/features/concurrenthashmap/ConcurrentHashMapTest.java index 322df5b..f2207df 100644 --- a/src/main/java/com/example/concurrency/features/concurrentHashMap/ConcurrentHashMapTest.java +++ b/src/main/java/com/example/concurrency/features/concurrenthashmap/ConcurrentHashMapTest.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.concurrentHashMap; +package com.example.concurrency.features.concurrenthashmap; /** * 描述: diff --git a/src/main/java/com/example/concurrency/features/contentSwitch/ContentSwitchTest.java b/src/main/java/com/example/concurrency/features/contentswitch/ContentSwitchTest.java similarity index 97% rename from src/main/java/com/example/concurrency/features/contentSwitch/ContentSwitchTest.java rename to src/main/java/com/example/concurrency/features/contentswitch/ContentSwitchTest.java index 0640829..7ba0e71 100644 --- a/src/main/java/com/example/concurrency/features/contentSwitch/ContentSwitchTest.java +++ b/src/main/java/com/example/concurrency/features/contentswitch/ContentSwitchTest.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.contentSwitch; +package com.example.concurrency.features.contentswitch; import com.example.concurrency.features.threadPool.ThreadPoolBuilder; diff --git a/src/main/java/com/example/concurrency/features/countDownLatchEx/CountDownLatchEx.java b/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java similarity index 59% rename from src/main/java/com/example/concurrency/features/countDownLatchEx/CountDownLatchEx.java rename to src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java index 37aee80..54c4ce8 100644 --- a/src/main/java/com/example/concurrency/features/countDownLatchEx/CountDownLatchEx.java +++ b/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java @@ -1,9 +1,11 @@ -package com.example.concurrency.features.countDownLatchEx; +package com.example.concurrency.features.countdownlatch; import com.example.concurrency.features.threadPool.ThreadPoolBuilder; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * 描述: @@ -15,21 +17,21 @@ public class CountDownLatchEx { public static void main(String[] args) throws InterruptedException{ // 创建 2 个线程的线程池 - Executor executor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build(); + ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build(); // 计数器初始化为 2 CountDownLatch latch = new CountDownLatch(2); executor.execute(()-> { - System.out.println("T1"); + System.out.println("T1, " + Thread.currentThread().getName()); latch.countDown(); }); executor.execute(()-> { - System.out.println("T2"); + System.out.println("T2, " + Thread.currentThread().getName()); latch.countDown(); }); // 等待两个查询操作结束 - latch.await(); - - + latch.await(1, TimeUnit.SECONDS); + System.out.println("T3, " + Thread.currentThread().getName()); + executor.shutdown(); } } diff --git a/src/main/java/com/example/concurrency/features/cyclicBarrierEx/CyclicBarrierEx.java b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierEx.java similarity index 96% rename from src/main/java/com/example/concurrency/features/cyclicBarrierEx/CyclicBarrierEx.java rename to src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierEx.java index e43fe7c..2ce13f4 100644 --- a/src/main/java/com/example/concurrency/features/cyclicBarrierEx/CyclicBarrierEx.java +++ b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierEx.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.cyclicBarrierEx; +package com.example.concurrency.features.cyclicbarrier; import java.util.concurrent.CyclicBarrier; diff --git a/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierGeekTime.java b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierGeekTime.java new file mode 100644 index 0000000..86fff1e --- /dev/null +++ b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierGeekTime.java @@ -0,0 +1,58 @@ +package com.example.concurrency.features.cyclicbarrier; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; +import java.util.concurrent.*; + +public class CyclicBarrierGeekTime { + //订单队列 + public static ArrayBlockingQueue pos = new ArrayBlockingQueue(10); + //物流队列 + public static ArrayBlockingQueue dos = new ArrayBlockingQueue(10); + //这里线程设为1,防止多线程并发导致的数据不一致,因为订单和物流是两个队列 + private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(1).setThreadNamePrefix("测试线程同步").build(); + public static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{ + //这一步用个线程池是因为要进行异步操作 + threadPoolExecutor.execute(CyclicBarrierGeekTime::check); + }); + static void check(){ + System.out.println("对账结果"+ (pos.poll() + "\t" + dos.poll()) +","+Thread.currentThread().getName()); + } + + public static void main(String[] args) throws InterruptedException { + //循环查询订单 + Thread t1 = new Thread(()->{ + for(int i = 0;i<4;i++){ + System.out.println("订单ok,"+Thread.currentThread().getName()); + pos.offer("订单"+i); + try { + //仅仅方便观察输出 + Thread.sleep(1000); + //所有阻塞式操作均用超时的方式调用 + cyclicBarrier.await(1,TimeUnit.SECONDS); + } catch (Exception e) { + System.out.println("等超时了"+e.getMessage()); + } + } + + }); + //循环查询物流库 + Thread t2 = new Thread(()->{ + //这里设置比订单的循环少,以让订单的cyclicBarrier超时 + for(int j=0;j<2;j++){ + System.out.println("物流ok,"+Thread.currentThread().getName()); + dos.offer("物流"+j); + try { + Thread.sleep(1000); + cyclicBarrier.await(1,TimeUnit.SECONDS); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + }); + t1.start(); + t2.start(); + Thread.sleep(20000); + threadPoolExecutor.shutdown(); + } +} diff --git a/src/main/java/com/example/concurrency/features/deadLock/DeadLockTest.java b/src/main/java/com/example/concurrency/features/deadlock/DeadLockTest.java similarity index 97% rename from src/main/java/com/example/concurrency/features/deadLock/DeadLockTest.java rename to src/main/java/com/example/concurrency/features/deadlock/DeadLockTest.java index bd0a0d2..0993d95 100644 --- a/src/main/java/com/example/concurrency/features/deadLock/DeadLockTest.java +++ b/src/main/java/com/example/concurrency/features/deadlock/DeadLockTest.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.deadLock; +package com.example.concurrency.features.deadlock; import com.example.concurrency.features.threadPool.ThreadPoolBuilder; import com.example.concurrency.util.ThreadDumpHelper; diff --git a/src/main/java/com/example/concurrency/features/exchanger/ExchangerExample.java b/src/main/java/com/example/concurrency/features/exchanger/ExchangerExample.java new file mode 100644 index 0000000..fa614a1 --- /dev/null +++ b/src/main/java/com/example/concurrency/features/exchanger/ExchangerExample.java @@ -0,0 +1,40 @@ +package com.example.concurrency.features.exchanger; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; + +import java.util.concurrent.Exchanger; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 描述: + * 交换者 用于两个线程间的数据交换 + * + * @author zed + * @since 2019-07-03 5:39 PM + */ +public class ExchangerExample { + private static final Exchanger EXCHANGER = new Exchanger<>(); + private static ThreadPoolExecutor poolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build(); + + public static void main(String[] args) { + poolExecutor.execute(()->{ + try{ + String s ="SomethingAndA"; + EXCHANGER.exchange(s); + }catch (InterruptedException e){ + e.printStackTrace(); + } + }); + poolExecutor.execute(()->{ + try{ + String s1 = "SomethingAndB"; + String s = EXCHANGER.exchange("s1"); + System.out.println("s和s1值是否相等:"+s1.equals(s)+",s:"+s+",s1:"+s1); + }catch (InterruptedException e){ + e.printStackTrace(); + } + }); + poolExecutor.shutdown(); + } +} + diff --git a/src/main/java/com/example/concurrency/features/finalEx/FinalExample.java b/src/main/java/com/example/concurrency/features/finalcase/FinalExample.java similarity index 94% rename from src/main/java/com/example/concurrency/features/finalEx/FinalExample.java rename to src/main/java/com/example/concurrency/features/finalcase/FinalExample.java index 97f2380..97615bb 100644 --- a/src/main/java/com/example/concurrency/features/finalEx/FinalExample.java +++ b/src/main/java/com/example/concurrency/features/finalcase/FinalExample.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.finalEx; +package com.example.concurrency.features.finalcase; /** * 描述: 构造函数的错误重排导致线程可能看到 final 变量的值会变 diff --git a/src/main/java/com/example/concurrency/features/futureTask/FutureTaskEx.java b/src/main/java/com/example/concurrency/features/futuretask/FutureTaskEx.java similarity index 98% rename from src/main/java/com/example/concurrency/features/futureTask/FutureTaskEx.java rename to src/main/java/com/example/concurrency/features/futuretask/FutureTaskEx.java index ec8519d..506aa7b 100644 --- a/src/main/java/com/example/concurrency/features/futureTask/FutureTaskEx.java +++ b/src/main/java/com/example/concurrency/features/futuretask/FutureTaskEx.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.futureTask; +package com.example.concurrency.features.futuretask; import com.example.concurrency.features.threadPool.ThreadPoolBuilder; import com.example.concurrency.features.threadPool.ThreadPoolUtil; diff --git a/src/main/java/com/example/concurrency/features/readWriteLock/CacheByReadWriteLock.java b/src/main/java/com/example/concurrency/features/readwritelock/CacheByReadWriteLock.java similarity index 97% rename from src/main/java/com/example/concurrency/features/readWriteLock/CacheByReadWriteLock.java rename to src/main/java/com/example/concurrency/features/readwritelock/CacheByReadWriteLock.java index 3c398b1..a0b7b15 100644 --- a/src/main/java/com/example/concurrency/features/readWriteLock/CacheByReadWriteLock.java +++ b/src/main/java/com/example/concurrency/features/readwritelock/CacheByReadWriteLock.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.readWriteLock; +package com.example.concurrency.features.readwritelock; import java.util.HashMap; import java.util.Map; diff --git a/src/main/java/com/example/concurrency/features/readWriteLock/StampedLockEx.java b/src/main/java/com/example/concurrency/features/readwritelock/StampedLockEx.java similarity index 97% rename from src/main/java/com/example/concurrency/features/readWriteLock/StampedLockEx.java rename to src/main/java/com/example/concurrency/features/readwritelock/StampedLockEx.java index 07b17e8..35517a4 100644 --- a/src/main/java/com/example/concurrency/features/readWriteLock/StampedLockEx.java +++ b/src/main/java/com/example/concurrency/features/readwritelock/StampedLockEx.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.readWriteLock; +package com.example.concurrency.features.readwritelock; import java.util.concurrent.locks.StampedLock; diff --git a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedConnection.java b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedConnection.java similarity index 99% rename from src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedConnection.java rename to src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedConnection.java index 6d5c7ac..8bb96ad 100644 --- a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedConnection.java +++ b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedConnection.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.synchronizedEx; +package com.example.concurrency.features.synchronizedcase; import com.example.concurrency.features.threadPool.ThreadPoolBuilder; import com.example.concurrency.util.ThreadDumpHelper; diff --git a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedExample.java b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedExample.java similarity index 97% rename from src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedExample.java rename to src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedExample.java index 1d3c697..28a48b6 100644 --- a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedExample.java +++ b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedExample.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.synchronizedEx; +package com.example.concurrency.features.synchronizedcase; import java.util.ArrayList; import java.util.List; diff --git a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedNoConnection.java b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedNoConnection.java similarity index 96% rename from src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedNoConnection.java rename to src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedNoConnection.java index bb7ecb1..1e51c16 100644 --- a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedNoConnection.java +++ b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedNoConnection.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.synchronizedEx; +package com.example.concurrency.features.synchronizedcase; /** * 描述: diff --git a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedResolveDeadLock.java b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedResolveDeadLock.java similarity index 98% rename from src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedResolveDeadLock.java rename to src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedResolveDeadLock.java index 45fc07c..965b401 100644 --- a/src/main/java/com/example/concurrency/features/synchronizedEx/SynchronizedResolveDeadLock.java +++ b/src/main/java/com/example/concurrency/features/synchronizedcase/SynchronizedResolveDeadLock.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.synchronizedEx; +package com.example.concurrency.features.synchronizedcase; import com.example.concurrency.features.threadPool.ThreadPoolBuilder; diff --git a/src/main/java/com/example/concurrency/features/threadState/ThreadState.java b/src/main/java/com/example/concurrency/features/threadstate/ThreadState.java similarity index 96% rename from src/main/java/com/example/concurrency/features/threadState/ThreadState.java rename to src/main/java/com/example/concurrency/features/threadstate/ThreadState.java index efb2166..39fff00 100644 --- a/src/main/java/com/example/concurrency/features/threadState/ThreadState.java +++ b/src/main/java/com/example/concurrency/features/threadstate/ThreadState.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.threadState; +package com.example.concurrency.features.threadstate; import com.example.concurrency.util.ThreadDumpHelper; import com.example.concurrency.util.ThreadUtil; diff --git a/src/main/java/com/example/concurrency/features/visibility/Visibility.java b/src/main/java/com/example/concurrency/features/visibility/Visibility.java index 9e9fd0d..50142fb 100644 --- a/src/main/java/com/example/concurrency/features/visibility/Visibility.java +++ b/src/main/java/com/example/concurrency/features/visibility/Visibility.java @@ -14,7 +14,7 @@ */ public class Visibility { private static long count = 0; - private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().build(); + private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build(); private void add10k() { int idx = 0; diff --git a/src/main/java/com/example/concurrency/features/volatileExample/VolatileExample.java b/src/main/java/com/example/concurrency/features/volatilecase/VolatileExample.java similarity index 95% rename from src/main/java/com/example/concurrency/features/volatileExample/VolatileExample.java rename to src/main/java/com/example/concurrency/features/volatilecase/VolatileExample.java index 7463ddd..ba66955 100644 --- a/src/main/java/com/example/concurrency/features/volatileExample/VolatileExample.java +++ b/src/main/java/com/example/concurrency/features/volatilecase/VolatileExample.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.volatileExample; +package com.example.concurrency.features.volatilecase; import java.util.Scanner; @@ -40,7 +40,7 @@ public static void main(String[] args) { while(sc.hasNext()){ String value = sc.next(); if("1".equals(value)){ - new Thread(() -> aVolatile.stopThread()).start(); + new Thread(aVolatile::stopThread).start(); break ; } } diff --git a/src/main/java/com/example/concurrency/features/blockingQueue/BlockedQueue.java b/src/main/java/com/example/concurrency/patterns/blockingQueue/BlockedQueue.java similarity index 96% rename from src/main/java/com/example/concurrency/features/blockingQueue/BlockedQueue.java rename to src/main/java/com/example/concurrency/patterns/blockingQueue/BlockedQueue.java index 8878b9a..cd62c48 100644 --- a/src/main/java/com/example/concurrency/features/blockingQueue/BlockedQueue.java +++ b/src/main/java/com/example/concurrency/patterns/blockingQueue/BlockedQueue.java @@ -1,4 +1,4 @@ -package com.example.concurrency.features.blockingQueue; +package com.example.concurrency.patterns.blockingQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; diff --git a/src/main/java/com/example/concurrency/patterns/conditionqueues/ConditionQueue.java b/src/main/java/com/example/concurrency/patterns/conditionqueues/ConditionQueue.java new file mode 100644 index 0000000..40fc024 --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/conditionqueues/ConditionQueue.java @@ -0,0 +1,89 @@ +package com.example.concurrency.patterns.conditionqueues; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; + +import java.util.UUID; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * 描述: + * 条件队列 + * 通过ReentrantLock 实现打印和停止打印的条件队列,每次容量为5 + * + * @author zed + * @since 2019-07-01 10:59 AM + */ +public class ConditionQueue { + private static final int LIMIT = 5; + private int messageCount = 0; + private Lock lock = new ReentrantLock(); + private Condition limitReachedCondition = lock.newCondition(); + private Condition limitUnreachedCondition = lock.newCondition(); + private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(2).setThreadNamePrefix("Condition Queue").build(); + + /** + * 停止打印 + * @throws InterruptedException e + */ + private void stopMessages() throws InterruptedException { + lock.lock(); + try { + while (messageCount < LIMIT) { + limitReachedCondition.await(); + } + System.err.println("Limit reached. Wait 2s"); + Thread.sleep(2000); + messageCount = 0; + limitUnreachedCondition.signalAll(); + } finally { + lock.unlock(); + } + } + + /** + * 打印 + * @param message mes + * @throws InterruptedException e + */ + private void printMessages(String message) throws InterruptedException { + lock.lock(); + try { + while (messageCount == LIMIT) { + limitUnreachedCondition.await(); + } + System.out.println(message); + messageCount++; + limitReachedCondition.signalAll(); + } finally { + lock.unlock(); + } + } + + public static void main(String[] args) { + ConditionQueue queue = new ConditionQueue(); + // Will run indefinitely + threadPoolExecutor.execute(() -> { + while (true) { + String uuidMessage = UUID.randomUUID().toString(); + try { + queue.printMessages(uuidMessage); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + threadPoolExecutor.execute(() -> { + while (true) { + try { + queue.stopMessages(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + } +} + diff --git a/src/main/java/com/example/concurrency/patterns/conditionqueues/WaitNotifyQueue.java b/src/main/java/com/example/concurrency/patterns/conditionqueues/WaitNotifyQueue.java new file mode 100644 index 0000000..5090002 --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/conditionqueues/WaitNotifyQueue.java @@ -0,0 +1,89 @@ +package com.example.concurrency.patterns.conditionqueues; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 描述: + * 基于Object等待通知队列 + * wait() and notify + * + * @author zed + * @since 2019-07-01 11:12 AM + */ +public class WaitNotifyQueue { + private boolean continueToNotify; + private BlockingQueue messages; + private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(2).setThreadNamePrefix("WaitNotifyQueue").build(); + + + private WaitNotifyQueue(List messages) { + this.messages = new LinkedBlockingQueue<>(messages); + this.continueToNotify = true; + } + + private synchronized void stopsMessaging() { + continueToNotify = false; + notifyAll(); + } + + private synchronized void message() throws InterruptedException { + while (!continueToNotify){ + wait(); + } + String message = messages.take(); + System.out.println(message); + } + @SuppressWarnings("unchecked") + public static void main(String[] args) { + List messages = new LinkedList(); + for (int i = 0; i < 130; i++) { + messages.add(UUID.randomUUID().toString()); + } + WaitNotifyQueue waitNotifyQueue = new WaitNotifyQueue(messages); + threadPoolExecutor.execute(() -> { + try { + while (true) { + waitNotifyQueue.message(); + Thread.sleep(300); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + }); + + Random random = new Random(); + threadPoolExecutor.execute(() -> { + while (true) { + int val = random.nextInt(100); + System.out.println(val); + if (val == 99) { + break; + } + try { + Thread.sleep(400); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + waitNotifyQueue.stopsMessaging(); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + }); + + } +} + diff --git a/src/main/java/com/example/concurrency/patterns/divideconquer/ParallelDivideAndConquer.java b/src/main/java/com/example/concurrency/patterns/divideconquer/ParallelDivideAndConquer.java new file mode 100644 index 0000000..fae3566 --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/divideconquer/ParallelDivideAndConquer.java @@ -0,0 +1,117 @@ +package com.example.concurrency.patterns.divideconquer; + +import org.springframework.util.StopWatch; + +import java.math.BigInteger; +import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.RecursiveTask; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +/** + * 描述: + * 平行分治 fork/join + * + * @author zed + * @since 2019-07-01 11:59 AM + */ +public class ParallelDivideAndConquer { + /** + * 分治执行 + */ + public static class ParallelSum extends RecursiveTask { + + private static final long serialVersionUID = 1L; + /** + * 选择一个数来分割计算 + */ + private final static int THRESHOLD = 10_000; + private List bigIntegerList; + + public ParallelSum(List bigIntegerList) { + this.bigIntegerList = bigIntegerList; + + } + @Override + protected BigInteger compute() { + int size = bigIntegerList.size(); + if (size < THRESHOLD) { + return sequentialSum(bigIntegerList); + } else { + ParallelSum x = new ParallelSum(bigIntegerList.subList(0, size / 2)); + ParallelSum y = new ParallelSum(bigIntegerList.subList(size / 2, size)); + x.fork(); + y.fork(); + BigInteger xResult = x.join(); + BigInteger yResult = y.join(); + return yResult.add(xResult); + } + } + } + /** + * 顺序执行 + * @param list list + * @return sum + */ + private static BigInteger sequentialSum(List list) { + BigInteger acc = BigInteger.ZERO; + for (BigInteger value : list) { + acc = acc.add(value); + } + return acc; + } + + /** + * 通过两种方式验证一千万个数据的累加执行速度 + * @param args args + * @throws InterruptedException e + */ + public static void main(String[] args) throws InterruptedException{ + List list = LongStream.range(0, 10_000_000L) + .mapToObj(BigInteger::valueOf) + .collect(Collectors.toList()); + /* + * Fork/Join 分值累加 + */ + Runnable parallel = () -> { + ForkJoinPool commonPool = ForkJoinPool.commonPool(); + BigInteger result = commonPool.invoke(new ParallelSum(list)); + + System.out.println("Parallel Result is: " + result); + }; + /* + * 串型累加 + */ + Runnable sequential = () -> { + BigInteger acc = sequentialSum(list); + + System.out.println("Sequential Result is: " + acc); + }; + + System.out.println("first time 耗时 \n\n"); + dummyBenchmark(sequential); + dummyBenchmark(parallel); + + Thread.sleep(2000); + System.out.println("some JIT 之后\n\n"); + dummyBenchmark(sequential); + dummyBenchmark(parallel); + + Thread.sleep(2000); + System.out.println("more JIT之后 \n\n"); + dummyBenchmark(sequential); + dummyBenchmark(parallel); + + + } + private static void dummyBenchmark(Runnable runnable) { + StopWatch stopWatch = new StopWatch("耗时情况"); + stopWatch.start(); + runnable.run(); + stopWatch.stop(); + System.out.println("Executed in: " + stopWatch.prettyPrint()); + System.out.println("######\n"); + } +} + diff --git a/src/main/java/com/example/concurrency/patterns/lockordering/FixedLockOrdering.java b/src/main/java/com/example/concurrency/patterns/lockordering/FixedLockOrdering.java new file mode 100755 index 0000000..ef69e7b --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/lockordering/FixedLockOrdering.java @@ -0,0 +1,61 @@ +package com.example.concurrency.patterns.lockordering; + + +/** + * 描述: + * 处理锁顺序问题 (防止死锁)@link com.example.concurrency.features.synchronizedcase.SynchronizedResolveDeadLock.SortExeAccount + * 适用场景:处理多个锁时 + * @author zed + * @since 2019-06-30 10:59 AM + */ +public class FixedLockOrdering { + + private static class LockableObject { + /** + * id + */ + private int id; + /** + * another + */ + private String anotherValue; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getAnotherValue() { + return anotherValue; + } + + public void setAnotherValue(String anotherValue) { + this.anotherValue = anotherValue; + } + } + + /** + * 多个对象锁操作 例如银行转账 + * @param obj1 o1 + * @param obj2 o2 + */ + public void doSomeOperation(LockableObject obj1, LockableObject obj2) { + LockableObject left = obj1; + LockableObject right = obj2; + if (obj1.getId() > obj2.getId()) { + left = obj2; + right = obj1; + } + // 锁定序号小的账户 + synchronized(left){ + // 锁定序号大的账户 + synchronized(right){ + //do something + } + } + } + +} diff --git a/src/main/java/com/example/concurrency/patterns/producerconsumer/ProducerConsumer.java b/src/main/java/com/example/concurrency/patterns/producerconsumer/ProducerConsumer.java new file mode 100644 index 0000000..8a194bb --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/producerconsumer/ProducerConsumer.java @@ -0,0 +1,56 @@ +package com.example.concurrency.patterns.producerconsumer; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; + +import java.util.UUID; +import java.util.concurrent.*; + +/** + * 描述: + * 生产消费 + * @author zed + * @since 2019-07-01 2:12 PM + */ +public class ProducerConsumer { + /** + * 阻塞队列存储数据 + */ + private BlockingQueue data = new LinkedBlockingQueue<>(); + /** + * 消费者线程 + */ + private Callable consumer = () -> { + while (true) { + String dataUnit = data.poll(5, TimeUnit.SECONDS); + if (dataUnit == null){ + break; + } + System.out.println("Consumed " + dataUnit + " from " + Thread.currentThread().getName()); + } + return null; + }; + /** + * 生产者线程 + */ + private Callable producer = () -> { + for (int i = 0; i < 90; i++) { + String dataUnit = UUID.randomUUID().toString(); + data.put(dataUnit); + } + return null; + }; + + public static void main(String[] args) throws InterruptedException{ + ProducerConsumer producerConsumer = new ProducerConsumer(); + ThreadPoolExecutor pool = ThreadPoolBuilder.cachedPool().setThreadNamePrefix("生产消费线程").build(); + pool.submit(producerConsumer.producer); + pool.submit(producerConsumer.consumer); + pool.submit(producerConsumer.consumer); + //非阻塞,不允许继续提交 + pool.shutdown(); + //阻塞,允许提交,返回终止结果 true/false + pool.awaitTermination(5, TimeUnit.SECONDS); + + } +} + diff --git a/src/main/java/com/example/concurrency/patterns/resourcepool/ResourcePool.java b/src/main/java/com/example/concurrency/patterns/resourcepool/ResourcePool.java new file mode 100644 index 0000000..d6d0d47 --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/resourcepool/ResourcePool.java @@ -0,0 +1,85 @@ +package com.example.concurrency.patterns.resourcepool; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.*; + +/** + * 描述: + * 对象资源池 + * 适用场景:当要创建某些有限资源的池时使用 + * @author zed + * @since 2019-07-01 3:01 PM + */ +public class ResourcePool { + private final static TimeUnit TIME_UNIT = TimeUnit.SECONDS; + private Semaphore semaphore; + private BlockingQueue resources; + + public ResourcePool(int poolSize, List initializedResources) { + //fail true 即 FIFO + this.semaphore = new Semaphore(poolSize, true); + this.resources = new LinkedBlockingQueue<>(poolSize); + this.resources.addAll(initializedResources); + } + + /** + * 获取资源 阻塞等待 + * @return resource + * @throws InterruptedException e + */ + public T get() throws InterruptedException { + return get(Integer.MAX_VALUE); + } + + /** + * 获取资源 + * @param secondsToTimeout 超时时间 + * @return source + * @throws InterruptedException e + */ + public T get(long secondsToTimeout) throws InterruptedException { + semaphore.acquire(); + try { + return resources.poll(secondsToTimeout, TIME_UNIT); + } finally { + semaphore.release(); + } + } + + /** + * 释放资源 + * @param resource re + * @throws InterruptedException e + */ + public void release(T resource) throws InterruptedException { + if (resource != null) { + resources.put(resource); + semaphore.release(); + } + } + + public static void main(String[] args) { + ThreadPoolExecutor executor = ThreadPoolBuilder.cachedPool().setThreadNamePrefix("资源池线程").build(); + ResourcePool pool = new ResourcePool<>(15, Arrays.asList(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14)); + Random random = new Random(); + for (int i = 0; i < 30; i++) { + executor.execute(() -> { + try { + Integer value = pool.get(60); + System.out.println("Value taken: " + value); + Thread.sleep(random.nextInt(5000)); + pool.release(value); + System.out.println("Value released " + value); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + } + executor.shutdown(); + } +} + diff --git a/src/main/java/com/example/concurrency/patterns/resourcesinitialization/ControlledInitialization.java b/src/main/java/com/example/concurrency/patterns/resourcesinitialization/ControlledInitialization.java new file mode 100644 index 0000000..7260e4c --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/resourcesinitialization/ControlledInitialization.java @@ -0,0 +1,115 @@ +package com.example.concurrency.patterns.resourcesinitialization; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 描述: + * 基于CountDownLatch实现应用初始化 + * 适用场景:多个重要资源的初始化 + * + * @author zed + * @since 2019-07-01 11:17 AM + */ +public class ControlledInitialization { + private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool() + .setPoolSize(3) + .setThreadNamePrefix("ControlledInitialization").build(); + /** + * 资源1 + */ + private static class Resource1 { + } + /** + * 资源2 + */ + private static class Resource2 { + } + /** + * 资源3 + */ + private static class Resource3 { + } + private Resource1 resource1; + private Resource2 resource2; + private Resource3 resource3; + /** + * 声明一个count为3 的CountDownLatch + */ + private CountDownLatch latch = new CountDownLatch(3); + private Runnable initResource1 = () -> { + try { + // simulate wait + Thread.sleep(4000); + resource1 = new Resource1(); + } catch (InterruptedException e) { + e.printStackTrace(); + }finally { + latch.countDown(); + } + }; + + private Runnable initResource2 = () -> { + try { + // simulate wait + Thread.sleep(4000); + resource2 = new Resource2(); + + } catch (InterruptedException e) { + e.printStackTrace(); + }finally { + latch.countDown(); + } + }; + + private Runnable initResource3 = () -> { + try { + // simulate wait + Thread.sleep(4000); + resource3 = new Resource3(); + } catch (InterruptedException e) { + e.printStackTrace(); + }finally { + latch.countDown(); + } + }; + + /** + * 构造函数中实现完成加载过程 + */ + private ControlledInitialization() { + //初始化操作 + initialize(); + //等待资源加载完毕 + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + /* + * 基于资源加载完毕开始后续操作 + */ + doTask(); + } + private void doTask() { + System.out.println("=== Resources Initialized ==="); + System.out.println("Resource 1 instance " + resource1); + System.out.println("Resource 2 instance " + resource2); + System.out.println("Resource 3 instance " + resource3); + + } + private void initialize() { + System.out.println("=== Initializing Resources ==="); + threadPoolExecutor.execute(initResource1); + threadPoolExecutor.execute(initResource2); + threadPoolExecutor.execute(initResource3); + threadPoolExecutor.shutdown(); + } + public static void main(String[] args) { + new ControlledInitialization(); + } + +} + diff --git a/src/main/java/com/example/concurrency/patterns/serialtask/SerialTask.java b/src/main/java/com/example/concurrency/patterns/serialtask/SerialTask.java new file mode 100644 index 0000000..86c51af --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/serialtask/SerialTask.java @@ -0,0 +1,57 @@ +package com.example.concurrency.patterns.serialtask; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; +import com.example.concurrency.util.ThreadUtil; + +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 描述: + * 通过锁和volatile 实现的线程共享方案 + * 适用场景:特定条件下有效阻止并行执行 + * + * @author zed + * @since 2019-07-02 3:33 PM + */ +public class SerialTask { + /** + * 导出flag true:有任务在执行 false:可以执行当前任务 + */ + private static volatile boolean permits =false; + + /** + * 占用任务 + */ + public synchronized static boolean setPermits(){ + if(!permits){ + permits = true; + return true; + } + return false; + } + /** + * 释放任务 + */ + public synchronized static void releasePermits() { + permits = false; + } + public static void main(String[] args) throws InterruptedException{ + ThreadPoolExecutor pool = ThreadPoolBuilder.cachedPool().setThreadNamePrefix("导出excel").build(); + for(int i =0;i< 10;i++) { + pool.execute(() -> { + while (SerialTask.permits || !SerialTask.setPermits()) { + //return or retry + } + SerialTask.setPermits(); + System.out.println(Thread.currentThread().getName()+"正在执行"); + ThreadUtil.sleep(1000); + System.out.println(Thread.currentThread().getName()+"执行结束"); + SerialTask.releasePermits(); + }); + } + pool.awaitTermination(5, TimeUnit.SECONDS); + + } +} + diff --git a/src/main/java/com/example/concurrency/patterns/taskcancel/ConsoleTimePrintTask.java b/src/main/java/com/example/concurrency/patterns/taskcancel/ConsoleTimePrintTask.java new file mode 100644 index 0000000..470eae6 --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/taskcancel/ConsoleTimePrintTask.java @@ -0,0 +1,58 @@ +package com.example.concurrency.patterns.taskcancel; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * 描述: + * 控制台输出时间任务取消 通过interrupt() 注意抛出InterruptedException异常的地方需要重置下打断标识 + * 适用场景:当后台任务需要取消时 + * @author zed + * @since 2019-06-30 3:18 PM + */ +public class ConsoleTimePrintTask { + private Thread thread; + /** + * 执行线程 + */ + private Runnable task = () -> { + while (!Thread.currentThread().isInterrupted()) { + Date date = new Date(System.currentTimeMillis()); + System.out.println(new SimpleDateFormat().format(date)); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // 注意这里如果没有抛出InterruptedException 则不需要执行interrupt + Thread.currentThread().interrupt(); + } + } + }; + /** + * run + */ + public void run() { + thread = new Thread(task); + thread.start(); + } + + /** + * cancel + */ + public void cancel() { + if (thread != null) { + thread.interrupt(); + } + } + + public static void main(String[] args) { + ConsoleTimePrintTask self = new ConsoleTimePrintTask(); + self.run(); + try { + Thread.sleep(4000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + self.cancel(); + } +} + diff --git a/src/main/java/com/example/concurrency/patterns/taskconvergence/TaskConvergence.java b/src/main/java/com/example/concurrency/patterns/taskconvergence/TaskConvergence.java new file mode 100644 index 0000000..b1b93dd --- /dev/null +++ b/src/main/java/com/example/concurrency/patterns/taskconvergence/TaskConvergence.java @@ -0,0 +1,82 @@ +package com.example.concurrency.patterns.taskconvergence; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.*; + +/** + * 描述: + * 聚合任务 CyclicBarrier(int parties, Runnable barrierAction) barrierAction为到达屏障后优先执行的任务 + * 适用场景:当需要确定一组正在运行的任务是否已完成时 + * @author zed + * @since 2019-06-30 3:33 PM + */ +public class TaskConvergence { + + private static final int BOUND = 150_000; + private static final int SIZE = 400_000; + private static final int CORES = Runtime.getRuntime().availableProcessors(); + private CyclicBarrier barrier; + private List synchronizedLinkedList; + private ExecutorService executor; + + @SuppressWarnings("unchecked") + private Runnable run = () -> { + Random random = new Random(); + List results = new LinkedList(); + for (int i = 0; i < SIZE; i++) { + Long next = (long) random.nextInt(BOUND); + results.add(next); + } + try { + synchronizedLinkedList.addAll(results); + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); + } + }; + + /** + * 构造函数 + */ + public TaskConvergence() { + /* + * 到达屏障执行任务 + */ + Runnable onComplete = () -> { + System.out.println("=== Random Number Results ==="); + System.out.println("CPU Cores: " + CORES); + System.out.println("Random Bound: " + BOUND); + System.out.println("Iterations per Core: " + SIZE); + System.out.println("Total Iterations: " + SIZE * CORES); + System.out.println("Size: " + synchronizedLinkedList.size()); + System.out.println("Sum " + synchronizedLinkedList.stream().mapToLong(Long::longValue).sum()); + }; + barrier = new CyclicBarrier(CORES, onComplete); + synchronizedLinkedList = Collections.synchronizedList(new LinkedList<>()); + executor = ThreadPoolBuilder.fixedPool().setPoolSize(CORES).build(); + } + + /** + * run + */ + public void run() { + for (int i = 0; i < CORES; i++) { + executor.execute(run); + } + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + executor.shutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + public static void main(String[] args) { + new TaskConvergence().run(); + } +} +