Skip to content

Commit 0f33d09

Browse files
committed
Add Promise::Channel
1 parent a756a70 commit 0f33d09

File tree

3 files changed

+234
-40
lines changed

3 files changed

+234
-40
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
module Concurrent
2+
module Promises
3+
module FactoryMethods
4+
5+
# @!visibility private
6+
7+
module OldChannelIntegration
8+
9+
# @!visibility private
10+
11+
# only proof of concept
12+
# @return [Future]
13+
def select(*channels)
14+
# TODO (pitr-ch 26-Mar-2016): re-do, has to be non-blocking
15+
future do
16+
# noinspection RubyArgCount
17+
Channel.select do |s|
18+
channels.each do |ch|
19+
s.take(ch) { |value| [value, ch] }
20+
end
21+
end
22+
end
23+
end
24+
end
25+
26+
include OldChannelIntegration
27+
end
28+
29+
class Future < AbstractEventFuture
30+
31+
# @!visibility private
32+
33+
module OldChannelIntegration
34+
35+
# @!visibility private
36+
37+
# Zips with selected value form the suplied channels
38+
# @return [Future]
39+
def then_select(*channels)
40+
future = Concurrent::Promises.select(*channels)
41+
ZipFuturesPromise.new_blocked_by2(self, future, @DefaultExecutor).future
42+
end
43+
44+
# @note may block
45+
# @note only proof of concept
46+
def then_put(channel)
47+
on_fulfillment_using(:io, channel) { |value, channel| channel.put value }
48+
end
49+
end
50+
51+
include OldChannelIntegration
52+
end
53+
end
54+
end

lib/concurrent/edge/promises.rb

Lines changed: 100 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1951,57 +1951,128 @@ def then_ask(actor)
19511951
include ActorIntegration
19521952
end
19531953

1954-
### Experimental features follow
1954+
class Channel < Concurrent::Synchronization::Object
1955+
safe_initialization!
19551956

1956-
module FactoryMethods
1957+
# Default size of the Channel, makes it accept unlimited number of messages.
1958+
UNLIMITED = Object.new
1959+
UNLIMITED.singleton_class.class_eval do
1960+
include Comparable
19571961

1958-
# @!visibility private
1962+
def <=>(other)
1963+
1
1964+
end
19591965

1960-
module ChannelIntegration
1966+
def to_s
1967+
'unlimited'
1968+
end
1969+
end
1970+
1971+
# A channel to pass messages between promises. The size is limited to support back pressure.
1972+
# @param [Integer, UNLIMITED] size the maximum number of messages stored in the channel.
1973+
def initialize(size = UNLIMITED)
1974+
super()
1975+
@Size = size
1976+
# TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
1977+
@Mutex = Mutex.new
1978+
@Probes = []
1979+
@Messages = []
1980+
@PendingPush = []
1981+
end
19611982

1962-
# @!visibility private
19631983

1964-
# only proof of concept
1965-
# @return [Future]
1966-
def select(*channels)
1967-
# TODO (pitr-ch 26-Mar-2016): re-do, has to be non-blocking
1968-
future do
1969-
# noinspection RubyArgCount
1970-
Channel.select do |s|
1971-
channels.each do |ch|
1972-
s.take(ch) { |value| [value, ch] }
1984+
# Returns future which will fulfill when the message is added to the channel. Its value is the message.
1985+
# @param [Object] message
1986+
# @return [Future]
1987+
def push(message)
1988+
@Mutex.synchronize do
1989+
while true
1990+
if @Probes.empty?
1991+
if @Size > @Messages.size
1992+
@Messages.push message
1993+
return Promises.fulfilled_future message
1994+
else
1995+
pushed = Promises.resolvable_future
1996+
@PendingPush.push [message, pushed]
1997+
return pushed.with_hidden_resolvable
1998+
end
1999+
else
2000+
probe = @Probes.shift
2001+
if probe.fulfill [self, message], false
2002+
return Promises.fulfilled_future(message)
19732003
end
19742004
end
19752005
end
19762006
end
19772007
end
19782008

1979-
include ChannelIntegration
2009+
# Returns a future witch will become fulfilled with a value from the channel when one is available.
2010+
# @param [ResolvableFuture] probe the future which will be fulfilled with a channel value
2011+
# @return [Future] the probe, its value will be the message when available.
2012+
def pop(probe = Concurrent::Promises.resolvable_future)
2013+
# TODO (pitr-ch 26-Dec-2016): improve performance
2014+
pop_for_select(probe).then(&:last)
2015+
end
2016+
2017+
# @!visibility private
2018+
def pop_for_select(probe = Concurrent::Promises.resolvable_future)
2019+
@Mutex.synchronize do
2020+
if @Messages.empty?
2021+
@Probes.push probe
2022+
else
2023+
message = @Messages.shift
2024+
probe.fulfill [self, message]
2025+
2026+
unless @PendingPush.empty?
2027+
message, pushed = @PendingPush.shift
2028+
@Messages.push message
2029+
pushed.fulfill message
2030+
end
2031+
end
2032+
end
2033+
probe
2034+
end
2035+
2036+
# @return [String] Short string representation.
2037+
def to_s
2038+
format '<#%s:0x%x size:%s>', self.class, object_id << 1, @Size
2039+
end
2040+
2041+
alias_method :inspect, :to_s
19802042
end
19812043

19822044
class Future < AbstractEventFuture
2045+
module NewChannelIntegration
19832046

