Skip to content

Merge actress into concurrent-ruby #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ source 'https://rubygems.org'

gemspec

group :actress do
gem 'algebrick'
gem 'atomic'
end

group :development do
gem 'rake', '~> 10.2.2'
gem 'countloc', '~> 0.4.0', platforms: :mri
Expand Down
1 change: 1 addition & 0 deletions lib/concurrent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
require 'concurrent/supervisor'
require 'concurrent/timer_task'
require 'concurrent/tvar'
require 'concurrent/actress'

# Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell,
# F#, C#, Java, and classic concurrency patterns.
Expand Down
295 changes: 295 additions & 0 deletions lib/concurrent/actress.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
require 'algebrick'
require 'atomic'
require 'logger'

module Concurrent
module Actress
Error = Class.new(StandardError)

class ActressTerminated < Error
include Algebrick::TypeCheck

def initialize(reference)
Type! reference, Reference
super reference.path
end
end

def self.current
Thread.current[:__current_actress__]
end

module CoreDelegations
def path
core.path
end

def parent
core.parent
end

def terminated?
core.terminated?
end

def reference
core.reference
end

alias_method :ref, :reference
end

class Reference
include Algebrick::TypeCheck
include Algebrick::Types
include CoreDelegations

attr_reader :core
private :core

def initialize(core)
@core = Type! core, Core
end


def tell(message)
message message, nil
end

alias_method :<<, :tell

def ask(message, ivar = IVar.new)
message message, ivar
end

def message(message, ivar = nil)
core.on_envelope Envelope[message,
ivar ? Some[IVar][ivar] : None,
Actress.current ? Some[Reference][Actress.current] : None]
return ivar || self
end

def to_s
"#<#{self.class} #{path}>"
end

alias_method :inspect, :to_s

def ==(other)
Type? other, self.class and other.send(:core) == core
end
end

include Algebrick::Types

Envelope = Algebrick.type do
fields! message: Object,
ivar: Maybe[IVar],
sender: Maybe[Reference]
end

module Envelope
def sender_path
sender.maybe { |reference| reference.path } || 'outside-actress'
end

def reject!(error)
ivar.maybe { |v| v.fail error }
end
end

class Core
include Algebrick::TypeCheck

attr_reader :reference, :name, :path, :logger, :parent_core
private :parent_core

def initialize(parent, name, actress_class, *args, &block)
@mailbox = Array.new
@one_by_one = OneByOne.new
@executor = Concurrent.configuration.global_task_pool # TODO configurable
@parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
@name = (Type! name, String, Symbol).to_s
@children = Atomic.new []
@path = @parent_core ? File.join(@parent_core.path, @name) : @name
@logger = Logger.new($stderr) # TODO add proper logging
@logger.progname = @path
@reference = Reference.new self
# noinspection RubyArgCount
@terminated = Event.new
@mutex = Mutex.new

@actress_class = Child! actress_class, Abstract
schedule_execution do
parent_core.add_child reference if parent_core
@actress = actress_class.new self, *args, &block # FIXME it may fail
end
end

def parent
@parent_core.reference
end

def children
@children.get
end

def add_child(child)
Type! child, Reference
@children.update { |o| [*o, child] }
end

def remove_child(child)
Type! child, Reference
@children.update { |o| o - [child] }
end

def on_envelope(envelope)
schedule_execution { execute_on_envelope envelope }
end

def terminated?
@terminated.set?
end

def terminate!
guard!
@terminated.set
parent_core.remove_child reference if parent_core
end

def guard!
raise 'can be called only inside this actor' unless Actress.current == reference
end

private

def process?
unless @mailbox.empty? || @receive_envelope_scheduled
@receive_envelope_scheduled = true
schedule_execution { receive_envelope }
end
end

def receive_envelope
envelope = @mailbox.shift

if terminated?
# FIXME make sure that it cannot be GCed before all messages are rejected after termination
reject_envelope envelope
logger.debug "rejected #{envelope.message} from #{envelope.sender_path}"
return
end
logger.debug "received #{envelope.message} from #{envelope.sender_path}"

result = @actress.on_envelope envelope
envelope.ivar.maybe { |iv| iv.set result }
rescue => error
logger.error error
envelope.ivar.maybe { |iv| iv.fail error }
ensure
@receive_envelope_scheduled = false
process?
end

def schedule_execution
@one_by_one.post(@executor) do
begin
# TODO enable this mutex only on JRuby
@mutex.lock # only for JRuby
Thread.current[:__current_actress__] = reference
yield
rescue => e
puts e
ensure
Thread.current[:__current_actress__] = nil
@mutex.unlock # only for JRuby
end
end
end

def execute_on_envelope(envelope)
if terminated?
reject_envelope envelope
else
@mailbox.push envelope
end
process?
end

def create_and_set_actor(actress_class, block, *args)
parent_core.add_child reference if parent_core
@actress = actress_class.new self, *args, &block # FIXME may fail
end

def reject_envelope(envelope)
envelope.reject! ActressTerminated.new(reference)
end
end

class Abstract
include Algebrick::TypeCheck
extend Algebrick::TypeCheck
include Algebrick::Matching
include CoreDelegations

attr_reader :core

def self.new(core, *args, &block)
allocate.tap do |actress|
actress.__send__ :pre_initialize, core
actress.__send__ :initialize, *args, &block
end
end

def on_message(message)
raise NotImplementedError
end

def logger
core.logger
end

def on_envelope(envelope)
@envelope = envelope
on_message envelope.message
ensure
@envelope = nil
end

def spawn(actress_class, name, *args, &block)
Actress.spawn(actress_class, name, *args, &block)
end

def children
core.children
end

def terminate!
core.terminate!
end

private

def pre_initialize(core)
@core = Type! core, Core
end

def envelope
@envelope or raise 'envelope not set'
end
end

class Root < Abstract
def on_message(message)
# ignore
end
end

ROOT = Core.new(nil, '/', Root).reference

def self.spawn(actress_class, name, *args, &block)
Core.new(Actress.current || ROOT, name, actress_class, *args, &block).reference
end
end
end
7 changes: 6 additions & 1 deletion lib/concurrent/obligation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ def incomplete?
end

def value(timeout = nil)
event.wait(timeout) if timeout != 0 && incomplete?
wait timeout
super()
end

def wait(timeout = nil)
event.wait(timeout) if timeout != 0 && incomplete?
self
end

def state
mutex.lock
result = @state
Expand Down
50 changes: 50 additions & 0 deletions spec/concurrent/actress_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
require 'spec_helper'
require_relative 'dereferenceable_shared'
require_relative 'observable_shared'

module Concurrent

describe Actress do
Child = Algebrick.atom
Terminate = Algebrick.atom

class Ping < Actress::Abstract

def initialize(queue)
@queue = queue
end

def on_message(message)
match message,
on(Terminate) { terminate! },
on(Child) { spawn Ping, :pong, @queue },
(on(any) do
@queue << message
message
end)

end
end

it 'works' do
queue = Queue.new
actor = Actress.spawn Ping, :ping, queue

actor << 'a' << 1
queue.pop.should eq 'a'
actor.ask(2).value.should eq 2

actor.parent.should eq Actress::ROOT
Actress::ROOT.path.should eq '/'
actor.path.should eq '/ping'
child = actor.ask(Child).value
child.path.should eq '/ping/pong'
queue.clear
child.ask(3)
queue.pop.should eq 3

actor << Terminate
actor.ask(:blow_up).wait.rejected?.should be_true
end
end
end