Class: ActiveMessaging::MessageReceiver
- Inherits:
-
Object
- Object
- ActiveMessaging::MessageReceiver
- Includes:
- Celluloid
- Defined in:
- lib/activemessaging/threaded_poller.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#pause ⇒ Object
Returns the value of attribute pause.
-
#poller ⇒ Object
Returns the value of attribute poller.
Instance Method Summary collapse
-
#initialize(poller, connection, pause = 1) ⇒ MessageReceiver
constructor
A new instance of MessageReceiver.
- #logger ⇒ Object
- #receive(worker) ⇒ Object
Constructor Details
#initialize(poller, connection, pause = 1) ⇒ MessageReceiver
Returns a new instance of MessageReceiver.
150 151 152 153 154 155 156 157 158 |
# File 'lib/activemessaging/threaded_poller.rb', line 150 def initialize(poller, connection, pause=1) logger.debug("MessageReceiver initialize: poller:#{poller}, connection:#{connection}, pause:#{pause}") raise "No connection found for '#{poller.connection}'" unless connection self.poller = poller self.connection = connection self.pause = pause end |
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
148 149 150 |
# File 'lib/activemessaging/threaded_poller.rb', line 148 def connection @connection end |
#pause ⇒ Object
Returns the value of attribute pause.
148 149 150 |
# File 'lib/activemessaging/threaded_poller.rb', line 148 def pause @pause end |
#poller ⇒ Object
Returns the value of attribute poller.
148 149 150 |
# File 'lib/activemessaging/threaded_poller.rb', line 148 def poller @poller end |
Instance Method Details
#logger ⇒ Object
181 |
# File 'lib/activemessaging/threaded_poller.rb', line 181 def logger; ::ActiveMessaging.logger; end |
#receive(worker) ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/activemessaging/threaded_poller.rb', line 160 def receive(worker) return unless poller.running # logger.debug("***** MessageReceiver calling receive") message = self.connection.receive(worker.options) # logger.debug("***** MessageReceiver receive returned") if message logger.debug("ActiveMessaging::MessageReceiver.receive: message:'#{message.inspect}'") poller.dispatch!(message, worker) else if (!poller || !poller.alive? || !poller.running) logger.debug("ActiveMessaging::MessageReceiver.receive: terminate") self.terminate end logger.debug("ActiveMessaging::MessageReceiver.receive: no message, retry in #{pause} sec") after(pause) { receive(worker) } end end |