From da59f26cfc66da0953b329646209bb6fe2d86815 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Tue, 1 Apr 2014 19:59:14 +0200 Subject: [PATCH 01/21] initial unbuffered channel implementation --- lib/concurrent.rb | 2 + lib/concurrent/channel/unbuffered_channel.rb | 38 +++++++++++++++ .../channel/unbuffered_channel_spec.rb | 47 +++++++++++++++++++ 3 files changed, 87 insertions(+) create mode 100644 lib/concurrent/channel/unbuffered_channel.rb create mode 100644 spec/concurrent/channel/unbuffered_channel_spec.rb diff --git a/lib/concurrent.rb b/lib/concurrent.rb index a90357313..49cf6d8cd 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -32,6 +32,8 @@ require 'concurrent/tvar' require 'concurrent/utilities' +require 'concurrent/channel/unbuffered_channel' + require 'concurrent/cached_thread_pool' require 'concurrent/fixed_thread_pool' require 'concurrent/immediate_executor' diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb new file mode 100644 index 000000000..804dc5453 --- /dev/null +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -0,0 +1,38 @@ +module Concurrent + class UnbufferedChannel + + def initialize + @mutex = Mutex.new + @condition = Condition.new + + @wait_set = [] + end + + def push(value) + probe = @mutex.synchronize do + @condition.wait(@mutex) while @wait_set.empty? + @wait_set.shift + end + + probe.set(value) + end + + def pop + probe = IVar.new + + @mutex.synchronize do + @wait_set << probe + @condition.signal + end + + probe.value + end + + def select(probe) + end + + def remove_probe(probe) + end + + end +end \ No newline at end of file diff --git a/spec/concurrent/channel/unbuffered_channel_spec.rb b/spec/concurrent/channel/unbuffered_channel_spec.rb new file mode 100644 index 000000000..fe5ff0415 --- /dev/null +++ b/spec/concurrent/channel/unbuffered_channel_spec.rb @@ -0,0 +1,47 @@ +require 'spec_helper' + +module Concurrent + + describe UnbufferedChannel do + + let!(:channel) { subject } # let is not thread safe, let! creates the object before ensuring uniqueness + + context 'with one thread' do + + context 'without timeout' do + + describe '#push' do + it 'should block' do + t = Thread.new { channel.push 5 } + sleep(0.05) + t.status.should eq 'sleep' + end + end + + describe '#pop' do + it 'should block' do + t = Thread.new { channel.pop } + sleep(0.05) + t.status.should eq 'sleep' + end + end + + end + + end + + context 'cooperating threads' do + it 'passes the pushed value to thread waiting on pop' do + result = nil + + Thread.new { channel.push 42 } + Thread.new { result = channel.pop } + + sleep(0.05) + + result.should eq 42 + end + end + + end +end From 6563469bae0a9ea814cd1d4e5e6264d2cd3cdf71 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Wed, 2 Apr 2014 08:32:22 +0200 Subject: [PATCH 02/21] added support for expired probes --- lib/concurrent.rb | 1 + lib/concurrent/channel/probe.rb | 19 +++++++ lib/concurrent/channel/unbuffered_channel.rb | 25 +++++---- spec/concurrent/channel/probe_spec.rb | 8 +++ .../channel/unbuffered_channel_spec.rb | 54 +++++++++++++++++++ 5 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 lib/concurrent/channel/probe.rb create mode 100644 spec/concurrent/channel/probe_spec.rb diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 49cf6d8cd..0c4493305 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -32,6 +32,7 @@ require 'concurrent/tvar' require 'concurrent/utilities' +require 'concurrent/channel/probe' require 'concurrent/channel/unbuffered_channel' require 'concurrent/cached_thread_pool' diff --git a/lib/concurrent/channel/probe.rb b/lib/concurrent/channel/probe.rb new file mode 100644 index 000000000..969ab54f9 --- /dev/null +++ b/lib/concurrent/channel/probe.rb @@ -0,0 +1,19 @@ +module Concurrent + class Probe < IVar + + def initialize(value = NO_VALUE, opts = {}) + super(value, opts) + end + + def set_unless_assigned(value) + mutex.synchronize do + return false if [:fulfilled, :rejected].include? @state + + set_state(true, value, nil) + event.set + true + end + + end + end +end \ No newline at end of file diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb index 804dc5453..1ec146f9b 100644 --- a/lib/concurrent/channel/unbuffered_channel.rb +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -9,30 +9,33 @@ def initialize end def push(value) - probe = @mutex.synchronize do - @condition.wait(@mutex) while @wait_set.empty? - @wait_set.shift + until first_waiting_probe.set_unless_assigned(value) end - - probe.set(value) end def pop - probe = IVar.new + probe = Probe.new + select(probe) + probe.value + end + def select(probe) @mutex.synchronize do @wait_set << probe @condition.signal end - - probe.value - end - - def select(probe) end def remove_probe(probe) end + private + def first_waiting_probe + @mutex.synchronize do + @condition.wait(@mutex) while @wait_set.empty? + @wait_set.shift + end + end + end end \ No newline at end of file diff --git a/spec/concurrent/channel/probe_spec.rb b/spec/concurrent/channel/probe_spec.rb new file mode 100644 index 000000000..a5616a8ce --- /dev/null +++ b/spec/concurrent/channel/probe_spec.rb @@ -0,0 +1,8 @@ +require 'spec_helper' + +module Concurrent + + describe Probe do + it 'should be written' + end +end diff --git a/spec/concurrent/channel/unbuffered_channel_spec.rb b/spec/concurrent/channel/unbuffered_channel_spec.rb index fe5ff0415..431a38cbe 100644 --- a/spec/concurrent/channel/unbuffered_channel_spec.rb +++ b/spec/concurrent/channel/unbuffered_channel_spec.rb @@ -31,6 +31,7 @@ module Concurrent end context 'cooperating threads' do + it 'passes the pushed value to thread waiting on pop' do result = nil @@ -41,7 +42,60 @@ module Concurrent result.should eq 42 end + + it 'passes the pushed value to only one thread' do + result = [] + + Thread.new { channel.push 37 } + Thread.new { result << channel.pop } + Thread.new { result << channel.pop } + + sleep(0.05) + + result.should have(1).items + end end + describe 'select' do + + let(:probe) { Probe.new } + + it 'does not block' do + t = Thread.new { channel.select(probe) } + + sleep(0.05) + + t.status.should eq false + end + + it 'gets notified by writer thread' do + channel.select(probe) + + Thread.new { channel.push 82 } + + probe.value.should eq 82 + end + + it 'ignores already set probes and waits for a new one' do + probe.set(27) + + channel.select(probe) + + t = Thread.new { channel.push 72 } + + sleep(0.05) + + t.status.should eq 'sleep' + + new_probe = Probe.new + + channel.select(new_probe) + + sleep(0.05) + + new_probe.value.should eq 72 + end + + end end end From abdcd1b4578ba0020579bd3b252fa59dbc39018b Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Wed, 2 Apr 2014 08:44:34 +0200 Subject: [PATCH 03/21] select operation on a channel list --- lib/concurrent.rb | 1 + lib/concurrent/channel/channel.rb | 10 ++++++++++ spec/concurrent/channel/channel_spec.rb | 24 ++++++++++++++++++++++++ 3 files changed, 35 insertions(+) create mode 100644 lib/concurrent/channel/channel.rb create mode 100644 spec/concurrent/channel/channel_spec.rb diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 0c4493305..8abdbe400 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -33,6 +33,7 @@ require 'concurrent/utilities' require 'concurrent/channel/probe' +require 'concurrent/channel/channel' require 'concurrent/channel/unbuffered_channel' require 'concurrent/cached_thread_pool' diff --git a/lib/concurrent/channel/channel.rb b/lib/concurrent/channel/channel.rb new file mode 100644 index 000000000..999fd964a --- /dev/null +++ b/lib/concurrent/channel/channel.rb @@ -0,0 +1,10 @@ +module Concurrent + class Channel + def self.select(*channels) + probe = Probe.new + + channels.each { |channel| channel.select(probe) } + probe.value + end + end +end \ No newline at end of file diff --git a/spec/concurrent/channel/channel_spec.rb b/spec/concurrent/channel/channel_spec.rb new file mode 100644 index 000000000..187eda715 --- /dev/null +++ b/spec/concurrent/channel/channel_spec.rb @@ -0,0 +1,24 @@ +require 'spec_helper' + +module Concurrent + + describe Channel do + + describe '.select' do + + context 'without timeout' do + it 'returns the first value available on a channel' do + channels = [ UnbufferedChannel.new, UnbufferedChannel.new] + + Thread.new { channels[1].push 77 } + + value = Channel.select(*channels) + + value.should eq 77 + end + end + + end + + end +end From 46531d7bd6a9704f8488fa4e57789f17e5d0e662 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Wed, 2 Apr 2014 08:53:07 +0200 Subject: [PATCH 04/21] Channel.select removes old probes --- lib/concurrent/channel/channel.rb | 5 +++-- spec/concurrent/channel/channel_spec.rb | 13 +++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/lib/concurrent/channel/channel.rb b/lib/concurrent/channel/channel.rb index 999fd964a..4f0bf2629 100644 --- a/lib/concurrent/channel/channel.rb +++ b/lib/concurrent/channel/channel.rb @@ -2,9 +2,10 @@ module Concurrent class Channel def self.select(*channels) probe = Probe.new - channels.each { |channel| channel.select(probe) } - probe.value + result = probe.value + channels.each { |channel| channel.remove_probe(probe) } + result end end end \ No newline at end of file diff --git a/spec/concurrent/channel/channel_spec.rb b/spec/concurrent/channel/channel_spec.rb index 187eda715..51cfe3553 100644 --- a/spec/concurrent/channel/channel_spec.rb +++ b/spec/concurrent/channel/channel_spec.rb @@ -16,6 +16,19 @@ module Concurrent value.should eq 77 end + + it 'cleans up' do + channels = [ UnbufferedChannel.new, UnbufferedChannel.new] + channels.each { |ch| ch.stub(:remove_probe).with( an_instance_of(Probe) )} + + Thread.new { channels[1].push 77 } + + value = Channel.select(*channels) + + value.should eq 77 + + channels.each { |ch| expect(ch).to have_received(:remove_probe).with( an_instance_of(Probe) ) } + end end end From 3adcaa046eebecdd7e45bd23706550e6a8f7c4b4 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Wed, 2 Apr 2014 20:57:01 +0200 Subject: [PATCH 05/21] implemented remove_probe --- lib/concurrent/channel/unbuffered_channel.rb | 13 +++++++--- .../channel/unbuffered_channel_spec.rb | 26 ++++++++++++++++--- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb index 1ec146f9b..50da2a928 100644 --- a/lib/concurrent/channel/unbuffered_channel.rb +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -5,7 +5,11 @@ def initialize @mutex = Mutex.new @condition = Condition.new - @wait_set = [] + @probe_set = [] + end + + def probe_set_size + @mutex.synchronize { @probe_set.size } end def push(value) @@ -21,19 +25,20 @@ def pop def select(probe) @mutex.synchronize do - @wait_set << probe + @probe_set << probe @condition.signal end end def remove_probe(probe) + @mutex.synchronize { @probe_set.delete(probe) } end private def first_waiting_probe @mutex.synchronize do - @condition.wait(@mutex) while @wait_set.empty? - @wait_set.shift + @condition.wait(@mutex) while @probe_set.empty? + @probe_set.shift end end diff --git a/spec/concurrent/channel/unbuffered_channel_spec.rb b/spec/concurrent/channel/unbuffered_channel_spec.rb index 431a38cbe..5704af5c8 100644 --- a/spec/concurrent/channel/unbuffered_channel_spec.rb +++ b/spec/concurrent/channel/unbuffered_channel_spec.rb @@ -4,7 +4,8 @@ module Concurrent describe UnbufferedChannel do - let!(:channel) { subject } # let is not thread safe, let! creates the object before ensuring uniqueness + let(:channel) { subject } + let(:probe) { Probe.new } context 'with one thread' do @@ -58,8 +59,6 @@ module Concurrent describe 'select' do - let(:probe) { Probe.new } - it 'does not block' do t = Thread.new { channel.select(probe) } @@ -97,5 +96,26 @@ module Concurrent end end + + describe 'probe set' do + + it 'has size zero after creation' do + channel.probe_set_size.should eq 0 + end + + it 'increases size after a select' do + channel.select(probe) + channel.probe_set_size.should eq 1 + end + + it 'decreases size after a removal' do + channel.select(probe) + channel.remove_probe(probe) + channel.probe_set_size.should eq 0 + end + + end + + end end From e944e275314de8197765318b40c2a2c4acbf5f94 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Wed, 2 Apr 2014 22:15:56 +0200 Subject: [PATCH 06/21] added buffered channel --- lib/concurrent.rb | 1 + lib/concurrent/channel/buffered_channel.rb | 90 +++++++++++ .../channel/buffered_channel_spec.rb | 151 ++++++++++++++++++ 3 files changed, 242 insertions(+) create mode 100644 lib/concurrent/channel/buffered_channel.rb create mode 100644 spec/concurrent/channel/buffered_channel_spec.rb diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 8abdbe400..92335e65f 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -35,6 +35,7 @@ require 'concurrent/channel/probe' require 'concurrent/channel/channel' require 'concurrent/channel/unbuffered_channel' +require 'concurrent/channel/buffered_channel' require 'concurrent/cached_thread_pool' require 'concurrent/fixed_thread_pool' diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb new file mode 100644 index 000000000..bbf6d9f10 --- /dev/null +++ b/lib/concurrent/channel/buffered_channel.rb @@ -0,0 +1,90 @@ +module Concurrent + class BufferedChannel + + def initialize(size) + @mutex = Mutex.new + @condition = Condition.new + @buffer_condition = Condition.new + + @probe_set = [] + @buffer = [] + @size = size + end + + def probe_set_size + @mutex.synchronize { @probe_set.size } + end + + def buffer_queue_size + @mutex.synchronize { @buffer.size } + end + + def push(value) + until set_probe_or_push_into_buffer(value) + end + end + + def pop + probe = Probe.new + select(probe) + probe.value + end + + def select(probe) + @mutex.synchronize do + + if @buffer.empty? + @probe_set << probe + true + else + shift_buffer if probe.set_unless_assigned peek_buffer + end + + end + end + + def remove_probe(probe) + @mutex.synchronize { @probe_set.delete(probe) } + end + + private + + def buffer_full? + @buffer.size == @size + end + + def buffer_empty? + @buffer.empty? + end + + def push_into_buffer(value) + @buffer_condition.wait(@mutex) while buffer_full? + @buffer << value + @buffer_condition.broadcast + end + + def peek_buffer + @buffer_condition.wait(@mutex) while buffer_empty? + @buffer.first + end + + def shift_buffer + @buffer_condition.wait(@mutex) while buffer_empty? + result = @buffer.shift + @buffer_condition.broadcast + result + end + + def set_probe_or_push_into_buffer(value) + @mutex.synchronize do + if @probe_set.empty? + push_into_buffer(value) + true + else + @probe_set.shift.set_unless_assigned(value) + end + end + end + + end +end \ No newline at end of file diff --git a/spec/concurrent/channel/buffered_channel_spec.rb b/spec/concurrent/channel/buffered_channel_spec.rb new file mode 100644 index 000000000..3964031df --- /dev/null +++ b/spec/concurrent/channel/buffered_channel_spec.rb @@ -0,0 +1,151 @@ +require 'spec_helper' + +module Concurrent + + describe BufferedChannel do + + let(:size) { 2 } + let(:channel) { BufferedChannel.new(size) } + let(:probe) { Probe.new } + + context 'without timeout' do + + describe '#push' do + it 'adds elements to buffer' do + channel.buffer_queue_size.should be 0 + + channel.push('a') + channel.push('a') + + channel.buffer_queue_size.should be 2 + end + + it 'should block when buffer is full' do + channel.push 1 + channel.push 2 + + t = Thread.new { channel.push 3 } + sleep(0.05) + t.status.should eq 'sleep' + end + + it 'restarts thread when buffer is no more full' do + channel.push 'hi' + channel.push 'foo' + + result = nil + + Thread.new { channel.push 'bar'; result = 42 } + + sleep(0.1) + + channel.pop + + sleep(0.1) + + result.should eq 42 + end + + it 'should assign value to a probe if probe set is not empty' do + channel.select(probe) + Thread.new { sleep(0.1); channel.push 3 } + probe.value.should eq 3 + end + end + + describe '#pop' do + it 'should block if buffer is empty' do + t = Thread.new { channel.pop } + sleep(0.05) + t.status.should eq 'sleep' + end + + it 'returns value if buffer is not empty' do + channel.push 1 + result = channel.pop + + result.should eq 1 + end + + it 'removes the first value from the buffer' do + channel.push 'a' + channel.push 'b' + + channel.pop.should eq 'a' + channel.buffer_queue_size.should eq 1 + end + end + + end + + describe 'select' do + + it 'does not block' do + t = Thread.new { channel.select(probe) } + + sleep(0.05) + + t.status.should eq false + end + + it 'gets notified by writer thread' do + channel.select(probe) + + Thread.new { channel.push 82 } + + probe.value.should eq 82 + end + + end + + context 'already set probes' do + context 'empty buffer' do + it 'discards already set probes' do + probe.set('set value') + + channel.select(probe) + + channel.push 27 + + channel.buffer_queue_size.should eq 1 + channel.probe_set_size.should eq 0 + end + end + + context 'empty probe set' do + it 'discards set probe' do + probe.set('set value') + + channel.push 82 + + channel.select(probe) + + channel.buffer_queue_size.should eq 1 + + channel.pop.should eq 82 + + end + end + end + + describe 'probe set' do + + it 'has size zero after creation' do + channel.probe_set_size.should eq 0 + end + + it 'increases size after a select' do + channel.select(probe) + channel.probe_set_size.should eq 1 + end + + it 'decreases size after a removal' do + channel.select(probe) + channel.remove_probe(probe) + channel.probe_set_size.should eq 0 + end + + end + + end +end From a3e62b7981033b8e8e6de2664a5cadd616e54761 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Wed, 2 Apr 2014 22:18:36 +0200 Subject: [PATCH 07/21] fixed unbuffered channel style --- lib/concurrent/channel/unbuffered_channel.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb index 50da2a928..d42095d18 100644 --- a/lib/concurrent/channel/unbuffered_channel.rb +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -35,12 +35,12 @@ def remove_probe(probe) end private - def first_waiting_probe - @mutex.synchronize do - @condition.wait(@mutex) while @probe_set.empty? - @probe_set.shift - end + def first_waiting_probe + @mutex.synchronize do + @condition.wait(@mutex) while @probe_set.empty? + @probe_set.shift end + end end end \ No newline at end of file From 5b71e7a2f39a3fe374700c8f946b04f1a0121f08 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Thu, 3 Apr 2014 08:22:23 +0200 Subject: [PATCH 08/21] fixed test race condition --- .../channel/buffered_channel_spec.rb | 2 +- .../channel/unbuffered_channel_spec.rb | 19 +++++++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/spec/concurrent/channel/buffered_channel_spec.rb b/spec/concurrent/channel/buffered_channel_spec.rb index 3964031df..5db769962 100644 --- a/spec/concurrent/channel/buffered_channel_spec.rb +++ b/spec/concurrent/channel/buffered_channel_spec.rb @@ -5,7 +5,7 @@ module Concurrent describe BufferedChannel do let(:size) { 2 } - let(:channel) { BufferedChannel.new(size) } + let!(:channel) { BufferedChannel.new(size) } let(:probe) { Probe.new } context 'without timeout' do diff --git a/spec/concurrent/channel/unbuffered_channel_spec.rb b/spec/concurrent/channel/unbuffered_channel_spec.rb index 5704af5c8..a63258c2c 100644 --- a/spec/concurrent/channel/unbuffered_channel_spec.rb +++ b/spec/concurrent/channel/unbuffered_channel_spec.rb @@ -4,7 +4,7 @@ module Concurrent describe UnbufferedChannel do - let(:channel) { subject } + let!(:channel) { subject } let(:probe) { Probe.new } context 'with one thread' do @@ -37,9 +37,9 @@ module Concurrent result = nil Thread.new { channel.push 42 } - Thread.new { result = channel.pop } + Thread.new { result = channel.pop; } - sleep(0.05) + sleep(0.1) result.should eq 42 end @@ -51,10 +51,21 @@ module Concurrent Thread.new { result << channel.pop } Thread.new { result << channel.pop } - sleep(0.05) + sleep(0.1) result.should have(1).items end + + it 'gets the pushed value when ready' do + result = nil + + Thread.new { result = channel.pop; } + Thread.new { channel.push 57 } + + sleep(0.1) + + result.should eq 57 + end end describe 'select' do From d8e6f5e4dd0f6a13381b3e1f86618bb75b76a1e5 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Thu, 3 Apr 2014 08:45:50 +0200 Subject: [PATCH 09/21] added probe specs --- spec/concurrent/channel/probe_spec.rb | 43 ++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/spec/concurrent/channel/probe_spec.rb b/spec/concurrent/channel/probe_spec.rb index a5616a8ce..ab0a83501 100644 --- a/spec/concurrent/channel/probe_spec.rb +++ b/spec/concurrent/channel/probe_spec.rb @@ -3,6 +3,47 @@ module Concurrent describe Probe do - it 'should be written' + + let(:probe) { Probe.new } + + describe '#set_unless_assigned' do + context 'empty probe' do + it 'assigns the value' do + probe.set_unless_assigned(32) + probe.value.should eq 32 + end + + it 'returns true' do + probe.set_unless_assigned('hi').should eq true + end + end + + context 'fulfilled probe' do + before(:each) { probe.set(27) } + + it 'does not assign the value' do + probe.set_unless_assigned(88) + probe.value.should eq 27 + end + + it 'returns false' do + probe.set_unless_assigned('hello').should eq false + end + end + + context 'rejected probe' do + before(:each) { probe.fail } + + it 'does not assign the value' do + probe.set_unless_assigned(88) + probe.should be_rejected + end + + it 'returns false' do + probe.set_unless_assigned('hello').should eq false + end + end + end + end end From a599eb833ae1148a2b6b442b4374022fcc105da2 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Thu, 3 Apr 2014 08:59:13 +0200 Subject: [PATCH 10/21] added waitable list --- lib/concurrent/channel/buffered_channel.rb | 12 ++++--- lib/concurrent/channel/unbuffered_channel.rb | 26 ++++---------- lib/concurrent/channel/waitable_list.rb | 38 ++++++++++++++++++++ 3 files changed, 52 insertions(+), 24 deletions(-) create mode 100644 lib/concurrent/channel/waitable_list.rb diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb index bbf6d9f10..150d0137a 100644 --- a/lib/concurrent/channel/buffered_channel.rb +++ b/lib/concurrent/channel/buffered_channel.rb @@ -1,3 +1,5 @@ +require_relative 'waitable_list' + module Concurrent class BufferedChannel @@ -6,13 +8,13 @@ def initialize(size) @condition = Condition.new @buffer_condition = Condition.new - @probe_set = [] + @probe_set = WaitableList.new @buffer = [] @size = size end def probe_set_size - @mutex.synchronize { @probe_set.size } + @probe_set.size end def buffer_queue_size @@ -34,7 +36,7 @@ def select(probe) @mutex.synchronize do if @buffer.empty? - @probe_set << probe + @probe_set.push(probe) true else shift_buffer if probe.set_unless_assigned peek_buffer @@ -44,7 +46,7 @@ def select(probe) end def remove_probe(probe) - @mutex.synchronize { @probe_set.delete(probe) } + @probe_set.delete(probe) end private @@ -81,7 +83,7 @@ def set_probe_or_push_into_buffer(value) push_into_buffer(value) true else - @probe_set.shift.set_unless_assigned(value) + @probe_set.first.set_unless_assigned(value) end end end diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb index d42095d18..788bb0ff4 100644 --- a/lib/concurrent/channel/unbuffered_channel.rb +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -1,19 +1,18 @@ +require_relative 'waitable_list' + module Concurrent class UnbufferedChannel def initialize - @mutex = Mutex.new - @condition = Condition.new - - @probe_set = [] + @probe_set = WaitableList.new end def probe_set_size - @mutex.synchronize { @probe_set.size } + @probe_set.size end def push(value) - until first_waiting_probe.set_unless_assigned(value) + until @probe_set.first.set_unless_assigned(value) end end @@ -24,22 +23,11 @@ def pop end def select(probe) - @mutex.synchronize do - @probe_set << probe - @condition.signal - end + @probe_set.push(probe) end def remove_probe(probe) - @mutex.synchronize { @probe_set.delete(probe) } - end - - private - def first_waiting_probe - @mutex.synchronize do - @condition.wait(@mutex) while @probe_set.empty? - @probe_set.shift - end + @probe_set.delete(probe) end end diff --git a/lib/concurrent/channel/waitable_list.rb b/lib/concurrent/channel/waitable_list.rb new file mode 100644 index 000000000..ea87b6320 --- /dev/null +++ b/lib/concurrent/channel/waitable_list.rb @@ -0,0 +1,38 @@ +module Concurrent + class WaitableList + + def initialize + @mutex = Mutex.new + @condition = Condition.new + + @list = [] + end + + def size + @mutex.synchronize { @list.size } + end + + def empty? + @mutex.synchronize { @list.empty? } + end + + def push(value) + @mutex.synchronize do + @list << value + @condition.signal + end + end + + def delete(value) + @mutex.synchronize { @list.delete(value) } + end + + def first + @mutex.synchronize do + @condition.wait(@mutex) while @list.empty? + @list.shift + end + end + + end +end \ No newline at end of file From 1c06a3c4c97ea3777ae1ce35beac1abf3943b440 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Thu, 3 Apr 2014 21:26:17 +0200 Subject: [PATCH 11/21] timer task now uses thread safe observer set instead of Observable module --- lib/concurrent/timer_task.rb | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/lib/concurrent/timer_task.rb b/lib/concurrent/timer_task.rb index d4340d031..c981a0790 100644 --- a/lib/concurrent/timer_task.rb +++ b/lib/concurrent/timer_task.rb @@ -9,9 +9,9 @@ module Concurrent # A very common currency pattern is to run a thread that performs a task at regular - # intervals. The thread that peforms the task sleeps for the given interval then + # intervals. The thread that performs the task sleeps for the given interval then # wakes up and performs the task. Lather, rinse, repeat... This pattern causes two - # problems. First, it is difficult to test the business logic of the task becuse the + # problems. First, it is difficult to test the business logic of the task because the # task itself is tightly coupled with the concurrency logic. Second, an exception in # raised while performing the task can cause the entire thread to abend. In a # long-running application where the task thread is intended to run for days/weeks/years @@ -25,7 +25,7 @@ module Concurrent # performing logging or ancillary operations. +TimerTask+ can also be configured with a # timeout value allowing it to kill a task that runs too long. # - # One other advantage of +TimerTask+ is it forces the bsiness logic to be completely decoupled + # One other advantage of +TimerTask+ is it forces the business logic to be completely decoupled # from the concurrency logic. The business logic can be tested separately then passed to the # +TimerTask+ for scheduling and running. # @@ -147,7 +147,6 @@ class TimerTask include Dereferenceable include Runnable include Stoppable - include Observable # Default +:execution_interval+ EXECUTION_INTERVAL = 60 @@ -171,7 +170,7 @@ class TimerTask # @option opts [Integer] :timeout_interval number of seconds a task can # run before it is considered to have failed (default: TIMEOUT_INTERVAL) # @option opts [Boolean] :run_now Whether to run the task immediately - # upon instanciation or to wait until the first #execution_interval + # upon instantiation or to wait until the first #execution_interval # has passed (default: false) # # @raise ArgumentError when no block is given. @@ -193,9 +192,10 @@ def initialize(opts = {}, &block) self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL - @run_now = opts[:now] || opts[:run_now] || false + @run_now = opts[:now] || opts[:run_now] @task = block + @observers = CopyOnWriteObserverSet.new init_mutex set_deref_options(opts) end @@ -226,6 +226,10 @@ def timeout_interval=(value) @timeout_interval = value end + def add_observer(observer, func = :update) + @observers.add_observer(observer, func) + end + # Terminate with extreme prejudice. Useful in cases where +#stop+ doesn't # work because one of the threads becomes unresponsive. # @@ -278,11 +282,10 @@ def execute_task # :nodoc: end raise TimeoutError if @worker.join(@timeout_interval).nil? mutex.synchronize { @value = @worker[:result] } - rescue Exception => ex - # suppress + rescue Exception => e + ex = e ensure - changed - notify_observers(Time.now, self.value, ex) + @observers.notify_observers(Time.now, self.value, ex) unless @worker.nil? Thread.kill(@worker) @worker = nil From b9a2a3c72545217d1362f50688070d624d091f7c Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Sat, 5 Apr 2014 20:40:08 +0200 Subject: [PATCH 12/21] first RingBuffer implementation --- lib/concurrent.rb | 1 + lib/concurrent/channel/ring_buffer.rb | 53 ++++++++++++ spec/concurrent/channel/ring_buffer_spec.rb | 89 +++++++++++++++++++++ 3 files changed, 143 insertions(+) create mode 100644 lib/concurrent/channel/ring_buffer.rb create mode 100644 spec/concurrent/channel/ring_buffer_spec.rb diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 92335e65f..238c435ef 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -36,6 +36,7 @@ require 'concurrent/channel/channel' require 'concurrent/channel/unbuffered_channel' require 'concurrent/channel/buffered_channel' +require 'concurrent/channel/ring_buffer' require 'concurrent/cached_thread_pool' require 'concurrent/fixed_thread_pool' diff --git a/lib/concurrent/channel/ring_buffer.rb b/lib/concurrent/channel/ring_buffer.rb new file mode 100644 index 000000000..74e514b3a --- /dev/null +++ b/lib/concurrent/channel/ring_buffer.rb @@ -0,0 +1,53 @@ +module Concurrent + class RingBuffer + + def initialize(capacity) + @buffer = Array.new(capacity) + @first = @last = 0 + @count = 0 + @mutex = Mutex.new + @condition = Condition.new + end + + def capacity + @mutex.synchronize { @buffer.size } + end + + def count + @mutex.synchronize { @count } + end + + def put(value) + @mutex.synchronize do + wait_while_full + @buffer[@last] = value + @last += 1 + @count += 1 + @condition.signal + end + end + + def take + @mutex.synchronize do + wait_while_empty + result = @buffer[@first] + @buffer[@first] = nil + @first += 1 + @count -= 1 + @condition.signal + result + end + end + + private + + def wait_while_full + @condition.wait(@mutex) while @count == @buffer.size + end + + def wait_while_empty + @condition.wait(@mutex) while @count == 0 + end + + end +end \ No newline at end of file diff --git a/spec/concurrent/channel/ring_buffer_spec.rb b/spec/concurrent/channel/ring_buffer_spec.rb new file mode 100644 index 000000000..e939cb8ad --- /dev/null +++ b/spec/concurrent/channel/ring_buffer_spec.rb @@ -0,0 +1,89 @@ +require 'spec_helper' + +module Concurrent + + describe RingBuffer do + + let(:capacity) { 3 } + let(:buffer) { RingBuffer.new( capacity ) } + + describe '#capacity' do + it 'returns the value passed in constructor' do + buffer.capacity.should eq capacity + end + end + + describe '#count' do + it 'is zero when created' do + buffer.count.should eq 0 + end + + it 'increases when an element is added' do + buffer.put 5 + buffer.count.should eq 1 + + buffer.put 1 + buffer.count.should eq 2 + end + + it 'decreases when an element is removed' do + buffer.put 10 + + buffer.take + + buffer.count.should eq 0 + end + end + + describe '#put' do + it 'block when buffer is full' do + capacity.times { buffer.put 27 } + + t = Thread.new { buffer.put 32 } + + sleep(0.1) + + t.status.should eq 'sleep' + end + + it 'continues when an element is removed' do + latch = CountDownLatch.new(1) + + Thread.new { (capacity + 1).times { buffer.put 'hi' }; latch.count_down } + Thread.new { sleep(0.1); buffer.take } + + latch.wait(0.2).should be_true + end + end + + describe '#take' do + it 'blocks when buffer is empty' do + t = Thread.new { buffer.take } + + sleep(0.1) + + t.status.should eq 'sleep' + end + + it 'continues when an element is added' do + latch = CountDownLatch.new(1) + + Thread.new { buffer.take; latch.count_down } + Thread.new { sleep(0.1); buffer.put 3 } + + latch.wait(0.2).should be_true + end + + it 'returns the first added value' do + buffer.put 'hi' + buffer.put 'foo' + buffer.put 'bar' + + buffer.take.should eq 'hi' + buffer.take.should eq 'foo' + buffer.take.should eq 'bar' + end + end + + end +end From af2c37d3614210eee10aa220f8ac8c914d729376 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Sat, 5 Apr 2014 20:51:29 +0200 Subject: [PATCH 13/21] added circular condition --- lib/concurrent/channel/ring_buffer.rb | 4 ++-- spec/concurrent/channel/ring_buffer_spec.rb | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/concurrent/channel/ring_buffer.rb b/lib/concurrent/channel/ring_buffer.rb index 74e514b3a..60c07b55d 100644 --- a/lib/concurrent/channel/ring_buffer.rb +++ b/lib/concurrent/channel/ring_buffer.rb @@ -21,7 +21,7 @@ def put(value) @mutex.synchronize do wait_while_full @buffer[@last] = value - @last += 1 + @last = (@last + 1) % @buffer.size @count += 1 @condition.signal end @@ -32,7 +32,7 @@ def take wait_while_empty result = @buffer[@first] @buffer[@first] = nil - @first += 1 + @first = (@first + 1) % @buffer.size @count -= 1 @condition.signal result diff --git a/spec/concurrent/channel/ring_buffer_spec.rb b/spec/concurrent/channel/ring_buffer_spec.rb index e939cb8ad..cd9099b0c 100644 --- a/spec/concurrent/channel/ring_buffer_spec.rb +++ b/spec/concurrent/channel/ring_buffer_spec.rb @@ -85,5 +85,17 @@ module Concurrent end end + context 'circular condition' do + it 'can filled many times' do + capacity.times { buffer.put 3 } + capacity.times { buffer.take } + + buffer.put 'hi' + + buffer.take.should eq 'hi' + buffer.capacity.should eq capacity + end + end + end end From 8a05e220bdc4cde5859be27eb0e11324181c0784 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Sat, 5 Apr 2014 21:11:40 +0200 Subject: [PATCH 14/21] added RingBuffer#empty? and RingBuffer#full? --- lib/concurrent/channel/ring_buffer.rb | 8 ++++++ spec/concurrent/channel/ring_buffer_spec.rb | 30 +++++++++++++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/lib/concurrent/channel/ring_buffer.rb b/lib/concurrent/channel/ring_buffer.rb index 60c07b55d..95d8dea13 100644 --- a/lib/concurrent/channel/ring_buffer.rb +++ b/lib/concurrent/channel/ring_buffer.rb @@ -17,6 +17,14 @@ def count @mutex.synchronize { @count } end + def full? + @mutex.synchronize { @count == @buffer.size } + end + + def empty? + @mutex.synchronize { @count == 0 } + end + def put(value) @mutex.synchronize do wait_while_full diff --git a/spec/concurrent/channel/ring_buffer_spec.rb b/spec/concurrent/channel/ring_buffer_spec.rb index cd9099b0c..c270afabe 100644 --- a/spec/concurrent/channel/ring_buffer_spec.rb +++ b/spec/concurrent/channel/ring_buffer_spec.rb @@ -7,6 +7,10 @@ module Concurrent let(:capacity) { 3 } let(:buffer) { RingBuffer.new( capacity ) } + def fill_buffer + capacity.times { buffer.put 3 } + end + describe '#capacity' do it 'returns the value passed in constructor' do buffer.capacity.should eq capacity @@ -35,9 +39,31 @@ module Concurrent end end + describe '#empty?' do + it 'is true when count is zero' do + buffer.empty?.should be_true + end + + it 'is false when count is not zero' do + buffer.put 82 + buffer.empty?.should be_false + end + end + + describe '#full?' do + it 'is true when count is capacity' do + fill_buffer + buffer.full?.should be_true + end + + it 'is false when count is not capacity' do + buffer.full?.should be_false + end + end + describe '#put' do it 'block when buffer is full' do - capacity.times { buffer.put 27 } + fill_buffer t = Thread.new { buffer.put 32 } @@ -87,7 +113,7 @@ module Concurrent context 'circular condition' do it 'can filled many times' do - capacity.times { buffer.put 3 } + fill_buffer capacity.times { buffer.take } buffer.put 'hi' From 8d3b1edb06168b881178e327abdb84d2e526dfeb Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Sun, 6 Apr 2014 17:15:18 +0200 Subject: [PATCH 15/21] added RingBuffer#peek --- lib/concurrent/channel/ring_buffer.rb | 4 ++++ spec/concurrent/channel/ring_buffer_spec.rb | 24 ++++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/lib/concurrent/channel/ring_buffer.rb b/lib/concurrent/channel/ring_buffer.rb index 95d8dea13..1e778c779 100644 --- a/lib/concurrent/channel/ring_buffer.rb +++ b/lib/concurrent/channel/ring_buffer.rb @@ -47,6 +47,10 @@ def take end end + def peek + @mutex.synchronize { @buffer[@first] } + end + private def wait_while_full diff --git a/spec/concurrent/channel/ring_buffer_spec.rb b/spec/concurrent/channel/ring_buffer_spec.rb index c270afabe..098b9a4c3 100644 --- a/spec/concurrent/channel/ring_buffer_spec.rb +++ b/spec/concurrent/channel/ring_buffer_spec.rb @@ -5,7 +5,7 @@ module Concurrent describe RingBuffer do let(:capacity) { 3 } - let(:buffer) { RingBuffer.new( capacity ) } + let(:buffer) { RingBuffer.new(capacity) } def fill_buffer capacity.times { buffer.put 3 } @@ -111,6 +111,28 @@ def fill_buffer end end + describe '#peek' do + context 'buffer empty' do + it 'returns nil when buffer is empty' do + buffer.peek.should be_nil + end + end + + context 'not empty' do + + before(:each) { buffer.put 'element' } + + it 'returns the first value' do + buffer.peek.should eq 'element' + end + + it 'does not change buffer' do + buffer.peek + buffer.count.should eq 1 + end + end + end + context 'circular condition' do it 'can filled many times' do fill_buffer From 22211b30e24174186de8481824768dc06295ba01 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Sun, 6 Apr 2014 17:30:11 +0200 Subject: [PATCH 16/21] renamed RingBuffer to BlockingRingBuffer --- lib/concurrent.rb | 2 +- .../channel/{ring_buffer.rb => blocking_ring_buffer.rb} | 2 +- spec/concurrent/channel/ring_buffer_spec.rb | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) rename lib/concurrent/channel/{ring_buffer.rb => blocking_ring_buffer.rb} (97%) diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 238c435ef..50d7dc14e 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -36,7 +36,7 @@ require 'concurrent/channel/channel' require 'concurrent/channel/unbuffered_channel' require 'concurrent/channel/buffered_channel' -require 'concurrent/channel/ring_buffer' +require 'concurrent/channel/blocking_ring_buffer' require 'concurrent/cached_thread_pool' require 'concurrent/fixed_thread_pool' diff --git a/lib/concurrent/channel/ring_buffer.rb b/lib/concurrent/channel/blocking_ring_buffer.rb similarity index 97% rename from lib/concurrent/channel/ring_buffer.rb rename to lib/concurrent/channel/blocking_ring_buffer.rb index 1e778c779..5843cdabb 100644 --- a/lib/concurrent/channel/ring_buffer.rb +++ b/lib/concurrent/channel/blocking_ring_buffer.rb @@ -1,5 +1,5 @@ module Concurrent - class RingBuffer + class BlockingRingBuffer def initialize(capacity) @buffer = Array.new(capacity) diff --git a/spec/concurrent/channel/ring_buffer_spec.rb b/spec/concurrent/channel/ring_buffer_spec.rb index 098b9a4c3..cf4bedcdb 100644 --- a/spec/concurrent/channel/ring_buffer_spec.rb +++ b/spec/concurrent/channel/ring_buffer_spec.rb @@ -2,10 +2,10 @@ module Concurrent - describe RingBuffer do + describe BlockingRingBuffer do let(:capacity) { 3 } - let(:buffer) { RingBuffer.new(capacity) } + let(:buffer) { BlockingRingBuffer.new(capacity) } def fill_buffer capacity.times { buffer.put 3 } From 0b0d3175f5b25986819aaec3c62d83ec9a0f7989 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Sun, 6 Apr 2014 19:11:04 +0200 Subject: [PATCH 17/21] renamed blocking ring buffer spec --- .../channel/{ring_buffer_spec.rb => blocking_ring_buffer_spec.rb} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename spec/concurrent/channel/{ring_buffer_spec.rb => blocking_ring_buffer_spec.rb} (100%) diff --git a/spec/concurrent/channel/ring_buffer_spec.rb b/spec/concurrent/channel/blocking_ring_buffer_spec.rb similarity index 100% rename from spec/concurrent/channel/ring_buffer_spec.rb rename to spec/concurrent/channel/blocking_ring_buffer_spec.rb From b2b7d21b4265c9d4eabac4f0fcbc57353408d0f6 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Sun, 6 Apr 2014 20:56:18 +0200 Subject: [PATCH 18/21] changed waiting list interface --- lib/concurrent/channel/buffered_channel.rb | 4 ++-- lib/concurrent/channel/unbuffered_channel.rb | 4 ++-- lib/concurrent/channel/waitable_list.rb | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb index 150d0137a..1c12ac809 100644 --- a/lib/concurrent/channel/buffered_channel.rb +++ b/lib/concurrent/channel/buffered_channel.rb @@ -36,7 +36,7 @@ def select(probe) @mutex.synchronize do if @buffer.empty? - @probe_set.push(probe) + @probe_set.put(probe) true else shift_buffer if probe.set_unless_assigned peek_buffer @@ -83,7 +83,7 @@ def set_probe_or_push_into_buffer(value) push_into_buffer(value) true else - @probe_set.first.set_unless_assigned(value) + @probe_set.take.set_unless_assigned(value) end end end diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb index 788bb0ff4..453398f96 100644 --- a/lib/concurrent/channel/unbuffered_channel.rb +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -12,7 +12,7 @@ def probe_set_size end def push(value) - until @probe_set.first.set_unless_assigned(value) + until @probe_set.take.set_unless_assigned(value) end end @@ -23,7 +23,7 @@ def pop end def select(probe) - @probe_set.push(probe) + @probe_set.put(probe) end def remove_probe(probe) diff --git a/lib/concurrent/channel/waitable_list.rb b/lib/concurrent/channel/waitable_list.rb index ea87b6320..d96521a73 100644 --- a/lib/concurrent/channel/waitable_list.rb +++ b/lib/concurrent/channel/waitable_list.rb @@ -16,7 +16,7 @@ def empty? @mutex.synchronize { @list.empty? } end - def push(value) + def put(value) @mutex.synchronize do @list << value @condition.signal @@ -27,7 +27,7 @@ def delete(value) @mutex.synchronize { @list.delete(value) } end - def first + def take @mutex.synchronize do @condition.wait(@mutex) while @list.empty? @list.shift From bdab705a2d4087fa5affb8c35d53de1063bd9284 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Mon, 7 Apr 2014 08:50:56 +0200 Subject: [PATCH 19/21] added non safe ring buffer --- lib/concurrent.rb | 1 + lib/concurrent/channel/ring_buffer.rb | 54 +++++++++ spec/concurrent/channel/ring_buffer_spec.rb | 126 ++++++++++++++++++++ 3 files changed, 181 insertions(+) create mode 100644 lib/concurrent/channel/ring_buffer.rb create mode 100644 spec/concurrent/channel/ring_buffer_spec.rb diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 50d7dc14e..a62678d5b 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -36,6 +36,7 @@ require 'concurrent/channel/channel' require 'concurrent/channel/unbuffered_channel' require 'concurrent/channel/buffered_channel' +require 'concurrent/channel/ring_buffer' require 'concurrent/channel/blocking_ring_buffer' require 'concurrent/cached_thread_pool' diff --git a/lib/concurrent/channel/ring_buffer.rb b/lib/concurrent/channel/ring_buffer.rb new file mode 100644 index 000000000..761fd78a6 --- /dev/null +++ b/lib/concurrent/channel/ring_buffer.rb @@ -0,0 +1,54 @@ +module Concurrent + + # not thread safe buffer + class RingBuffer + + def initialize(capacity) + @buffer = Array.new(capacity) + @first = @last = 0 + @count = 0 + end + + def capacity + @buffer.size + end + + def count + @count + end + + def empty? + @count == 0 + end + + def full? + @count == capacity + end + + # @param [Object] value + # @return [Boolean] true if value has been inserted, false otherwise + def offer(value) + return false if full? + + @buffer[@last] = value + @last = (@last + 1) % @buffer.size + @count += 1 + true + end + + # @return [Object] the first available value and removes it from the buffer. If buffer is empty returns nil + def poll + result = @buffer[@first] + @buffer[@first] = nil + @first = (@first + 1) % @buffer.size + @count -= 1 + result + end + + # @return [Object] the first available value and without removing it from the buffer. If buffer is empty returns nil + def peek + @buffer[@first] + end + + end +end \ No newline at end of file diff --git a/spec/concurrent/channel/ring_buffer_spec.rb b/spec/concurrent/channel/ring_buffer_spec.rb new file mode 100644 index 000000000..77a007b5e --- /dev/null +++ b/spec/concurrent/channel/ring_buffer_spec.rb @@ -0,0 +1,126 @@ +require 'spec_helper' + +module Concurrent + + describe RingBuffer do + + let(:capacity) { 3 } + let(:buffer) { RingBuffer.new(capacity) } + + def fill_buffer + capacity.times { buffer.offer 3 } + end + + describe '#capacity' do + it 'returns the value passed in constructor' do + buffer.capacity.should eq capacity + end + end + + describe '#count' do + it 'is zero when created' do + buffer.count.should eq 0 + end + + it 'increases when an element is added' do + buffer.offer 5 + buffer.count.should eq 1 + + buffer.offer 1 + buffer.count.should eq 2 + end + + it 'decreases when an element is removed' do + buffer.offer 10 + buffer.poll + + buffer.count.should eq 0 + end + end + + describe '#empty?' do + it 'is true when count is zero' do + buffer.empty?.should be_true + end + + it 'is false when count is not zero' do + buffer.offer 82 + buffer.empty?.should be_false + end + end + + describe '#full?' do + it 'is true when count is capacity' do + fill_buffer + buffer.full?.should be_true + end + + it 'is false when count is not capacity' do + buffer.full?.should be_false + end + end + + describe '#offer' do + it 'returns false when buffer is full' do + fill_buffer + buffer.offer(3).should be_false + end + + it 'returns true when the buffer is not full' do + buffer.offer(5).should be_true + end + + end + + describe '#poll' do + it 'returns the first added value' do + buffer.offer 'hi' + buffer.offer 'foo' + buffer.offer 'bar' + + buffer.poll.should eq 'hi' + buffer.poll.should eq 'foo' + buffer.poll.should eq 'bar' + end + + it 'returns nil when buffer is empty' do + buffer.poll.should be_nil + end + end + + describe '#peek' do + context 'buffer empty' do + it 'returns nil when buffer is empty' do + buffer.peek.should be_nil + end + end + + context 'not empty' do + + before(:each) { buffer.offer 'element' } + + it 'returns the first value' do + buffer.peek.should eq 'element' + end + + it 'does not change buffer' do + buffer.peek + buffer.count.should eq 1 + end + end + end + + context 'circular condition' do + it 'can filled many times' do + fill_buffer + capacity.times { buffer.poll } + + buffer.offer 'hi' + + buffer.poll.should eq 'hi' + buffer.capacity.should eq capacity + end + end + + end +end From 774c2134a2c7a9c475998c507fc1dbc22c69421b Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Mon, 7 Apr 2014 08:55:46 +0200 Subject: [PATCH 20/21] buffered channel refactor using ring buffer --- lib/concurrent/channel/buffered_channel.rb | 25 +++++++--------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb index 1c12ac809..710154d4d 100644 --- a/lib/concurrent/channel/buffered_channel.rb +++ b/lib/concurrent/channel/buffered_channel.rb @@ -9,8 +9,7 @@ def initialize(size) @buffer_condition = Condition.new @probe_set = WaitableList.new - @buffer = [] - @size = size + @buffer = RingBuffer.new(size) end def probe_set_size @@ -18,7 +17,7 @@ def probe_set_size end def buffer_queue_size - @mutex.synchronize { @buffer.size } + @mutex.synchronize { @buffer.count } end def push(value) @@ -51,28 +50,20 @@ def remove_probe(probe) private - def buffer_full? - @buffer.size == @size - end - - def buffer_empty? - @buffer.empty? - end - def push_into_buffer(value) - @buffer_condition.wait(@mutex) while buffer_full? - @buffer << value + @buffer_condition.wait(@mutex) while @buffer.full? + @buffer.offer value @buffer_condition.broadcast end def peek_buffer - @buffer_condition.wait(@mutex) while buffer_empty? - @buffer.first + @buffer_condition.wait(@mutex) while @buffer.empty? + @buffer.peek end def shift_buffer - @buffer_condition.wait(@mutex) while buffer_empty? - result = @buffer.shift + @buffer_condition.wait(@mutex) while @buffer.empty? + result = @buffer.poll @buffer_condition.broadcast result end From b8b0ab146e2d7d9ee8320fc410b2ce26c832abe3 Mon Sep 17 00:00:00 2001 From: Michele Della Torre Date: Mon, 7 Apr 2014 08:58:00 +0200 Subject: [PATCH 21/21] blocking ring buffer now uses a ring buffer --- .../channel/blocking_ring_buffer.rb | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/lib/concurrent/channel/blocking_ring_buffer.rb b/lib/concurrent/channel/blocking_ring_buffer.rb index 5843cdabb..856a9a3a1 100644 --- a/lib/concurrent/channel/blocking_ring_buffer.rb +++ b/lib/concurrent/channel/blocking_ring_buffer.rb @@ -2,7 +2,7 @@ module Concurrent class BlockingRingBuffer def initialize(capacity) - @buffer = Array.new(capacity) + @buffer = RingBuffer.new(capacity) @first = @last = 0 @count = 0 @mutex = Mutex.new @@ -10,27 +10,25 @@ def initialize(capacity) end def capacity - @mutex.synchronize { @buffer.size } + @mutex.synchronize { @buffer.capacity } end def count - @mutex.synchronize { @count } + @mutex.synchronize { @buffer.count } end def full? - @mutex.synchronize { @count == @buffer.size } + @mutex.synchronize { @buffer.full? } end def empty? - @mutex.synchronize { @count == 0 } + @mutex.synchronize { @buffer.empty? } end def put(value) @mutex.synchronize do wait_while_full - @buffer[@last] = value - @last = (@last + 1) % @buffer.size - @count += 1 + @buffer.offer(value) @condition.signal end end @@ -38,27 +36,24 @@ def put(value) def take @mutex.synchronize do wait_while_empty - result = @buffer[@first] - @buffer[@first] = nil - @first = (@first + 1) % @buffer.size - @count -= 1 + result = @buffer.poll @condition.signal result end end def peek - @mutex.synchronize { @buffer[@first] } + @mutex.synchronize { @buffer.peek } end private def wait_while_full - @condition.wait(@mutex) while @count == @buffer.size + @condition.wait(@mutex) while @buffer.full? end def wait_while_empty - @condition.wait(@mutex) while @count == 0 + @condition.wait(@mutex) while @buffer.empty? end end