Class: PipelineToolkit::MessagePusher

Inherits:
Object
  • Object
show all
Includes:
Amqp::Writer, MessageCommand
Defined in:
lib/pipeline_toolkit/message_pusher.rb

Overview

A message queue machine that handles messages which have to be published back into a message queue.

Instance Attribute Summary

Attributes included from Amqp::Abstract

#options

Attributes included from MessageCommand

#options

Instance Method Summary collapse

Methods included from Amqp::Writer

#initialize_queues, #initialize_writer, #publish, #queue_names

Methods included from Amqp::Abstract

#bind_queue, #initialize_channel, #initialize_connection, #initialize_exchange, #initialize_queue, #stop_connection

Methods included from MessageCommand

#acknowledge, #cleanup, #pass_on, #process, #process_system, #send_description, #start, #write_to_pipe

Constructor Details

#initialize(options = {}) ⇒ MessagePusher

Initializes a new instance.

Parameters:

  • options (Hash) (defaults to: {})

    Options hash for the message pusher.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/pipeline_toolkit/message_pusher.rb', line 18

# Initializes a new instance and writes a boot banner to the log.
#
# @param options [Hash] Options hash for the message pusher. Keys read
#   here: :log_conf, :queues, :env, :exchange, :type, :passive, :durable,
#   :user, :pass, :host, :port and :vhost.
def initialize(options = {})
  super(options)

  DefaultLogger.init_logger(options) if options[:log_conf]

  DefaultLogger.debug("MessagePusher#initialize(options = {})")

  # Array() keeps the banner from raising when no :queues option was given.
  queues_string = Array(options[:queues]).map { |name, type| "#{name} (key:#{type})" }.join(",")
  # The broker password must never reach the log; print a fixed mask instead.
  masked_pass = options[:pass] ? "[FILTERED]" : ""

  DefaultLogger.info "================================================================"
  DefaultLogger.info "Booting #{self.class.name}  (#{options[:env]})"
  DefaultLogger.info "Exchange: #{options[:exchange]} (#{options[:type]} passive:#{options[:passive]} durable:#{options[:durable]})"
  DefaultLogger.info "Queues: #{queues_string}"
  DefaultLogger.info "amqp://#{options[:user]}:#{masked_pass}@#{options[:host]}:#{options[:port]}#{options[:vhost]}"
  DefaultLogger.info ""
end

Instance Method Details

#descriptionObject



52
53
54
# File 'lib/pipeline_toolkit/message_pusher.rb', line 52

# Short, human-readable description of this machine.
#
# @return [String] the text "exchange:" followed by the :exchange option.
def description
  exchange_name = options[:exchange]
  "exchange:#{exchange_name}"
end

#initialize_machineObject

Initialize the AMQP server connection and exchange so we can write messages to the queue.



37
38
39
40
41
# File 'lib/pipeline_toolkit/message_pusher.rb', line 37

# Prepares this machine for publishing: logs a debug trace, then calls
# initialize_writer followed by initialize_queues.
#
# @return [Object] whatever initialize_queues returns.
def initialize_machine
  DefaultLogger.debug "MessagePusher#initialize_machine"

  initialize_writer
  initialize_queues
end

#process_standard(message) ⇒ Object

Handle the messages by writing them into the AMQP server exchange.

Parameters:

  • message (Hash)

    The message to write to the exchange.



61
62
63
64
65
66
67
# File 'lib/pipeline_toolkit/message_pusher.rb', line 61

# Handles a message by publishing it (minus its :ack_id key) and then
# acknowledging it when the :ack option is set and the message carries an
# :ack_id.
#
# Failures that are StandardErrors are logged and swallowed so one bad
# message does not stop processing. Non-StandardError exceptions (e.g.
# Interrupt, SystemExit) are deliberately NOT rescued so the process can
# still be shut down.
#
# @param message [Hash] The message to write to the exchange.
def process_standard(message)
  # Method-entry trace: debug level, matching the other trace lines in this class.
  DefaultLogger.debug("MessagePusher#process_standard(message)")
  publish(message.except_keys(:ack_id))
  acknowledge(message) if options[:ack] && message[:ack_id]
rescue StandardError => e
  DefaultLogger.error("msg_push bad message: START|#{message}|END\nbecause: #{e.class.name}: #{e.message}")
end

#stopObject

Override stop, so that we can close AMQP connection gracefully.



46
47
48
49
50
# File 'lib/pipeline_toolkit/message_pusher.rb', line 46

# Overrides stop: logs a shutdown banner, then calls stop_connection with
# a block that stops EM.
#
# @return [Object] whatever stop_connection returns.
def stop
  shutdown_notice = "Shutting down #{self.class.name}"
  DefaultLogger.info(shutdown_notice)
  DefaultLogger.info("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")

  stop_connection do
    EM.stop
  end
end