diff --git a/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java b/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java index 8ff1a43..54c4ce8 100644 --- a/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java +++ b/src/main/java/com/example/concurrency/features/countdownlatch/CountDownLatchEx.java @@ -4,6 +4,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * 描述: @@ -15,21 +17,21 @@ public class CountDownLatchEx { public static void main(String[] args) throws InterruptedException{ // 创建 2 个线程的线程池 - Executor executor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build(); + ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool().setPoolSize(2).build(); // 计数器初始化为 2 CountDownLatch latch = new CountDownLatch(2); executor.execute(()-> { - System.out.println("T1"); + System.out.println("T1, " + Thread.currentThread().getName()); latch.countDown(); }); executor.execute(()-> { - System.out.println("T2"); + System.out.println("T2, " + Thread.currentThread().getName()); latch.countDown(); }); // 等待两个查询操作结束 - latch.await(); - - + latch.await(1, TimeUnit.SECONDS); + System.out.println("T3, " + Thread.currentThread().getName()); + executor.shutdown(); } } diff --git a/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierGeekTime.java b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierGeekTime.java new file mode 100644 index 0000000..86fff1e --- /dev/null +++ b/src/main/java/com/example/concurrency/features/cyclicbarrier/CyclicBarrierGeekTime.java @@ -0,0 +1,58 @@ +package com.example.concurrency.features.cyclicbarrier; + +import com.example.concurrency.features.threadPool.ThreadPoolBuilder; +import java.util.concurrent.*; + +public class CyclicBarrierGeekTime { + //订单队列 + public static ArrayBlockingQueue pos = new ArrayBlockingQueue(10); + //物流队列 + public static ArrayBlockingQueue dos = new ArrayBlockingQueue(10); + //这里线程设为1,防止多线程并发导致的数据不一致,因为订单和物流是两个队列 + private static ThreadPoolExecutor threadPoolExecutor = ThreadPoolBuilder.fixedPool().setPoolSize(1).setThreadNamePrefix("测试线程同步").build(); + public static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{ + //这一步用个线程池是因为要进行异步操作 + threadPoolExecutor.execute(CyclicBarrierGeekTime::check); + }); + static void check(){ + System.out.println("对账结果"+ (pos.poll() + "\t" + dos.poll()) +","+Thread.currentThread().getName()); + } + + public static void main(String[] args) throws InterruptedException { + //循环查询订单 + Thread t1 = new Thread(()->{ + for(int i = 0;i<4;i++){ + System.out.println("订单ok,"+Thread.currentThread().getName()); + pos.offer("订单"+i); + try { + //仅仅方便观察输出 + Thread.sleep(1000); + //所有阻塞式操作均用超时的方式调用 + cyclicBarrier.await(1,TimeUnit.SECONDS); + } catch (Exception e) { + System.out.println("等超时了"+e.getMessage()); + } + } + + }); + //循环查询物流库 + Thread t2 = new Thread(()->{ + //这里设置比订单的循环少,以让订单的cyclicBarrier超时 + for(int j=0;j<2;j++){ + System.out.println("物流ok,"+Thread.currentThread().getName()); + dos.offer("物流"+j); + try { + Thread.sleep(1000); + cyclicBarrier.await(1,TimeUnit.SECONDS); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + }); + t1.start(); + t2.start(); + Thread.sleep(20000); + threadPoolExecutor.shutdown(); + } +}