Class: ForeignActor::InternalReactor

Inherits:
Object
  • Object
show all
Defined in:
lib/foreign_actor/reactor.rb

Direct Known Subclasses

Reactor

Constant Summary collapse

StopLoop =
Class.new(RuntimeError)

Instance Method Summary collapse

Constructor Details

#initialize(context = nil, poller = nil, timers = nil) ⇒ InternalReactor

Returns a new instance of InternalReactor.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/foreign_actor/reactor.rb', line 11

# Sets up the reactor's collaborators, bookkeeping tables and control sockets.
#
# Each collaborator may be injected; when omitted a fresh ZMQ::Context,
# ZMQ::Poller and Timers instance is created. Two PAIR sockets are then
# joined over an inproc endpoint, and the server side is registered with the
# poller as a readable.
#
# @param context [ZMQ::Context, nil] context used to create sockets
# @param poller [ZMQ::Poller, nil] poller that watches the sockets
# @param timers [Timers, nil] timer collection
# @raise [IOError] if the control sockets cannot be bound or connected
def initialize(context = nil, poller = nil, timers = nil)
  @context = context || ZMQ::Context.new
  @poller = poller || ZMQ::Poller.new
  @waiting_readables = {}
  @servers = {}
  @timers = timers || Timers.new

  @control_socket_srv = @context.socket(ZMQ::PAIR)
  @control_socket_cl = @context.socket(ZMQ::PAIR)
  @control_messages = []

  # inproc endpoint names are scoped to a context, so a fixed name would
  # clash as soon as two reactors share an injected context. Deriving the
  # name from object_id keeps every reactor on its own endpoint.
  addr = "inproc://ctrl-#{object_id}"

  rc = @control_socket_srv.bind(addr)
  unless ZMQ::Util.resultcode_ok?(rc)
    raise IOError, "control socket bind failed: #{ZMQ::Util.error_string}"
  end

  rc = @control_socket_cl.connect(addr)
  unless ZMQ::Util.resultcode_ok?(rc)
    raise IOError, "control socket connect failed: #{ZMQ::Util.error_string}"
  end

  @poller.register_readable(@control_socket_srv)
end

Instance Method Details

#request(s, sync, timeout, method, *args) ⇒ Object



48
49
50
51
52
53
54
# File 'lib/foreign_actor/reactor.rb', line 48

# Sends a call message on socket +s+ and, for synchronous calls, waits for
# the answer.
#
# The message is built by build_msg from the current Celluloid task's
# object_id, the call type ('sync_call' or 'async_call'), the method name and
# the argument list. It is handed to send_msg together with an empty string.
#
# @param s socket the message is sent on
# @param sync [Boolean] whether to wait for an answer
# @param timeout forwarded to wait_answer for synchronous calls
# @param method name of the remote method
# @param args arguments forwarded with the call
# @return the result of wait_answer when +sync+ is truthy, otherwise nil
def request(s, sync, timeout, method, *args)
  call_type =
    if sync
      'sync_call'
    else
      'async_call'
    end
  caller_id = Celluloid::Task.current.object_id

  send_msg(s, '', build_msg(caller_id, call_type, method, args))
  return unless sync

  wait_answer(s, timeout)
end

#run ⇒ Object



94
95
96
97
98
# File 'lib/foreign_actor/reactor.rb', line 94

# Runs the reactor loop: calls run_once repeatedly until StopLoop is raised.
#
# @return [nil] when the loop is ended by StopLoop
def run
  begin
    loop do
      run_once
    end
  rescue StopLoop
    # StopLoop is the normal way to leave the loop; nothing to do.
    nil
  end
end

#run_once(allow_blocking = true) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/foreign_actor/reactor.rb', line 100

# Performs one reactor iteration: fires due timers, polls the sockets and
# dispatches every readable socket's message to handle_message.
#
# The poll timeout is the timers' wait interval multiplied by 1000
# (presumably seconds to milliseconds - confirm against the Timers API).
# When no timer is pending, the poll either blocks or returns immediately
# depending on +allow_blocking+. A pending timer's interval is used
# regardless of +allow_blocking+.
#
# @param allow_blocking [Boolean] whether poll may block when no timer is pending
# @return the result code returned by the poller
# @raise [IOError] if the poll result code signals an error
def run_once(allow_blocking = true)
  @timers.fire

  # Read the interval once so the nil check and the arithmetic see the same
  # value.
  interval = @timers.wait_interval
  wait_time =
    if interval
      # A timer that became due after fire() yields a negative interval.
      # Clamp it so poll returns immediately instead of being handed a
      # negative timeout.
      [interval * 1000, 0].max
    else
      allow_blocking ? :blocking : 0
    end

  rc = @poller.poll(wait_time)
  unless ZMQ::Util.resultcode_ok?(rc)
    raise IOError, "libxs poll error: #{ZMQ::Util.error_string}"
  end

  @poller.readables.each do |s|
    handle_message(s, receive_msg(s))
  end

  rc
