Class: ForeignActor::InternalReactor
- Inherits:
-
Object
- Object
- ForeignActor::InternalReactor
- Defined in:
- lib/foreign_actor/reactor.rb
Direct Known Subclasses
Constant Summary collapse
- StopLoop =
Class.new(RuntimeError)
Instance Method Summary collapse
-
#initialize(context = nil, poller = nil, timers = nil) ⇒ InternalReactor
constructor
A new instance of InternalReactor.
- #request(s, sync, timeout, method, *args) ⇒ Object
- #run ⇒ Object
- #run_once(allow_blocking = true) ⇒ Object
- #serve_actor(endpoint, actor_or_name) ⇒ Object
- #socket(*args) ⇒ Object
- #suspend_reactor(msg = 'suspend') ⇒ Object
-
#wait_answer(s, timeout = nil) ⇒ Object
Waits until the socket receives a new message.
- #wakeup_reactor ⇒ Object
Constructor Details
#initialize(context = nil, poller = nil, timers = nil) ⇒ InternalReactor
Returns a new instance of InternalReactor.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/foreign_actor/reactor.rb', line 11
#
# Sets up the reactor state and its internal control channel.
#
# The control channel is a pair of ZMQ::PAIR sockets joined over an inproc
# endpoint. The server side is registered with the poller for readability;
# the client side is the one #suspend_reactor writes to.
#
# @param context [ZMQ::Context, nil] context used to create sockets; a new
#   ZMQ::Context is created when nil
# @param poller [ZMQ::Poller, nil] poller to register sockets with; a new
#   ZMQ::Poller is created when nil
# @param timers [Timers, nil] timer collection; a new Timers is created when nil
# @raise [IOError] if the control sockets cannot be bound or connected
def initialize(context = nil, poller = nil, timers = nil)
  @context = context || ZMQ::Context.new
  @poller  = poller  || ZMQ::Poller.new
  @timers  = timers  || Timers.new

  @waiting_readables = {}
  @servers = {}
  @control_messages = []

  @control_socket_srv = @context.socket(ZMQ::PAIR)
  @control_socket_cl  = @context.socket(ZMQ::PAIR)

  # The endpoint name is unique per reactor: inproc names are scoped to the
  # context, so a fixed name would make the bind fail for the second reactor
  # created on a shared context.
  addr = "inproc://ctrl-#{object_id}"

  # Bind before connect, and fail loudly instead of continuing with a dead
  # control channel (same rc handling as #serve_actor).
  rc = @control_socket_srv.bind(addr)
  unless ZMQ::Util.resultcode_ok?(rc)
    raise IOError, "control socket bind failed: #{ZMQ::Util.error_string}"
  end

  rc = @control_socket_cl.connect(addr)
  unless ZMQ::Util.resultcode_ok?(rc)
    raise IOError, "control socket connect failed: #{ZMQ::Util.error_string}"
  end

  @poller.register_readable(@control_socket_srv)
end
Instance Method Details
#request(s, sync, timeout, method, *args) ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/foreign_actor/reactor.rb', line 48
#
# Builds a call message for +method+ and sends it on socket +s+.
#
# The message is tagged 'sync_call' or 'async_call' depending on +sync+ and
# carries the object_id of the current Celluloid task.
#
# @param s the socket the message is sent on
# @param sync [Boolean] when truthy, wait for the answer after sending
# @param timeout passed through to #wait_answer (only used when +sync+)
# @param method name of the remote method
# @param args arguments forwarded in the message
# @return the result of #wait_answer for a sync call, nil otherwise
def request(s, sync, timeout, method, *args)
  call_type =
    if sync
      'sync_call'
    else
      'async_call'
    end

  caller_id = Celluloid::Task.current.object_id
  payload = build_msg(caller_id, call_type, method, args)
  send_msg(s, '', payload)

  # Fire-and-forget for async calls.
  return unless sync

  wait_answer(s, timeout)
end
#run ⇒ Object
94 95 96 97 98 |
# File 'lib/foreign_actor/reactor.rb', line 94
#
# Runs reactor iterations (#run_once) back to back until one of them raises
# StopLoop, which is swallowed here and ends the loop.
#
# @return [nil] when the loop was ended by StopLoop
def run
  begin
    loop do
      run_once
    end
  rescue StopLoop
    # Raising StopLoop is how the loop is asked to terminate.
    nil
  end
