Class: ActiveMessaging::MessageReceiver

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/activemessaging/threaded_poller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(poller, connection, pause = 1) ⇒ MessageReceiver

Returns a new instance of MessageReceiver.



150
151
152
153
154
155
156
157
158
# File 'lib/activemessaging/threaded_poller.rb', line 150

def initialize(poller, connection, pause=1)
  logger.debug("MessageReceiver initialize: poller:#{poller}, connection:#{connection}, pause:#{pause}")
  
  raise "No connection found for '#{poller.connection}'" unless connection
  
  self.poller     = poller
  self.connection = connection
  self.pause      = pause
end

Instance Attribute Details

#connectionObject

Returns the value of attribute connection.



148
149
150
# File 'lib/activemessaging/threaded_poller.rb', line 148

def connection
  @connection
end

#pauseObject

Returns the value of attribute pause.



148
149
150
# File 'lib/activemessaging/threaded_poller.rb', line 148

def pause
  @pause
end

#pollerObject

Returns the value of attribute poller.



148
149
150
# File 'lib/activemessaging/threaded_poller.rb', line 148

def poller
  @poller
end

Instance Method Details

#loggerObject



181
# File 'lib/activemessaging/threaded_poller.rb', line 181

def logger; ::ActiveMessaging.logger; end

#receive(worker) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/activemessaging/threaded_poller.rb', line 160

def receive(worker)
  return unless poller.running

  # logger.debug("***** MessageReceiver calling receive")
  message = self.connection.receive(worker.options)
  # logger.debug("***** MessageReceiver receive returned")

  if message
    logger.debug("ActiveMessaging::MessageReceiver.receive: message:'#{message.inspect}'")
    poller.dispatch!(message, worker)
  else
    if (!poller || !poller.alive? || !poller.running)
      logger.debug("ActiveMessaging::MessageReceiver.receive: terminate")
      self.terminate
    end
    logger.debug("ActiveMessaging::MessageReceiver.receive: no message, retry in #{pause} sec")
    after(pause) { receive(worker) }
  end
  
end