Class: PipelineToolkit::MessagePusher
- Inherits: Object
  - Object
    - PipelineToolkit::MessagePusher
- Includes: Amqp::Writer, MessageCommand
- Defined in: lib/pipeline_toolkit/message_pusher.rb
Overview
A message queue machine that handles messages that have to be published back into a message queue.
Instance Attribute Summary
Attributes included from Amqp::Abstract
Attributes included from MessageCommand
Instance Method Summary
- #description ⇒ Object
- #initialize(options = {}) ⇒ MessagePusher
  constructor
  Initializes a new instance.
- #initialize_machine ⇒ Object
  Initialize the AMQP server connection and exchange so we can write messages to the queue.
- #process_standard(message) ⇒ Object
  Handle the messages by writing them into the AMQP server exchange.
- #stop ⇒ Object
  Override stop so that we can close the AMQP connection gracefully.
Methods included from Amqp::Writer
#initialize_queues, #initialize_writer, #publish, #queue_names
Methods included from Amqp::Abstract
#bind_queue, #initialize_channel, #initialize_connection, #initialize_exchange, #initialize_queue, #stop_connection
Methods included from MessageCommand
#acknowledge, #cleanup, #pass_on, #process, #process_system, #send_description, #start, #write_to_pipe
Constructor Details
#initialize(options = {}) ⇒ MessagePusher
Initializes a new instance.
# File 'lib/pipeline_toolkit/message_pusher.rb', line 18
def initialize(options = {})
  super(options)
  DefaultLogger.init_logger(options) if options[:log_conf]
  DefaultLogger.debug("MessagePusher#initialize(options = {})")

  queues_string = options[:queues].map { |name, type| "#{name} (key:#{type})" }.join(",")

  DefaultLogger.info "================================================================"
  DefaultLogger.info "Booting #{self.class.name} (#{options[:env]})"
  DefaultLogger.info "Exchange: #{options[:exchange]} (#{options[:type]} passive:#{options[:passive]} durable:#{options[:durable]})"
  DefaultLogger.info "Queues: #{queues_string}"
  DefaultLogger.info "amqp://#{options[:user]}:#{options[:pass]}@#{options[:host]}:#{options[:port]}#{options[:vhost]}"
  DefaultLogger.info ""
end
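To make the boot log easier to picture, here is a minimal sketch of the two strings the constructor builds from its options. The option keys are the ones read in the source above; every value is hypothetical, and the shape of `:queues` (a list of name and routing-key pairs) is an assumption based on the `|name, type|` block arguments.

```ruby
# Hypothetical option values, chosen only for illustration.
options = {
  :env      => "development",
  :exchange => "events",
  :type     => :topic,
  :passive  => false,
  :durable  => true,
  :queues   => [["orders", "order.*"], ["audit", "#"]], # assumed shape
  :host     => "localhost",
  :port     => 5672,
  :user     => "guest",
  :pass     => "guest",
  :vhost    => "/"
}

# Same expression the constructor uses to describe the queues.
queues_string = options[:queues].map { |name, type| "#{name} (key:#{type})" }.join(",")

# Same interpolation as the constructor's last informative log line.
amqp_url = "amqp://#{options[:user]}:#{options[:pass]}@#{options[:host]}:#{options[:port]}#{options[:vhost]}"

puts "Queues: #{queues_string}"
puts amqp_url
```

Note that the interpolated URL contains `options[:pass]`, so the password appears in the log in plain text at info level.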
Instance Method Details
#description ⇒ Object
# File 'lib/pipeline_toolkit/message_pusher.rb', line 52
def description
  "exchange:#{options[:exchange]}"
end
#initialize_machine ⇒ Object
Initialize the AMQP server connection and exchange so we can write messages to the queue.
# File 'lib/pipeline_toolkit/message_pusher.rb', line 37
def initialize_machine
  DefaultLogger.debug("MessagePusher#initialize_machine")
  initialize_writer
  initialize_queues
end
#process_standard(message) ⇒ Object
Handle the messages by writing them into the AMQP server exchange.
# File 'lib/pipeline_toolkit/message_pusher.rb', line 61
def process_standard(message)
  publish(message.except_keys(:ack_id))
  acknowledge(message) if options[:ack] && message[:ack_id]
rescue Exception => e
  DefaultLogger.error("msg_push bad message: START|#{message}|END\nbecause: #{e.class.name}: #{e.message}")
end
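The publish-then-acknowledge flow can be sketched without an AMQP server. The class below is a hypothetical stand-in, not the library's implementation: `SketchPusher`, its recording `publish` and `acknowledge` stubs, and the use of `Hash#reject` in place of `except_keys` are all assumptions made for illustration, as is reading the `:ack` flag from the options.

```ruby
# Hypothetical stand-in for MessagePusher; records calls instead of using AMQP.
class SketchPusher
  attr_reader :published, :acknowledged

  def initialize(options = {})
    @options = options
    @published = []
    @acknowledged = []
  end

  # Stub standing in for Amqp::Writer#publish.
  def publish(message)
    @published << message
  end

  # Stub standing in for MessageCommand#acknowledge.
  def acknowledge(message)
    @acknowledged << message[:ack_id]
  end

  def process_standard(message)
    # Strip :ack_id before publishing (stands in for except_keys(:ack_id)).
    publish(message.reject { |key, _value| key == :ack_id })
    # Acknowledge only when acking is enabled and the message carries an id.
    acknowledge(message) if @options[:ack] && message[:ack_id]
  rescue StandardError => e
    warn "bad message: #{message.inspect} because: #{e.class.name}: #{e.message}"
  end
end

pusher = SketchPusher.new({ :ack => true })
pusher.process_standard({ :body => "hello", :ack_id => 7 })
pusher.process_standard({ :body => "no ack id" })

p pusher.published
p pusher.acknowledged
```

The sketch rescues `StandardError` rather than `Exception`, so signals and exit requests are not swallowed; the documented source rescues `Exception`.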
#stop ⇒ Object
Override stop so that we can close the AMQP connection gracefully.
# File 'lib/pipeline_toolkit/message_pusher.rb', line 46
def stop
  DefaultLogger.info("Shutting down #{self.class.name}")
  DefaultLogger.info "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
  stop_connection { EM.stop }
end
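The ordering in `stop` can be illustrated with a small stand-in that records events instead of touching AMQP or EventMachine. Everything here is hypothetical: `SketchMachine`, its `stop_connection` stub (assumed to run the block once the connection has closed), and `stop_event_loop` (standing in for `EM.stop`).

```ruby
# Hypothetical stand-in that records the shutdown order.
class SketchMachine
  attr_reader :events

  def initialize
    @events = []
  end

  # Stub for Amqp::Abstract#stop_connection: close, then run the callback.
  def stop_connection
    @events << :connection_closed
    yield if block_given?
  end

  # Stub for EM.stop.
  def stop_event_loop
    @events << :event_loop_stopped
  end

  def stop
    @events << :shutting_down
    # The event loop is stopped only from inside the close callback.
    stop_connection { stop_event_loop }
  end
end

machine = SketchMachine.new
machine.stop
p machine.events
```

Passing the loop shutdown as a block presumably keeps the event loop running until the connection has finished closing, which is what makes the shutdown graceful.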