Class: Qpid::Proton::Reactor::Reactor
- Inherits:
-
Object
- Object
- Qpid::Proton::Reactor::Reactor
- Defined in:
- lib/reactor/reactor.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#errors ⇒ Object
readonly
Returns the value of attribute errors.
Class Method Summary collapse
Instance Method Summary collapse
- #acceptor(host, port, handler = nil) ⇒ Object
- #connection(handler = nil) ⇒ Object
- #global_handler ⇒ Object
- #global_handler=(handler) ⇒ Object
- #handler ⇒ Object
- #handler=(handler) ⇒ Object
-
#initialize(handlers, options = {}) ⇒ Reactor
constructor
A new instance of Reactor.
- #on_error(info) ⇒ Object
- #process ⇒ Object
- #push_event(obj, etype) ⇒ Object
-
#quiesced? ⇒ Boolean
Returns whether the reactor has any unbuffered data.
- #run(&block) ⇒ Object
- #schedule(delay, task) ⇒ Object
- #selectable(handler = nil) ⇒ Object
-
#timeout ⇒ Fixnum
Returns the timeout period.
-
#timeout=(timeout) ⇒ Object
Sets the timeout period.
- #update(sel) ⇒ Object
- #wakeup ⇒ Object
Constructor Details
#initialize(handlers, options = {}) ⇒ Reactor
Returns a new instance of Reactor.
53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/reactor/reactor.rb', line 53
#
# Builds a reactor and registers the supplied handlers with it.
#
# @param handlers [Object, Array, nil] one handler or a (possibly nested)
#   list of handlers; each is added to the object returned by #handler.
#   Nothing is added when nil.
# @param options [Hash] optional settings. +:impl+, when present, is used as
#   the underlying implementation object; otherwise one is obtained from
#   Cproton.pn_reactor.
def initialize(handlers, options = {})
  @impl = options[:impl]
  @impl = Cproton.pn_reactor if @impl.nil?
  # Wrapping in an array and flattening accepts a single handler or a list.
  [handlers].flatten.each { |handler| self.handler.add(handler) } unless handlers.nil?
  @errors = []
  @handlers = []
  self.class.store_instance(self, :pn_reactor_attachments)
end
Instance Attribute Details
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
45 46 47 |
# File 'lib/reactor/reactor.rb', line 45
#
# Reader for the +@errors+ instance variable.
#
# @return [Object] the current value of +@errors+
def errors
  @errors
end
Class Method Details
Instance Method Details
#acceptor(host, port, handler = nil) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/reactor/reactor.rb', line 155
#
# Asks the reactor for an acceptor on the given host and port.
#
# @param host [String] host passed to Cproton.pn_reactor_acceptor
# @param port [#to_s] port; passed to the C layer as a string
# @param handler [Object, nil] handler passed through +chandler+ before being
#   handed to the acceptor (presumably wrapped for the C layer; confirm
#   against +chandler+)
# @return [Acceptor] wrapper around the acceptor returned by Cproton
# @raise [IOError] when Cproton.pn_reactor_acceptor returns nil; the message
#   carries the reactor IO error text plus "host:port"
def acceptor(host, port, handler = nil)
  impl = chandler(handler, method(:on_error))
  aimpl = Cproton.pn_reactor_acceptor(@impl, host, port.to_s, impl)
  Cproton.pn_decref(impl)
  return Acceptor.new(aimpl) unless aimpl.nil?

  # No acceptor was produced: report the reactor's IO error text.
  io = Cproton.pn_reactor_io(@impl)
  error_text = Cproton.pn_error_text(Cproton.pn_io_error(io))
  raise IOError, "(#{error_text} (#{host}:#{port}))"
end
#connection(handler = nil) ⇒ Object
170 171 172 173 174 175 |
# File 'lib/reactor/reactor.rb', line 170
#
# Creates a connection through Cproton.pn_reactor_connection and returns it
# wrapped as a Qpid::Proton::Connection.
#
# @param handler [Object, nil] handler passed through +chandler+ together
#   with the #on_error callback
# @return [Object] the result of Qpid::Proton::Connection.wrap
def connection(handler = nil)
  c_handler = chandler(handler, method(:on_error))
  c_connection = Cproton.pn_reactor_connection(@impl, c_handler)
  wrapped = Qpid::Proton::Connection.wrap(c_connection)
  # Release the handler reference only after the connection is wrapped.
  Cproton.pn_decref(c_handler)
  wrapped