1984-
# @!visibility private
2047+
# @param [Channel] channel to push to.
2048+
# @return [Future] a future which is fulfilled after the message is pushed to the channel.
2049+
# May take a moment if the channel is full.
2050+
def then_push_channel(channel)
2051+
self.then { |value| channel.push value }.flat_future
2052+
end
2053+
2054+
# TODO (pitr-ch 26-Dec-2016): does it make sense to have rescue an chain variants as well, check other integrations as well
2055+
end
19852056

1986-
module ChannelIntegration
2057+
include NewChannelIntegration
2058+
end
19872059

1988-
# @!visibility private
2060+
module FactoryMethods
19892061

1990-
# Zips with selected value form the suplied channels
1991-
# @return [Future]
1992-
def then_select(*channels)
1993-
future = Concurrent::Promises.select(*channels)
1994-
ZipFuturesPromise.new_blocked_by2(self, future, @DefaultExecutor).future
1995-
end
2062+
module NewChannelIntegration
19962063

1997-
# @note may block
1998-
# @note only proof of concept
1999-
def then_put(channel)
2000-
on_fulfillment_using(:io, channel) { |value, channel| channel.put value }
2064+
# Selects a channel which is ready to be read from.
2065+
# @param [Channel] channels
2066+
# @return [Future] a future which is fulfilled with pair [channel, message] when one of the channels is
2067+
# available for reading
2068+
def select_channel(*channels)
2069+
probe = Promises.resolvable_future
2070+
channels.each { |ch| ch.pop_for_select probe }
2071+
probe
20012072
end
20022073
end
20032074

2004-
include ChannelIntegration
2075+
include NewChannelIntegration
20052076
end
20062077

20072078
end

spec/concurrent/edge/promises_spec.rb

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -426,19 +426,17 @@ def behaves_as_delay(delay, value)
426426
end
427427

428428
it 'with channel' do
429-
ch1 = Concurrent::Channel.new
430-
ch2 = Concurrent::Channel.new
429+
ch1 = Concurrent::Promises::Channel.new
430+
ch2 = Concurrent::Promises::Channel.new
431431

432-
result = Concurrent::Promises.select(ch1, ch2)
433-
ch1.put 1
434-
expect(result.value!).to eq [1, ch1]
432+
result = Concurrent::Promises.select_channel(ch1, ch2)
433+
ch1.push 1
434+
expect(result.value!).to eq [ch1, 1]
435435

436436

437-
future { 1+1 }.
438-
then_put(ch1)
439-
result = future { '%02d' }.
440-
then_select(ch1, ch2).
441-
then { |format, (value, channel)| format format, value }
437+
future { 1+1 }.then_push_channel(ch1)
438+
result = (Concurrent::Promises.future { '%02d' } & Concurrent::Promises.select_channel(ch1, ch2)).
439+
then { |format, (channel, value)| format format, value }
442440
expect(result.value!).to eq '02'
443441
end
444442
end
@@ -487,7 +485,7 @@ def behaves_as_delay(delay, value)
487485

488486
describe 'Throttling' do
489487
specify do
490-
limit = 4
488+
limit = 4
491489
throttle = Concurrent::Throttle.new limit
492490
counter = Concurrent::AtomicFixnum.new
493491
testing = -> *args do
@@ -523,4 +521,75 @@ def behaves_as_delay(delay, value)
523521
end).value!.all? { |v| v <= limit }).to be_truthy
524522
end
525523
end
524+
525+
describe 'Promises::Channel' do
526+
specify do
527+
channel = Concurrent::Promises::Channel.new 1
528+
529+
pushed1 = channel.push 1
530+
expect(pushed1.resolved?).to be_truthy
531+
expect(pushed1.value!).to eq 1
532+
533+
pushed2 = channel.push 2
534+
expect(pushed2.resolved?).to be_falsey
535+
536+
popped = channel.pop
537+
expect(pushed1.value!).to eq 1
538+
expect(pushed2.resolved?).to be_truthy
539+
expect(pushed2.value!).to eq 2
540+
expect(popped.value!).to eq 1
541+
542+
popped = channel.pop
543+
expect(popped.value!).to eq 2
544+
545+
popped = channel.pop
546+
expect(popped.resolved?).to be_falsey
547+
548+
pushed3 = channel.push 3
549+
expect(popped.value!).to eq 3
550+
expect(pushed3.resolved?).to be_truthy
551+
expect(pushed3.value!).to eq 3
552+
end
553+
554+
specify do
555+
ch1 = Concurrent::Promises::Channel.new
556+
ch2 = Concurrent::Promises::Channel.new
557+
ch3 = Concurrent::Promises::Channel.new
558+
559+
add = -> do
560+
(ch1.pop & ch2.pop).then do |a, b|
561+
if a == :done && b == :done
562+
:done
563+
else
564+
ch3.push a + b
565+
add.call
566+
end
567+
end
568+
end
569+
570+
ch1.push 1
571+
ch2.push 2
572+
ch1.push 'a'
573+
ch2.push 'b'
574+
ch1.push nil
575+
ch2.push true
576+
577+
result = Concurrent::Promises.future(&add).run.result
578+
expect(result[0..1]).to eq [false, nil]
579+
expect(result[2]).to be_a_kind_of(NoMethodError)
580+
expect(ch3.pop.value!).to eq 3
581+
expect(ch3.pop.value!).to eq 'ab'
582+
583+
ch1.push 1
584+
ch2.push 2
585+
ch1.push 'a'
586+
ch2.push 'b'
587+
ch1.push :done
588+
ch2.push :done
589+
590+
expect(Concurrent::Promises.future(&add).run.result).to eq [true, :done, nil]
591+
expect(ch3.pop.value!).to eq 3
592+
expect(ch3.pop.value!).to eq 'ab'
593+
end
594+
end
526595
end

0 commit comments

Comments
 (0)