Class: Qpid::Proton::Reactor::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/reactor/reactor.rb

Direct Known Subclasses

Container

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(handlers, options = {}) ⇒ Reactor

Returns a new instance of Reactor.



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/reactor/reactor.rb', line 53

def initialize(handlers, options = {})
  @impl = options[:impl]
  if @impl.nil?
    @impl = Cproton.pn_reactor
  end
  if !handlers.nil?
    [handlers].flatten.each {|handler| self.handler.add(handler)}
  end
  @errors = []
  @handlers = []
  self.class.store_instance(self, :pn_reactor_attachments)
end

Instance Attribute Details

#errorsObject (readonly)

Returns the value of attribute errors.



45
46
47
# File 'lib/reactor/reactor.rb', line 45

def errors
  @errors
end

Class Method Details

.wrap(impl) ⇒ Object



47
48
49
50
51
# File 'lib/reactor/reactor.rb', line 47

def self.wrap(impl)
  return nil if impl.nil?

  self.fetch_instance(impl, :pn_reactor_attachments) || Reactor.new(nil, :impl => impl)
end

Instance Method Details

#acceptor(host, port, handler = nil) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/reactor/reactor.rb', line 155

def acceptor(host, port, handler = nil)
  impl = chandler(handler, self.method(:on_error))
  aimpl = Cproton.pn_reactor_acceptor(@impl, host, "#{port}", impl)
  Cproton.pn_decref(impl)
  if !aimpl.nil?
    return Acceptor.new(aimpl)
  else
    io = Cproton.pn_reactor_io(@impl)
    io_error = Cproton.pn_io_error(io)
    error_text = Cproton.pn_error_text(io_error)
    text = "(#{Cproton.pn_error_text(io_error)} (#{host}:#{port}))"
    raise IOError.new(text)
  end
end

#connection(handler = nil) ⇒ Object



170
171
172
173
174
175
# File 'lib/reactor/reactor.rb', line 170

def connection(handler = nil)
  impl = chandler(handler, self.method(:on_error))
  conn = Qpid::Proton::Connection.wrap(Cproton.pn_reactor_connection(@impl, impl))
  Cproton.pn_decref(impl)
  return conn
end

#global_handlerObject



79
80
81
82
# File 'lib/reactor/reactor.rb', line 79

def global_handler
  impl = Cproton.pn_reactor_get_global_handler(@impl)
  Qpid::Proton::Handler::WrappedHandler.wrap(impl, self.method(:on_error))
end

#global_handler=(handler) ⇒ Object



84
85
86
87
88
# File 'lib/reactor/reactor.rb', line 84

def global_handler=(handler)
  impl = chandler(handler, self.method(:on_error))
  Cproton.pn_reactor_set_global_handler(@impl, impl)
  Cproton.pn_decref(impl)
end

#handlerObject



106
107
108
109
# File 'lib/reactor/reactor.rb', line 106

def handler
  impl = Cproton.pn_reactor_get_handler(@impl)
  Qpid::Proton::Handler::WrappedHandler.wrap(impl, self.method(:on_error))
end

#handler=(handler) ⇒ Object



111
112
113
114
115
# File 'lib/reactor/reactor.rb', line 111

def handler=(handler)
  impl = chandler(handler, set.method(:on_error))
  Cproton.pn_reactor_set_handler(@impl, impl)
  Cproton.pn_decref(impl)
end

#on_error(info) ⇒ Object



74
75
76
77
# File 'lib/reactor/reactor.rb', line 74

def on_error(info)
  self.errors << info
  self.yield
end

#processObject



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/reactor/reactor.rb', line 136

def process
  result = Cproton.pn_reactor_process(@impl)
  if !self.errors.nil? && !self.errors.empty?
    (0...self.errors.size).each do |index|
      error_set = self.errors[index]
      print error.backtrace.join("\n")
    end
    raise self.errors.last
  end
  return result
end

#push_event(obj, etype) ⇒ Object



192
193
194
# File 'lib/reactor/reactor.rb', line 192

def push_event(obj, etype)
  Cproton.pn_collector_put(Cproton.pn_reactor_collector(@impl), Qpid::Proton::Util::RBCTX, Cproton.pn_py2void(obj), etype.number)
end

#quiesced?Boolean

Returns whether the reactor has any unbuffered data.

Returns:

  • (Boolean)

    True if there is no unbuffered data.



70
71
72
# File 'lib/reactor/reactor.rb', line 70

def quiesced?
  Cproton.pn_reactor_quiesced(@impl)
end

#run(&block) ⇒ Object



117
118
119
120
121
122
123
124
125
126
# File 'lib/reactor/reactor.rb', line 117

def run(&block)
  self.timeout = 3.14159265359
  self.start
  while self.process do
    if block_given?
      yield
    end
  end
  self.stop
end

#schedule(delay, task) ⇒ Object



148
149
150
151
152
153
# File 'lib/reactor/reactor.rb', line 148

def schedule(delay, task)
  impl = chandler(task, self.method(:on_error))
  task = Task.wrap(Cproton.pn_reactor_schedule(@impl, sec_to_millis(delay), impl))
  Cproton.pn_decref(impl)
  return task
end

#selectable(handler = nil) ⇒ Object



177
178
179
180
181
182
183
184
185
186
# File 'lib/reactor/reactor.rb', line 177

def selectable(handler = nil)
  impl = chandler(handler, self.method(:on_error))
  result = Selectable.wrap(Cproton.pn_reactor_selectable(@impl))
  if !impl.nil?
    record = Cproton.pn_selectable_attachments(result.impl)
    Cproton.pn_record_set_handler(record, impl)
    Cproton.pn_decref(impl)
  end
  return result
end

#timeoutFixnum

Returns the timeout period.

Returns:

  • (Fixnum)

    The timeout period, in seconds.



94
95
96
# File 'lib/reactor/reactor.rb', line 94

def timeout
  millis_to_timeout(Cproton.pn_reactor_get_timeout(@impl))
end

#timeout=(timeout) ⇒ Object

Sets the timeout period.

Parameters:

  • timeout (Fixnum)

    The timeout, in seconds.



102
103
104
# File 'lib/reactor/reactor.rb', line 102

def timeout=(timeout)
  Cproton.pn_reactor_set_timeout(@impl, timeout_to_millis(timeout))
end

#update(sel) ⇒ Object



188
189
190
# File 'lib/reactor/reactor.rb', line 188

def update(sel)
  Cproton.pn_reactor_update(@impl, sel.impl)
end

#wakeupObject



128
129
130
131
132
133
134
# File 'lib/reactor/reactor.rb', line 128

def wakeup
  n = Cproton.pn_reactor_wakeup(@impl)
  unless n.zero?
    io = Cproton.pn_reactor_io(@impl)
    raise IOError.new(Cproton.pn_io_error(io))
  end
end