Class: ActiveMessaging::MessageReceiver
- Inherits:
-
Object
- Object
- ActiveMessaging::MessageReceiver
- Includes:
- Celluloid
- Defined in:
- lib/activemessaging/threaded_poller.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#pause ⇒ Object
Returns the value of attribute pause.
-
#poller ⇒ Object
Returns the value of attribute poller.
Instance Method Summary collapse
-
#initialize(poller, connection, pause = 1) ⇒ MessageReceiver
constructor
A new instance of MessageReceiver.
- #inspect ⇒ Object
- #logger ⇒ Object
- #receive(worker) ⇒ Object
- #to_s ⇒ Object
Constructor Details
#initialize(poller, connection, pause = 1) ⇒ MessageReceiver
Returns a new instance of MessageReceiver.
156 157 158 159 160 161 162 163 164 |
# File 'lib/activemessaging/threaded_poller.rb', line 156 def initialize(poller, connection, pause=1) logger.debug("MessageReceiver initialize: poller:#{poller}, connection:#{connection}, pause:#{pause}") raise "No connection found for '#{poller.connection}'" unless connection self.poller = poller self.connection = connection self.pause = pause end |
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
154 155 156 |
# File 'lib/activemessaging/threaded_poller.rb', line 154 def connection @connection end |
#pause ⇒ Object
Returns the value of attribute pause.
154 155 156 |
# File 'lib/activemessaging/threaded_poller.rb', line 154 def pause @pause end |
#poller ⇒ Object
Returns the value of attribute poller.
154 155 156 |
# File 'lib/activemessaging/threaded_poller.rb', line 154 def poller @poller end |
Instance Method Details
#inspect ⇒ Object
187 188 189 |
# File 'lib/activemessaging/threaded_poller.rb', line 187 def inspect "#<MessageReceiver #{to_s}>" end |
#logger ⇒ Object
195 |
# File 'lib/activemessaging/threaded_poller.rb', line 195 def logger; ::ActiveMessaging.logger; end |
#receive(worker) ⇒ Object
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/activemessaging/threaded_poller.rb', line 166 def receive(worker) return unless poller.running # logger.debug("***** MessageReceiver calling receive") = self.connection.receive(worker.) # logger.debug("***** MessageReceiver receive returned") if logger.debug("ActiveMessaging::MessageReceiver.receive: message:'#{.inspect}'") poller.dispatch!(, worker) else if (!poller || !poller.alive? || !poller.running) logger.debug("ActiveMessaging::MessageReceiver.receive: terminate") self.terminate end logger.debug("ActiveMessaging::MessageReceiver.receive: no message for worker #{worker.object_id}, retry in #{pause} sec") after(pause) { receive(worker) } end end |
#to_s ⇒ Object
191 192 193 |
# File 'lib/activemessaging/threaded_poller.rb', line 191 def to_s @str ||= "#{Process.pid}-#{Thread.current.object_id}:#{self.object_id}" end |