end
#global_handler ⇒ Object
79 80 81 82 |
# File 'lib/reactor/reactor.rb', line 79
#
# Fetches the reactor's global handler from Cproton and wraps it.
#
# @return [Object] the result of Qpid::Proton::Handler::WrappedHandler.wrap,
#   given the fetched handler and the #on_error callback
def global_handler
  c_handler = Cproton.pn_reactor_get_global_handler(@impl)
  error_callback = method(:on_error)
  Qpid::Proton::Handler::WrappedHandler.wrap(c_handler, error_callback)
end
#global_handler=(handler) ⇒ Object
84 85 86 87 88 |
# File 'lib/reactor/reactor.rb', line 84
#
# Installs +handler+ as the reactor's global handler.
#
# @param handler [Object] handler passed through +chandler+ together with the
#   #on_error callback before being given to Cproton
def global_handler=(handler)
  error_callback = method(:on_error)
  c_handler = chandler(handler, error_callback)
  Cproton.pn_reactor_set_global_handler(@impl, c_handler)
  # Drop the local reference once the reactor has been given the handler.
  Cproton.pn_decref(c_handler)
end
#handler ⇒ Object
106 107 108 109 |
# File 'lib/reactor/reactor.rb', line 106
#
# Fetches the reactor's handler from Cproton and wraps it.
#
# @return [Object] the result of Qpid::Proton::Handler::WrappedHandler.wrap,
#   given the fetched handler and the #on_error callback
def handler
  c_handler = Cproton.pn_reactor_get_handler(@impl)
  error_callback = method(:on_error)
  Qpid::Proton::Handler::WrappedHandler.wrap(c_handler, error_callback)
end
#handler=(handler) ⇒ Object
111 112 113 114 115 |
# File 'lib/reactor/reactor.rb', line 111
#
# Installs +handler+ as the reactor's handler.
#
# The error callback is taken from this object (+method(:on_error)+), the
# same way #global_handler= does it.
#
# @param handler [Object] handler passed through +chandler+ together with the
#   #on_error callback before being given to Cproton
def handler=(handler)
  impl = chandler(handler, method(:on_error))
  Cproton.pn_reactor_set_handler(@impl, impl)
  # Drop the local reference once the reactor has been given the handler.
  Cproton.pn_decref(impl)
end
#on_error(info) ⇒ Object
74 75 76 77 |
# File 'lib/reactor/reactor.rb', line 74
#
# Error callback handed to wrapped handlers: records +info+ in #errors and
# then calls this object's +yield+ method.
#
# @param info [Object] the error being reported; #process later raises the
#   last recorded entry
def on_error(info)
  recorded = errors
  recorded << info
  # Explicit receiver is required: a bare `yield` is the block keyword.
  self.yield
end
#process ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/reactor/reactor.rb', line 136
#
# Runs one Cproton.pn_reactor_process step and surfaces any errors that
# handlers recorded through #on_error.
#
# @return [Object] the value returned by Cproton.pn_reactor_process when no
#   errors are recorded
# @raise [Exception] the last entry of #errors when any are recorded; the
#   backtrace of every recorded error is printed first
def process
  result = Cproton.pn_reactor_process(@impl)
  pending = errors
  unless pending.nil? || pending.empty?
    pending.each do |error|
      # An exception that was never raised has no backtrace to print.
      backtrace = error.backtrace if error.respond_to?(:backtrace)
      print backtrace.join("\n") if backtrace
    end
    raise pending.last
  end
  result
end
#push_event(obj, etype) ⇒ Object
192 193 194 |
# File 'lib/reactor/reactor.rb', line 192
#
# Puts an event for +obj+ on the reactor's collector.
#
# @param obj [Object] the object the event is about
# @param etype [#number] event type; its +number+ is passed to Cproton
# @return [Object] whatever Cproton.pn_collector_put returns
def push_event(obj, etype)
  collector = Cproton.pn_reactor_collector(@impl)
  # NOTE(review): the original called pn_py2void, the Python binding's helper.
  # pn_rb2void is assumed to be the Ruby binding's equivalent; confirm against
  # the SWIG interface file.
  context = Cproton.pn_rb2void(obj)
  Cproton.pn_collector_put(collector, Qpid::Proton::Util::RBCTX, context, etype.number)
