diff --git a/README.md b/README.md index 37a80cb..dd55bb0 100644 --- a/README.md +++ b/README.md @@ -1 +1,13 @@ # java-concurrency + +an attempt to implement and test java concurrency utils: + +various locks(read/write, re-entrant), + +semaphores, + +countdown-latch, + +ConcurrentHashmap, + +Callable, Future diff --git a/java-concurrency.iml b/java-concurrency.iml new file mode 100644 index 0000000..78b2cc5 --- /dev/null +++ b/java-concurrency.iml @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..86736ec --- /dev/null +++ b/pom.xml @@ -0,0 +1,47 @@ + + + 4.0.0 + + org.example + java-concurrency + 1.0-SNAPSHOT + + + + org.junit.jupiter + junit-jupiter-engine + 5.5.2 + test + + + org.junit.platform + junit-platform-runner + 1.5.2 + test + + + org.mockito + mockito-all + 1.8.4 + test + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + + 1.8 + 1.8 + + + + + + \ No newline at end of file diff --git a/src/main/java/barrier/CyclicBarrier.java b/src/main/java/barrier/CyclicBarrier.java new file mode 100644 index 0000000..d58f3b4 --- /dev/null +++ b/src/main/java/barrier/CyclicBarrier.java @@ -0,0 +1,4 @@ +package barrier; + +public class CyclicBarrier { +} diff --git a/src/main/java/barrier/FakeCountDownLatch.java b/src/main/java/barrier/FakeCountDownLatch.java new file mode 100644 index 0000000..73400bf --- /dev/null +++ b/src/main/java/barrier/FakeCountDownLatch.java @@ -0,0 +1,30 @@ +package barrier; + +public class FakeCountDownLatch { + private int count; + + public FakeCountDownLatch(int count){ + this.count = count; + } + + + public synchronized void countDown(){ + count--; + if (count == 0) { + this.notifyAll(); + } + } + + + public int getCount(){ + return this.count; + } + + + public synchronized void await() throws InterruptedException { + while (count > 0) { + wait(); + } + } + +} diff --git a/src/main/java/callableFuture/Callable.java b/src/main/java/callableFuture/Callable.java new file mode 100644 index 0000000..936b9d4 --- /dev/null +++ b/src/main/java/callableFuture/Callable.java @@ -0,0 +1,31 @@ + +package callableFuture; + +public abstract class Callable extends Thread{ + Future future; + private V value; + + @Override + public void run(){ + try { + value = call(); + completed(value); + } catch (Exception e) { + System.out.println("Exception in call method"); + e.printStackTrace(); + } + } + + protected abstract V call(); + + + public Future execute(){ + this.start(); + this.future = new Future(); + return future; + } + + private void completed(V value){ + future.completed(value); + } +} diff --git a/src/main/java/callableFuture/Future.java b/src/main/java/callableFuture/Future.java new file mode 100644 index 0000000..32b78ef --- /dev/null +++ b/src/main/java/callableFuture/Future.java @@ -0,0 +1,19 @@ +package callableFuture; + +public class Future { + private boolean completed; + private V value; + + public synchronized V getValue() throws InterruptedException { + while (!completed){ + wait(); + } + return value; + } + + protected synchronized void completed(V value){ + completed = true; + this.value = value; + notifyAll(); + } +} diff --git a/src/main/java/dataStructures/FakeHashmap.java b/src/main/java/dataStructures/FakeHashmap.java new file mode 100644 index 0000000..90683b9 --- /dev/null +++ b/src/main/java/dataStructures/FakeHashmap.java @@ -0,0 +1,227 @@ +package dataStructures; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class FakeHashmap { + + private int bucketCount = 16; + private Bucket[] buckets = new Bucket[bucketCount]; + + public void insert(K key, V val){ + Node node = new Node<>(key, val); + Bucket bucket; + int ind = getIndex(key); + if(buckets[ind] == null){ + bucket = new Bucket<>(); + buckets[ind] = bucket; + }else { + bucket = buckets[ind]; + } + bucket.updateNode(node); + } + + public V get(K key){ + int ind = getIndex(key); + if(buckets[ind] == null){ + return null; + } + Node node = buckets[ind].getNode(key); + if(node == null){ + return null; + } + return node.val; + } + + + public boolean replaceIfExists(K key, V oldVal, V newVal){ + int ind = getIndex(key); + if(buckets[ind] == null){ + return false; + } + return buckets[ind].replaceIfExist(key, oldVal, newVal); + } + + + + public V delete(K key){ + int ind = getIndex(key); + if(buckets[ind] == null){ + return null; + } + Node node = buckets[ind].delNode(key); + if(node == null){ + return null; + } + return node.val; + } + + public boolean insertIfAbsent(K key, V val){ + int ind = getIndex(key); + if(buckets[ind] == null){ + return false; + } + return buckets[ind].insertIfAbsent(key, val); + } + + private int getIndex(K key){ + int hash = key.hashCode(); + return hash%bucketCount; + } + + public Set entrySet(){ + Bucket bucket; + Set outputSet = new HashSet<>(); + for (int i=0 ; i> set = bucket.getAll(); + outputSet.addAll(set); + } + } + return outputSet; + } + +} + +class Bucket{ + private Node start; + private Node end; + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private Lock rLock = lock.readLock(); + private Lock wLock = lock.writeLock(); + + /* + * adds new node if key is not present + * updated value of node if key is present + * */ + void updateNode(Node node){ + wLock.lock(); + try { + if (start == null) { + start = node; + end = node; + return; + } + Node oldNode = getNode(node.key); + if (oldNode == null) { + end.next = node; + node.prev = end; + end = node; + return; + } + oldNode.val = node.val; + }finally { + wLock.unlock(); + } + } + + boolean replaceIfExist(K key, V oldVal, V newVal){ + rLock.lock(); + try { + Node node = start; + while (node != null) { + if (node.key.equals(key) && node.val.equals(oldVal)){ + node.val = newVal; + return true; + } + node = node.next; + } + return false; + }finally { + rLock.unlock(); + } + } + + + boolean insertIfAbsent(K key, V val){ + wLock.lock(); + try { + Node node = start; + boolean found = false; + while (node != null) { + if (node.key.equals(key)) { + found = true; + } + node = node.next; + } + if(!found){ + Node newNode = new Node<>(key, val); + if(start == null){ + start = newNode; + end = newNode; + }else { + end.next = newNode; + newNode.prev = end; + end = newNode; + } + } + return !found; + }finally { + wLock.unlock(); + } + } + + /* + * returns node if node.key.equals(key) + * else returns null + * */ + Node getNode(K key){ + rLock.lock(); + try { + Node node = start; + while (node != null) { + if (node.key.equals(key)) { + return node; + } + node = node.next; + } + return null; + }finally { + rLock.unlock(); + } + } + + + Set> getAll(){ + Set> set = new HashSet<>(); + rLock.lock(); + try { + Node node = start; + while (node != null){ + set.add(node); + node = node.next; + } + return set; + }finally { + rLock.unlock(); + } + } + + + Node delNode(K key){ + wLock.lock(); + try { + Node node = getNode(key); + if (node != null) { + if (start == node) { + start = node.next; + } + if (end == node) { + end = node.prev; + } + Node nxtNode = node.next; + Node prevNode = node.prev; + if (prevNode != null) { + prevNode.next = nxtNode; + } + } + return node; + }finally { + wLock.unlock(); + } + } +} diff --git a/src/main/java/dataStructures/Node.java b/src/main/java/dataStructures/Node.java new file mode 100644 index 0000000..d5db8fe --- /dev/null +++ b/src/main/java/dataStructures/Node.java @@ -0,0 +1,14 @@ +package dataStructures; + +class Node{ + Node next; + Node prev; + K key; + V val; + Node(K key, V val){ + this.val = val; + this.key = key; + next = null; + prev = null; + } +} \ No newline at end of file diff --git a/src/main/java/lock/FairLock.java b/src/main/java/lock/FairLock.java new file mode 100644 index 0000000..f0c81e5 --- /dev/null +++ b/src/main/java/lock/FairLock.java @@ -0,0 +1,6 @@ +package lock; + +public class FairLock { + + +} diff --git a/src/main/java/lock/Locks.md b/src/main/java/lock/Locks.md new file mode 100644 index 0000000..e69de29 diff --git a/src/main/java/lock/ReEntrantLock.java b/src/main/java/lock/ReEntrantLock.java new file mode 100644 index 0000000..0ce40bc --- /dev/null +++ b/src/main/java/lock/ReEntrantLock.java @@ -0,0 +1,32 @@ +package lock; + +public class ReEntrantLock { + + private boolean isLocked = false; + private Thread lockedBy = null; + private int count = 0; + + synchronized public void lock() throws InterruptedException { + Thread currentThread = Thread.currentThread(); + while (isLocked && lockedBy != currentThread){ + wait(); + } + isLocked = true; + count++; + lockedBy = currentThread; + } + + public synchronized void unlock(){ + if(Thread.currentThread() == lockedBy) { + count--; + } + if(count == 0) { + isLocked = false; + notifyAll(); + } + } + + public boolean isLocked(){ + return isLocked; + } +} diff --git a/src/main/java/lock/SimpleLock.java b/src/main/java/lock/SimpleLock.java new file mode 100644 index 0000000..26ec809 --- /dev/null +++ b/src/main/java/lock/SimpleLock.java @@ -0,0 +1,34 @@ +package lock; + +public class SimpleLock { + + private boolean isLocked = false; + private Thread lockedBy = null; + + synchronized public void lock() throws InterruptedException { +// this is not a spin lock want ot demonstrate that +// if statement will not work +// check testSpuriousWakeup in TestSimpleLock class + if (isLocked) { + wait(); + } + isLocked = true; + lockedBy = Thread.currentThread(); + } + + public synchronized void unlock(){ + if(Thread.currentThread() == lockedBy) { + isLocked = false; + notifyAll(); + } + } + + public synchronized void notifyAllThread(){ + notifyAll(); + } + + public boolean isLocked(){ + return isLocked; + } + +} diff --git a/src/main/java/lock/SpinLock.java b/src/main/java/lock/SpinLock.java new file mode 100644 index 0000000..1001257 --- /dev/null +++ b/src/main/java/lock/SpinLock.java @@ -0,0 +1,30 @@ +package lock; + +public class SpinLock { + + private boolean isLocked = false; + private Thread lockedBy = null; + + synchronized public void lock() throws InterruptedException { + while (isLocked){ + wait(); + } + isLocked = true; + lockedBy = Thread.currentThread(); + } + + public synchronized void unlock(){ + if(Thread.currentThread() == lockedBy) { + isLocked = false; + notifyAll(); + } + } + + public synchronized void notifyAllThread(){ + notifyAll(); + } + + public boolean isLocked(){ + return isLocked; + } +} diff --git a/src/test/java/barrier/TestFakeCountDownLatch.java b/src/test/java/barrier/TestFakeCountDownLatch.java new file mode 100644 index 0000000..0c12f01 --- /dev/null +++ b/src/test/java/barrier/TestFakeCountDownLatch.java @@ -0,0 +1,92 @@ +package barrier; + +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class TestFakeCountDownLatch { + + @Test + public void singleThreadTest(){ + int threadCount = 10; + FakeCountDownLatch latchSingleThread = new FakeCountDownLatch(threadCount); + Assertions.assertEquals(threadCount, latchSingleThread.getCount()); + latchSingleThread.countDown(); + latchSingleThread.countDown(); + Assertions.assertEquals(threadCount-2, latchSingleThread.getCount()); + } + + @Test + public void testCountDown() throws InterruptedException { + int threadCount = 10, diff = 4; + + FakeCountDownLatch latch = new FakeCountDownLatch(threadCount); + CountDownLatch realLatch = new CountDownLatch(threadCount); + + ExecutorService service = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + service.submit(() -> { + try { + int rand = new Random().nextInt(); + if(rand < 0){ + rand = rand*-1; + } + rand = rand%1000; + Thread.sleep(rand); + } catch (InterruptedException e) { + e.printStackTrace(); + } + latch.countDown(); + realLatch.countDown(); + }); + } + realLatch.await(); + Assertions.assertEquals(0, latch.getCount()); + } + + + @Test + public void testAwait() throws InterruptedException { + FakeCountDownLatch latch = new FakeCountDownLatch(3); + Thread t1 = new Thread(() -> { + try { + Thread.sleep(1000); + latch.countDown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + Thread t2 = new Thread(() -> { + try { + Thread.sleep(1000); + latch.countDown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + Thread t3 = new Thread(() -> { + try { + Thread.sleep(1000); + latch.countDown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + t1.start(); + t2.start(); + t3.start(); + + latch.await(); + Assertions.assertEquals(0, latch.getCount()); + Assertions.assertEquals(Thread.State.TERMINATED, t1.getState()); + Assertions.assertEquals(Thread.State.TERMINATED, t2.getState()); + Assertions.assertEquals(Thread.State.TERMINATED, t3.getState()); + + } + + +} diff --git a/src/test/java/callableFuture/CallableFutureTest.java b/src/test/java/callableFuture/CallableFutureTest.java new file mode 100644 index 0000000..5747ea1 --- /dev/null +++ b/src/test/java/callableFuture/CallableFutureTest.java @@ -0,0 +1,33 @@ +package callableFuture; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class CallableFutureTest { + + @Test + public void testGetValue() throws InterruptedException { + int value = 100; + CallableTest callableTest = new CallableTest(value); + Future callableInt = callableTest.execute(); + + Assertions.assertEquals(value, callableInt.getValue()); + } +} + +class CallableTest extends Callable { + private int value; + public CallableTest(int value){ + this.value = value; + } + + @Override + protected Integer call() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return value; + } +} \ No newline at end of file diff --git a/src/test/java/dataStructures/HashmapTest.java b/src/test/java/dataStructures/HashmapTest.java new file mode 100644 index 0000000..a360185 --- /dev/null +++ b/src/test/java/dataStructures/HashmapTest.java @@ -0,0 +1,22 @@ +package dataStructures; + +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +public class HashmapTest { + + @Test + public void singleThread(){ + + Hashmap map = new Hashmap(); + map.insert(1,2); + map.insert(2,32); + map.insert(3,5); + map.insert(7,53); + Assertions.assertEquals(2, map.get(1)); + Assertions.assertEquals(32, map.get(2)); + Assertions.assertEquals(null, map.get(234)); + map.delete(2); + Assertions.assertEquals(null, map.get(2)); + } +} \ No newline at end of file diff --git a/src/test/java/lock/TestSimpleLock.java b/src/test/java/lock/TestSimpleLock.java new file mode 100644 index 0000000..acbc62d --- /dev/null +++ b/src/test/java/lock/TestSimpleLock.java @@ -0,0 +1,123 @@ +package lock; + +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +public class TestSimpleLock { + + @Test + public void singleThreadLockUnlockTest() throws InterruptedException { + SimpleLock simpleLock = new SimpleLock(); + Assertions.assertFalse(simpleLock.isLocked(), "initially lock object should not be locked"); + + simpleLock.lock(); + Assertions.assertTrue(simpleLock.isLocked(), "after calling lock() method it has to be locked"); + + simpleLock.unlock(); + Assertions.assertFalse(simpleLock.isLocked(), "after unlock state has to be false"); + } + + @Test + public void multiThreadedTestLock() throws InterruptedException { + SimpleLock simpleLock = new SimpleLock(); + Thread t1 = new Thread(() -> { + try { + simpleLock.lock(); + Assertions.assertTrue(simpleLock.isLocked(), "after calling lock() method it has to be locked"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + Thread t2 = new Thread(() -> { + try { + Assertions.assertTrue(simpleLock.isLocked(), "after calling lock() method it has to be locked"); + simpleLock.lock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + // thread t1,t2 try to acquire lock, t1 first next t2 + // after t1 acquires lock t2 cannot acquire lock and its in waiting state + t1.start(); + Thread.sleep(1000); + Assertions.assertTrue(simpleLock.isLocked(), "after calling lock() method it has to be locked"); + t2.start(); + Thread.sleep(1000); + Assertions.assertEquals(Thread.State.WAITING, t2.getState()); + + } + + @Test + public void multiThreadedTestUnLock() throws InterruptedException { + SimpleLock simpleLock = new SimpleLock(); + Thread t1 = new Thread(() -> { + try { + simpleLock.lock(); + Assertions.assertTrue(simpleLock.isLocked(), "after calling lock() method it has to be locked"); + simpleLock.unlock(); + Assertions.assertFalse(simpleLock.isLocked(), "after unlock state has to be false"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + Thread t2 = new Thread(() -> { + try { + simpleLock.lock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + // thread t1,t2 try to acquire lock, t1 first next t2 + // after t1 acquires lock t2 cannot acquire lock and its in waiting state + t1.start(); + Thread.sleep(1000); + Assertions.assertFalse(simpleLock.isLocked(), "after unlock state has to be false"); + t2.start(); + Thread.sleep(1000); + + Assertions.assertEquals(t2.getState(), Thread.State.TERMINATED); + + } + + @Test + public void testSpuriousWakeup() throws InterruptedException { + SimpleLock simpleLock = new SimpleLock(); + Thread t1 = new Thread(() -> { + try { + simpleLock.lock(); + Assertions.assertTrue(simpleLock.isLocked(), "after calling lock() method it has to be locked"); + Thread.sleep(1000); + simpleLock.notifyAllThread(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + Thread t2 = new Thread(() -> { + try { + simpleLock.lock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + Thread t3 = new Thread(() -> { + try { + simpleLock.notifyAllThread(); + simpleLock.lock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + t1.start(); + t2.start(); + t3.start(); + Thread.sleep(3000); + Assertions.assertTrue(simpleLock.isLocked(), "after calling lock() method it has to be locked"); + Assertions.assertEquals(Thread.State.TERMINATED, t3.getState()); + Assertions.assertEquals(Thread.State.TERMINATED, t2.getState()); + } +} + + diff --git a/src/test/java/lock/TestSpinLock.java b/src/test/java/lock/TestSpinLock.java new file mode 100644 index 0000000..3d2f25b --- /dev/null +++ b/src/test/java/lock/TestSpinLock.java @@ -0,0 +1,124 @@ +package lock; + +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +public class TestSpinLock { + + @Test + public void singleThreadLockUnlockTest() throws InterruptedException { + SpinLock spinLock = new SpinLock(); + Assertions.assertFalse(spinLock.isLocked(), "initially lock object should not be locked"); + + spinLock.lock(); + Assertions.assertTrue(spinLock.isLocked(), "after calling lock() method it has to be locked"); + + spinLock.unlock(); + Assertions.assertFalse(spinLock.isLocked(), "after unlock state has to be false"); + } + + @Test + public void multiThreadedTestLock() throws InterruptedException { + SpinLock spinLock = new SpinLock(); + Thread t1 = new Thread(() -> { + try { + spinLock.lock(); + Assertions.assertTrue(spinLock.isLocked(), "after calling lock() method it has to be locked"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + Thread t2 = new Thread(() -> { + try { + Assertions.assertTrue(spinLock.isLocked(), "after calling lock() method it has to be locked"); + spinLock.lock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + // thread t1,t2 try to acquire lock, t1 first next t2 + // after t1 acquires lock t2 cannot acquire lock and its in waiting state + t1.start(); + Thread.sleep(1000); + Assertions.assertTrue(spinLock.isLocked(), "after calling lock() method it has to be locked"); + t2.start(); + Thread.sleep(1000); + Assertions.assertEquals(Thread.State.WAITING, t2.getState()); + + } + + @Test + public void multiThreadedTestUnLock() throws InterruptedException{ + SpinLock spinLock = new SpinLock(); + Thread t1 = new Thread(() -> { + try { + spinLock.lock(); + Assertions.assertTrue(spinLock.isLocked(), "after calling lock() method it has to be locked"); + spinLock.unlock(); + Assertions.assertFalse(spinLock.isLocked(), "after unlock state has to be false"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + Thread t2 = new Thread(() -> { + try { + spinLock.lock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + // thread t1,t2 try to acquire lock, t1 first next t2 + // after t1 acquires lock t2 cannot acquire lock and its in waiting state + t1.start(); + Thread.sleep(1000); + Assertions.assertFalse(spinLock.isLocked(), "after unlock state has to be false"); + t2.start(); + Thread.sleep(1000); + + Assertions.assertEquals(t2.getState(), Thread.State.TERMINATED); + + } + + + @Test + public void testSpuriousWakeup() throws InterruptedException { + SpinLock spinLock = new SpinLock(); + Thread t1 = new Thread(() -> { + try { + spinLock.lock(); + Assertions.assertTrue(spinLock.isLocked(), "after calling lock() method it has to be locked"); + Thread.sleep(1000); + spinLock.notifyAllThread(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + Thread t2 = new Thread(() -> { + try { + spinLock.lock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + Thread t3 = new Thread(() -> { + try { + spinLock.notifyAllThread(); + spinLock.lock(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + t1.start(); + t2.start(); + t3.start(); + Thread.sleep(3000); + Assertions.assertTrue(spinLock.isLocked(), "after calling lock() method it has to be locked"); + Assertions.assertEquals(Thread.State.WAITING, t3.getState()); + Assertions.assertEquals(Thread.State.WAITING, t2.getState()); + } + + +}