end
#run_once(allow_blocking = true) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# run_once(allow_blocking = true): performs a single reactor iteration.
#
# Steps visible in the listing below:
#   1. Fires due timers (@timers.fire).
#   2. Chooses the poll timeout: @timers.wait_interval * 1000 when a timer is
#      pending (presumably seconds converted to milliseconds -- confirm),
#      otherwise :blocking when allow_blocking is truthy, else 0.
#   3. Polls; raises IOError ("libxs poll error: ...") when the result code is
#      not ok according to ZMQ::Util.resultcode_ok?.
#   4. For every readable socket, reads a message with receive_msg(s) and
#      hands (s, parts) to a handler.
#   5. Returns the poll result code.
#
# NOTE(review): the name of the method invoked with "(s, parts)" is missing
# from this listing (lost when the page was extracted); consult
# lib/foreign_actor/reactor.rb before editing this method.
# NOTE(review): @timers.wait_interval is read twice; if it can change or go
# negative between the two reads, the computed timeout may not be what was
# tested -- confirm against the Timers implementation in use.
# File 'lib/foreign_actor/reactor.rb', line 100 def run_once(allow_blocking = true) @timers.fire() wait_time = if @timers.wait_interval @timers.wait_interval * 1000 else allow_blocking ? :blocking : 0 end rc = @poller.poll(wait_time) unless ZMQ::Util.resultcode_ok?(rc) raise IOError, "libxs poll error: #{ZMQ::Util.error_string}" end @poller.readables.each do |s| parts = receive_msg(s) (s, parts) end rc end |
#serve_actor(endpoint, actor_or_name) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/foreign_actor/reactor.rb', line 79
#
# Creates an XREP socket, connects it to +endpoint+ and registers
# +actor_or_name+ as the server for that socket, then adds the socket to the
# poller for readability.
#
# @param endpoint the endpoint string the socket connects to
# @param actor_or_name the actor (or its name) passed to register_server
# @return the result of @poller.register_readable
# @raise [IOError] if the connect fails; the socket is closed first
# @raise [ArgumentError] if the socket already has a registered server
def serve_actor(endpoint, actor_or_name)
  s = socket(ZMQ::XREP)

  rc = s.connect(endpoint)
  unless ZMQ::Util.resultcode_ok?(rc)
    # Read the error text before closing: close may overwrite the last error.
    reason = ZMQ::Util.error_string
    # Nobody else holds a reference to this socket, so release it here
    # instead of leaking it.
    s.close
    raise IOError, "connect failed: #{reason}"
  end

  if @servers.key?(s)
    raise ArgumentError, "another class is already registered as server for #{s}"
  end

  register_server(s, actor_or_name)
  @poller.register_readable(s)
end
#socket(*args) ⇒ Object
43 44 45 |
# File 'lib/foreign_actor/reactor.rb', line 43
#
# Creates a socket on this reactor's context.
#
# @param args forwarded unchanged to the context's +socket+ method
# @return whatever the context's +socket+ method returns
def socket(*args)
  ctx = @context
  ctx.socket(*args)
end
#suspend_reactor(msg = 'suspend') ⇒ Object
31 32 33 34 35 36 |
# File 'lib/foreign_actor/reactor.rb', line 31
#
# Queues +msg+ as a control message and writes an empty string to the client
# side of the control socket pair. Does nothing when there is no control
# client socket.
#
# @param msg the control message to queue (defaults to 'suspend')
# @return the result of send_string, or nil when there is no control socket
def suspend_reactor(msg = 'suspend')
  client = @control_socket_cl
  return unless client

  # Queue first, then notify: the empty frame carries no payload, the
  # message itself travels through @control_messages.
  @control_messages << msg
  client.send_string('')
end
#wait_answer(s, timeout = nil) ⇒ Object
Waits until the socket receives a new message.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/foreign_actor/reactor.rb', line 57
#
# Suspends the current Celluloid task until it is resumed, after recording
# it as a listener on socket +s+.
#
# The task is stored in @waiting_readables[s] under its object_id and +s+ is
# registered with the poller. When +timeout+ is given, a timer resumes the
# task with :timeout once it elapses.
#
# @param s the socket the answer is expected on
# @param timeout delay passed to @timers.after, or nil for no timeout
# @return the value the task is resumed with (:timeout when the timer fired)
# @raise [ArgumentError] if this task is already registered as a listener
#   on +s+
def wait_answer(s, timeout = nil)
  task = Celluloid::Task.current
  task_id = task.object_id

  # @waiting_readables is keyed by socket and then by task id, so the
  # duplicate check has to look inside the per-socket table.
  listeners = @waiting_readables[s]
  if listeners && listeners.key?(task_id)
    raise ArgumentError, "task is already listening ???"
  end

  @poller.register_readable(s)
  @waiting_readables[s] ||= {}
  @waiting_readables[s][task_id] = task

  timer = nil
  if timeout
    timer = @timers.after(timeout) { task.resume(:timeout) }
  end

  begin
    Celluloid::Task.suspend(:xs_wait)
  ensure
    # Once the task runs again (answer, timeout or error) it is no longer
    # waiting: stop the timer and drop the registration so a late message is
    # not routed to a task that has moved on.
    timer.cancel if timer
    registered = @waiting_readables[s]
    registered.delete(task_id) if registered
  end
end
#wakeup_reactor ⇒ Object
38 39 40 |
# File 'lib/foreign_actor/reactor.rb', line 38
#
# Sends the :resume_reactor signal via +signal+.
#
# @return whatever +signal+ returns
def wakeup_reactor
  signal :resume_reactor
end