end

#serve_actor(endpoint, actor_or_name) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/foreign_actor/reactor.rb', line 79

# Creates an XREP socket, connects it to +endpoint+ and registers it as the
# server socket for +actor_or_name+.
#
# @param endpoint endpoint passed to the socket's connect
# @param actor_or_name actor (or its name) handed to register_server
# @return the result of registering the socket with the poller
# @raise [IOError] if the socket cannot be connected
# @raise [ArgumentError] if the socket is already a key of the server table
def serve_actor(endpoint, actor_or_name)
  s = socket(ZMQ::XREP)
  rc = s.connect(endpoint)
  unless ZMQ::Util.resultcode_ok?(rc)
    # Capture the error text first: closing the socket may overwrite the
    # error state it is derived from.
    reason = ZMQ::Util.error_string
    # The socket was created here and is not referenced anywhere else yet,
    # so release it instead of leaking it.
    s.close
    raise IOError, "connect failed: #{reason}"
  end

  # Not closed on this path: a socket that is already a key of @servers is
  # in use by that registration.
  if @servers.has_key?(s)
    raise ArgumentError, "another class is already registered as server for #{s}"
  end

  register_server(s, actor_or_name)
  @poller.register_readable(s)
end

#socket(*args) ⇒ Object



43
44
45
# File 'lib/foreign_actor/reactor.rb', line 43

# Creates a socket on the reactor's context.
#
# @param args arguments forwarded unchanged to the context's socket method
# @return whatever the context's socket method returns
def socket(*args)
  @context.socket(*args)
end

#suspend_reactor(msg = 'suspend') ⇒ Object



31
32
33
34
35
36
# File 'lib/foreign_actor/reactor.rb', line 31

# Queues a control message and sends an empty string on the client side of
# the control socket pair.
#
# Nothing is queued or sent when there is no client control socket.
#
# @param msg control message appended to the control message queue
# @return the result of send_string, or nil when there is no client socket
def suspend_reactor(msg = 'suspend')
  return unless @control_socket_cl

  @control_messages.push(msg)
  @control_socket_cl.send_string('')
end

#wait_answer(s, timeout = nil) ⇒ Object

Suspends the current task until the socket receives a new message or, if a timeout is given, until the timeout expires.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/foreign_actor/reactor.rb', line 57

# Suspends the current Celluloid task until it is resumed, either by the code
# that handles a message arriving on +s+ or by the timeout timer.
#
# The task is recorded in the waiter table under the socket and its own
# object_id, and the socket is registered with the poller as a readable.
# When a timeout is given, a timer resumes the task with :timeout.
#
# @param s socket to wait on
# @param timeout delay handed to the timers' after, or nil for no timeout
# @return the value the task is resumed with (:timeout when the timer fired)
# @raise [ArgumentError] if this task is already recorded as waiting on +s+
def wait_answer(s, timeout = nil)
  task = Celluloid::Task.current
  task_id = task.object_id

  # The waiter table is keyed by socket first and task id second, so the
  # duplicate check has to look inside the socket's own table.
  waiters = @waiting_readables[s]
  if waiters && waiters.has_key?(task_id)
    raise ArgumentError, "task is already listening ???"
  end

  @poller.register_readable(s)
  @waiting_readables[s] ||= {}
  @waiting_readables[s][task_id] = task

  timer = nil
  if timeout
    timer = @timers.after(timeout) { task.resume(:timeout) }
  end

  begin
    Celluloid::Task.suspend(:xs_wait)
  ensure
    timer.cancel if timer
    # The task is no longer waiting once it runs again, whether it was
    # resumed by a message, by the timer or by an error. Drop its entry so a
    # timed-out wait leaves nothing stale behind. This is a no-op if the
    # entry was already removed elsewhere.
    remaining = @waiting_readables[s]
    remaining.delete(task_id) if remaining
  end
end

#wakeup_reactor ⇒ Object



38
39
40
# File 'lib/foreign_actor/reactor.rb', line 38

# Emits the :resume_reactor signal through signal (defined outside this
# method; presumably Celluloid's signalling - confirm).
#
# @return whatever signal returns
def wakeup_reactor
  signal :resume_reactor
end