end
#quiesced? ⇒ Boolean
Returns whether the reactor has any unbuffered data.
70 71 72 |
# File 'lib/reactor/reactor.rb', line 70
#
# Returns whether the reactor has any unbuffered data, as reported by
# Cproton.pn_reactor_quiesced.
#
# @return [Boolean] the value returned by Cproton.pn_reactor_quiesced
def quiesced?
  reactor = @impl
  Cproton.pn_reactor_quiesced(reactor)
end
#run(&block) ⇒ Object
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/reactor/reactor.rb', line 117
#
# Drives the reactor: sets the timeout, starts it, calls #process until it
# returns a falsy value, then stops it.
#
# @yield invoked with no arguments after every truthy #process result, when
#   a block is given
# @return [Object] the result of the final +stop+ call
def run(&block)
  self.timeout = 3.14159265359
  start
  # `body while cond` tests the condition before each iteration.
  (yield if block_given?) while process
  stop
end
#schedule(delay, task) ⇒ Object
148 149 150 151 152 153 |
# File 'lib/reactor/reactor.rb', line 148
#
# Schedules +task+ on the reactor after +delay+.
#
# @param delay [Numeric] delay, converted with +sec_to_millis+ before being
#   passed to Cproton.pn_reactor_schedule
# @param task [Object] handler passed through +chandler+ together with the
#   #on_error callback
# @return [Object] the result of Task.wrap for the scheduled task
def schedule(delay, task)
  c_handler = chandler(task, method(:on_error))
  delay_millis = sec_to_millis(delay)
  scheduled = Task.wrap(Cproton.pn_reactor_schedule(@impl, delay_millis, c_handler))
  # Release the handler reference only after scheduling.
  Cproton.pn_decref(c_handler)
  scheduled
end
#selectable(handler = nil) ⇒ Object
177 178 179 180 181 182 183 184 185 186 |
# File 'lib/reactor/reactor.rb', line 177
#
# Creates a selectable through Cproton.pn_reactor_selectable and, when
# +chandler+ yields a handler, attaches that handler to the selectable's
# record.
#
# @param handler [Object, nil] handler passed through +chandler+ together
#   with the #on_error callback
# @return [Object] the result of Selectable.wrap
def selectable(handler = nil)
  impl = chandler(handler, method(:on_error))
  result = Selectable.wrap(Cproton.pn_reactor_selectable(@impl))
  unless impl.nil?
    # The handler is stored on the selectable's attachments record.
    record = Cproton.pn_selectable_attachments(result.impl)
    Cproton.pn_record_set_handler(record, impl)
    Cproton.pn_decref(impl)
  end
  result
end
#timeout ⇒ Fixnum
Returns the timeout period.
94 95 96 |
# File 'lib/reactor/reactor.rb', line 94
#
# Returns the timeout period.
#
# @return [Object] the value from Cproton.pn_reactor_get_timeout, converted
#   with +millis_to_timeout+
def timeout
  millis = Cproton.pn_reactor_get_timeout(@impl)
  millis_to_timeout(millis)
end
#timeout=(timeout) ⇒ Object
Sets the timeout period.
102 103 104 |
# File 'lib/reactor/reactor.rb', line 102
#
# Sets the timeout period.
#
# @param timeout [Object] the new timeout; converted with
#   +timeout_to_millis+ before being passed to Cproton.pn_reactor_set_timeout
def timeout=(timeout)
  millis = timeout_to_millis(timeout)
  Cproton.pn_reactor_set_timeout(@impl, millis)
end
#update(sel) ⇒ Object
188 189 190 |
# File 'lib/reactor/reactor.rb', line 188
#
# Passes a selectable's implementation object to Cproton.pn_reactor_update.
#
# @param sel [#impl] the selectable to update
# @return [Object] whatever Cproton.pn_reactor_update returns
def update(sel)
  c_selectable = sel.impl
  Cproton.pn_reactor_update(@impl, c_selectable)
end
#wakeup ⇒ Object
128 129 130 131 132 133 134 |
# File 'lib/reactor/reactor.rb', line 128
#
# Calls Cproton.pn_reactor_wakeup on the reactor.
#
# @return [nil] when Cproton.pn_reactor_wakeup returns zero
# @raise [IOError] when it returns non-zero; the message is the reactor IO
#   error text, obtained the same way #acceptor does
def wakeup
  status = Cproton.pn_reactor_wakeup(@impl)
  return if status.zero?

  io = Cproton.pn_reactor_io(@impl)
  # Convert the error object to text; IOError expects a message string.
  raise IOError, Cproton.pn_error_text(Cproton.pn_io_error(io))
end