diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 2f78d8294..0d1ec8037 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -32,6 +32,13 @@ require 'concurrent/tvar' require 'concurrent/utilities' +require 'concurrent/channel/probe' +require 'concurrent/channel/channel' +require 'concurrent/channel/unbuffered_channel' +require 'concurrent/channel/buffered_channel' +require 'concurrent/channel/ring_buffer' +require 'concurrent/channel/blocking_ring_buffer' + require 'concurrent/actor_context' require 'concurrent/simple_actor_ref' diff --git a/lib/concurrent/channel/blocking_ring_buffer.rb b/lib/concurrent/channel/blocking_ring_buffer.rb new file mode 100644 index 000000000..856a9a3a1 --- /dev/null +++ b/lib/concurrent/channel/blocking_ring_buffer.rb @@ -0,0 +1,60 @@ +module Concurrent + class BlockingRingBuffer + + def initialize(capacity) + @buffer = RingBuffer.new(capacity) + @first = @last = 0 + @count = 0 + @mutex = Mutex.new + @condition = Condition.new + end + + def capacity + @mutex.synchronize { @buffer.capacity } + end + + def count + @mutex.synchronize { @buffer.count } + end + + def full? + @mutex.synchronize { @buffer.full? } + end + + def empty? + @mutex.synchronize { @buffer.empty? } + end + + def put(value) + @mutex.synchronize do + wait_while_full + @buffer.offer(value) + @condition.signal + end + end + + def take + @mutex.synchronize do + wait_while_empty + result = @buffer.poll + @condition.signal + result + end + end + + def peek + @mutex.synchronize { @buffer.peek } + end + + private + + def wait_while_full + @condition.wait(@mutex) while @buffer.full? + end + + def wait_while_empty + @condition.wait(@mutex) while @buffer.empty? + end + + end +end \ No newline at end of file diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb new file mode 100644 index 000000000..710154d4d --- /dev/null +++ b/lib/concurrent/channel/buffered_channel.rb @@ -0,0 +1,83 @@ +require_relative 'waitable_list' + +module Concurrent + class BufferedChannel + + def initialize(size) + @mutex = Mutex.new + @condition = Condition.new + @buffer_condition = Condition.new + + @probe_set = WaitableList.new + @buffer = RingBuffer.new(size) + end + + def probe_set_size + @probe_set.size + end + + def buffer_queue_size + @mutex.synchronize { @buffer.count } + end + + def push(value) + until set_probe_or_push_into_buffer(value) + end + end + + def pop + probe = Probe.new + select(probe) + probe.value + end + + def select(probe) + @mutex.synchronize do + + if @buffer.empty? + @probe_set.put(probe) + true + else + shift_buffer if probe.set_unless_assigned peek_buffer + end + + end + end + + def remove_probe(probe) + @probe_set.delete(probe) + end + + private + + def push_into_buffer(value) + @buffer_condition.wait(@mutex) while @buffer.full? + @buffer.offer value + @buffer_condition.broadcast + end + + def peek_buffer + @buffer_condition.wait(@mutex) while @buffer.empty? + @buffer.peek + end + + def shift_buffer + @buffer_condition.wait(@mutex) while @buffer.empty? + result = @buffer.poll + @buffer_condition.broadcast + result + end + + def set_probe_or_push_into_buffer(value) + @mutex.synchronize do + if @probe_set.empty? + push_into_buffer(value) + true + else + @probe_set.take.set_unless_assigned(value) + end + end + end + + end +end \ No newline at end of file diff --git a/lib/concurrent/channel/channel.rb b/lib/concurrent/channel/channel.rb new file mode 100644 index 000000000..4f0bf2629 --- /dev/null +++ b/lib/concurrent/channel/channel.rb @@ -0,0 +1,11 @@ +module Concurrent + class Channel + def self.select(*channels) + probe = Probe.new + channels.each { |channel| channel.select(probe) } + result = probe.value + channels.each { |channel| channel.remove_probe(probe) } + result + end + end +end \ No newline at end of file diff --git a/lib/concurrent/channel/probe.rb b/lib/concurrent/channel/probe.rb new file mode 100644 index 000000000..969ab54f9 --- /dev/null +++ b/lib/concurrent/channel/probe.rb @@ -0,0 +1,19 @@ +module Concurrent + class Probe < IVar + + def initialize(value = NO_VALUE, opts = {}) + super(value, opts) + end + + def set_unless_assigned(value) + mutex.synchronize do + return false if [:fulfilled, :rejected].include? @state + + set_state(true, value, nil) + event.set + true + end + + end + end +end \ No newline at end of file diff --git a/lib/concurrent/channel/ring_buffer.rb b/lib/concurrent/channel/ring_buffer.rb new file mode 100644 index 000000000..761fd78a6 --- /dev/null +++ b/lib/concurrent/channel/ring_buffer.rb @@ -0,0 +1,54 @@ +module Concurrent + + # not thread safe buffer + class RingBuffer + + def initialize(capacity) + @buffer = Array.new(capacity) + @first = @last = 0 + @count = 0 + end + + def capacity + @buffer.size + end + + def count + @count + end + + def empty? + @count == 0 + end + + def full? + @count == capacity + end + + # @param [Object] value + # @return [Boolean] true if value has been inserted, false otherwise + def offer(value) + return false if full? + + @buffer[@last] = value + @last = (@last + 1) % @buffer.size + @count += 1 + true + end + + # @return [Object] the first available value and removes it from the buffer. If buffer is empty returns nil + def poll + result = @buffer[@first] + @buffer[@first] = nil + @first = (@first + 1) % @buffer.size + @count -= 1 + result + end + + # @return [Object] the first available value and without removing it from the buffer. If buffer is empty returns nil + def peek + @buffer[@first] + end + + end +end \ No newline at end of file diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb new file mode 100644 index 000000000..453398f96 --- /dev/null +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -0,0 +1,34 @@ +require_relative 'waitable_list' + +module Concurrent + class UnbufferedChannel + + def initialize + @probe_set = WaitableList.new + end + + def probe_set_size + @probe_set.size + end + + def push(value) + until @probe_set.take.set_unless_assigned(value) + end + end + + def pop + probe = Probe.new + select(probe) + probe.value + end + + def select(probe) + @probe_set.put(probe) + end + + def remove_probe(probe) + @probe_set.delete(probe) + end + + end +end \ No newline at end of file diff --git a/lib/concurrent/channel/waitable_list.rb b/lib/concurrent/channel/waitable_list.rb new file mode 100644 index 000000000..d96521a73 --- /dev/null +++ b/lib/concurrent/channel/waitable_list.rb @@ -0,0 +1,38 @@ +module Concurrent + class WaitableList + + def initialize + @mutex = Mutex.new + @condition = Condition.new + + @list = [] + end + + def size + @mutex.synchronize { @list.size } + end + + def empty? + @mutex.synchronize { @list.empty? } + end + + def put(value) + @mutex.synchronize do + @list << value + @condition.signal + end + end + + def delete(value) + @mutex.synchronize { @list.delete(value) } + end + + def take + @mutex.synchronize do + @condition.wait(@mutex) while @list.empty? + @list.shift + end + end + + end +end \ No newline at end of file diff --git a/spec/concurrent/channel/blocking_ring_buffer_spec.rb b/spec/concurrent/channel/blocking_ring_buffer_spec.rb new file mode 100644 index 000000000..cf4bedcdb --- /dev/null +++ b/spec/concurrent/channel/blocking_ring_buffer_spec.rb @@ -0,0 +1,149 @@ +require 'spec_helper' + +module Concurrent + + describe BlockingRingBuffer do + + let(:capacity) { 3 } + let(:buffer) { BlockingRingBuffer.new(capacity) } + + def fill_buffer + capacity.times { buffer.put 3 } + end + + describe '#capacity' do + it 'returns the value passed in constructor' do + buffer.capacity.should eq capacity + end + end + + describe '#count' do + it 'is zero when created' do + buffer.count.should eq 0 + end + + it 'increases when an element is added' do + buffer.put 5 + buffer.count.should eq 1 + + buffer.put 1 + buffer.count.should eq 2 + end + + it 'decreases when an element is removed' do + buffer.put 10 + + buffer.take + + buffer.count.should eq 0 + end + end + + describe '#empty?' do + it 'is true when count is zero' do + buffer.empty?.should be_true + end + + it 'is false when count is not zero' do + buffer.put 82 + buffer.empty?.should be_false + end + end + + describe '#full?' do + it 'is true when count is capacity' do + fill_buffer + buffer.full?.should be_true + end + + it 'is false when count is not capacity' do + buffer.full?.should be_false + end + end + + describe '#put' do + it 'block when buffer is full' do + fill_buffer + + t = Thread.new { buffer.put 32 } + + sleep(0.1) + + t.status.should eq 'sleep' + end + + it 'continues when an element is removed' do + latch = CountDownLatch.new(1) + + Thread.new { (capacity + 1).times { buffer.put 'hi' }; latch.count_down } + Thread.new { sleep(0.1); buffer.take } + + latch.wait(0.2).should be_true + end + end + + describe '#take' do + it 'blocks when buffer is empty' do + t = Thread.new { buffer.take } + + sleep(0.1) + + t.status.should eq 'sleep' + end + + it 'continues when an element is added' do + latch = CountDownLatch.new(1) + + Thread.new { buffer.take; latch.count_down } + Thread.new { sleep(0.1); buffer.put 3 } + + latch.wait(0.2).should be_true + end + + it 'returns the first added value' do + buffer.put 'hi' + buffer.put 'foo' + buffer.put 'bar' + + buffer.take.should eq 'hi' + buffer.take.should eq 'foo' + buffer.take.should eq 'bar' + end + end + + describe '#peek' do + context 'buffer empty' do + it 'returns nil when buffer is empty' do + buffer.peek.should be_nil + end + end + + context 'not empty' do + + before(:each) { buffer.put 'element' } + + it 'returns the first value' do + buffer.peek.should eq 'element' + end + + it 'does not change buffer' do + buffer.peek + buffer.count.should eq 1 + end + end + end + + context 'circular condition' do + it 'can filled many times' do + fill_buffer + capacity.times { buffer.take } + + buffer.put 'hi' + + buffer.take.should eq 'hi' + buffer.capacity.should eq capacity + end + end + + end +end diff --git a/spec/concurrent/channel/buffered_channel_spec.rb b/spec/concurrent/channel/buffered_channel_spec.rb new file mode 100644 index 000000000..5db769962 --- /dev/null +++ b/spec/concurrent/channel/buffered_channel_spec.rb @@ -0,0 +1,151 @@ +require 'spec_helper' + +module Concurrent + + describe BufferedChannel do + + let(:size) { 2 } + let!(:channel) { BufferedChannel.new(size) } + let(:probe) { Probe.new } + + context 'without timeout' do + + describe '#push' do + it 'adds elements to buffer' do + channel.buffer_queue_size.should be 0 + + channel.push('a') + channel.push('a') + + channel.buffer_queue_size.should be 2 + end + + it 'should block when buffer is full' do + channel.push 1 + channel.push 2 + + t = Thread.new { channel.push 3 } + sleep(0.05) + t.status.should eq 'sleep' + end + + it 'restarts thread when buffer is no more full' do + channel.push 'hi' + channel.push 'foo' + + result = nil + + Thread.new { channel.push 'bar'; result = 42 } + + sleep(0.1) + + channel.pop + + sleep(0.1) + + result.should eq 42 + end + + it 'should assign value to a probe if probe set is not empty' do + channel.select(probe) + Thread.new { sleep(0.1); channel.push 3 } + probe.value.should eq 3 + end + end + + describe '#pop' do + it 'should block if buffer is empty' do + t = Thread.new { channel.pop } + sleep(0.05) + t.status.should eq 'sleep' + end + + it 'returns value if buffer is not empty' do + channel.push 1 + result = channel.pop + + result.should eq 1 + end + + it 'removes the first value from the buffer' do + channel.push 'a' + channel.push 'b' + + channel.pop.should eq 'a' + channel.buffer_queue_size.should eq 1 + end + end + + end + + describe 'select' do + + it 'does not block' do + t = Thread.new { channel.select(probe) } + + sleep(0.05) + + t.status.should eq false + end + + it 'gets notified by writer thread' do + channel.select(probe) + + Thread.new { channel.push 82 } + + probe.value.should eq 82 + end + + end + + context 'already set probes' do + context 'empty buffer' do + it 'discards already set probes' do + probe.set('set value') + + channel.select(probe) + + channel.push 27 + + channel.buffer_queue_size.should eq 1 + channel.probe_set_size.should eq 0 + end + end + + context 'empty probe set' do + it 'discards set probe' do + probe.set('set value') + + channel.push 82 + + channel.select(probe) + + channel.buffer_queue_size.should eq 1 + + channel.pop.should eq 82 + + end + end + end + + describe 'probe set' do + + it 'has size zero after creation' do + channel.probe_set_size.should eq 0 + end + + it 'increases size after a select' do + channel.select(probe) + channel.probe_set_size.should eq 1 + end + + it 'decreases size after a removal' do + channel.select(probe) + channel.remove_probe(probe) + channel.probe_set_size.should eq 0 + end + + end + + end +end diff --git a/spec/concurrent/channel/channel_spec.rb b/spec/concurrent/channel/channel_spec.rb new file mode 100644 index 000000000..51cfe3553 --- /dev/null +++ b/spec/concurrent/channel/channel_spec.rb @@ -0,0 +1,37 @@ +require 'spec_helper' + +module Concurrent + + describe Channel do + + describe '.select' do + + context 'without timeout' do + it 'returns the first value available on a channel' do + channels = [ UnbufferedChannel.new, UnbufferedChannel.new] + + Thread.new { channels[1].push 77 } + + value = Channel.select(*channels) + + value.should eq 77 + end + + it 'cleans up' do + channels = [ UnbufferedChannel.new, UnbufferedChannel.new] + channels.each { |ch| ch.stub(:remove_probe).with( an_instance_of(Probe) )} + + Thread.new { channels[1].push 77 } + + value = Channel.select(*channels) + + value.should eq 77 + + channels.each { |ch| expect(ch).to have_received(:remove_probe).with( an_instance_of(Probe) ) } + end + end + + end + + end +end diff --git a/spec/concurrent/channel/probe_spec.rb b/spec/concurrent/channel/probe_spec.rb new file mode 100644 index 000000000..ab0a83501 --- /dev/null +++ b/spec/concurrent/channel/probe_spec.rb @@ -0,0 +1,49 @@ +require 'spec_helper' + +module Concurrent + + describe Probe do + + let(:probe) { Probe.new } + + describe '#set_unless_assigned' do + context 'empty probe' do + it 'assigns the value' do + probe.set_unless_assigned(32) + probe.value.should eq 32 + end + + it 'returns true' do + probe.set_unless_assigned('hi').should eq true + end + end + + context 'fulfilled probe' do + before(:each) { probe.set(27) } + + it 'does not assign the value' do + probe.set_unless_assigned(88) + probe.value.should eq 27 + end + + it 'returns false' do + probe.set_unless_assigned('hello').should eq false + end + end + + context 'rejected probe' do + before(:each) { probe.fail } + + it 'does not assign the value' do + probe.set_unless_assigned(88) + probe.should be_rejected + end + + it 'returns false' do + probe.set_unless_assigned('hello').should eq false + end + end + end + + end +end diff --git a/spec/concurrent/channel/ring_buffer_spec.rb b/spec/concurrent/channel/ring_buffer_spec.rb new file mode 100644 index 000000000..77a007b5e --- /dev/null +++ b/spec/concurrent/channel/ring_buffer_spec.rb @@ -0,0 +1,126 @@ +require 'spec_helper' + +module Concurrent + + describe RingBuffer do + + let(:capacity) { 3 } + let(:buffer) { RingBuffer.new(capacity) } + + def fill_buffer + capacity.times { buffer.offer 3 } + end + + describe '#capacity' do + it 'returns the value passed in constructor' do + buffer.capacity.should eq capacity + end + end + + describe '#count' do + it 'is zero when created' do + buffer.count.should eq 0 + end + + it 'increases when an element is added' do + buffer.offer 5 + buffer.count.should eq 1 + + buffer.offer 1 + buffer.count.should eq 2 + end + + it 'decreases when an element is removed' do + buffer.offer 10 + buffer.poll + + buffer.count.should eq 0 + end + end + + describe '#empty?' do + it 'is true when count is zero' do + buffer.empty?.should be_true + end + + it 'is false when count is not zero' do + buffer.offer 82 + buffer.empty?.should be_false + end + end + + describe '#full?' do + it 'is true when count is capacity' do + fill_buffer + buffer.full?.should be_true + end + + it 'is false when count is not capacity' do + buffer.full?.should be_false + end + end + + describe '#offer' do + it 'returns false when buffer is full' do + fill_buffer + buffer.offer(3).should be_false + end + + it 'returns true when the buffer is not full' do + buffer.offer(5).should be_true + end + + end + + describe '#poll' do + it 'returns the first added value' do + buffer.offer 'hi' + buffer.offer 'foo' + buffer.offer 'bar' + + buffer.poll.should eq 'hi' + buffer.poll.should eq 'foo' + buffer.poll.should eq 'bar' + end + + it 'returns nil when buffer is empty' do + buffer.poll.should be_nil + end + end + + describe '#peek' do + context 'buffer empty' do + it 'returns nil when buffer is empty' do + buffer.peek.should be_nil + end + end + + context 'not empty' do + + before(:each) { buffer.offer 'element' } + + it 'returns the first value' do + buffer.peek.should eq 'element' + end + + it 'does not change buffer' do + buffer.peek + buffer.count.should eq 1 + end + end + end + + context 'circular condition' do + it 'can filled many times' do + fill_buffer + capacity.times { buffer.poll } + + buffer.offer 'hi' + + buffer.poll.should eq 'hi' + buffer.capacity.should eq capacity + end + end + + end +end diff --git a/spec/concurrent/channel/unbuffered_channel_spec.rb b/spec/concurrent/channel/unbuffered_channel_spec.rb new file mode 100644 index 000000000..a63258c2c --- /dev/null +++ b/spec/concurrent/channel/unbuffered_channel_spec.rb @@ -0,0 +1,132 @@ +require 'spec_helper' + +module Concurrent + + describe UnbufferedChannel do + + let!(:channel) { subject } + let(:probe) { Probe.new } + + context 'with one thread' do + + context 'without timeout' do + + describe '#push' do + it 'should block' do + t = Thread.new { channel.push 5 } + sleep(0.05) + t.status.should eq 'sleep' + end + end + + describe '#pop' do + it 'should block' do + t = Thread.new { channel.pop } + sleep(0.05) + t.status.should eq 'sleep' + end + end + + end + + end + + context 'cooperating threads' do + + it 'passes the pushed value to thread waiting on pop' do + result = nil + + Thread.new { channel.push 42 } + Thread.new { result = channel.pop; } + + sleep(0.1) + + result.should eq 42 + end + + it 'passes the pushed value to only one thread' do + result = [] + + Thread.new { channel.push 37 } + Thread.new { result << channel.pop } + Thread.new { result << channel.pop } + + sleep(0.1) + + result.should have(1).items + end + + it 'gets the pushed value when ready' do + result = nil + + Thread.new { result = channel.pop; } + Thread.new { channel.push 57 } + + sleep(0.1) + + result.should eq 57 + end + end + + describe 'select' do + + it 'does not block' do + t = Thread.new { channel.select(probe) } + + sleep(0.05) + + t.status.should eq false + end + + it 'gets notified by writer thread' do + channel.select(probe) + + Thread.new { channel.push 82 } + + probe.value.should eq 82 + end + + it 'ignores already set probes and waits for a new one' do + probe.set(27) + + channel.select(probe) + + t = Thread.new { channel.push 72 } + + sleep(0.05) + + t.status.should eq 'sleep' + + new_probe = Probe.new + + channel.select(new_probe) + + sleep(0.05) + + new_probe.value.should eq 72 + end + + end + + describe 'probe set' do + + it 'has size zero after creation' do + channel.probe_set_size.should eq 0 + end + + it 'increases size after a select' do + channel.select(probe) + channel.probe_set_size.should eq 1 + end + + it 'decreases size after a removal' do + channel.select(probe) + channel.remove_probe(probe) + channel.probe_set_size.should eq 0 + end + + end + + + end +end