Module: PipelineToolkit::Amqp::Writer

Includes:
Abstract
Included in:
MessagePusher
Defined in:
lib/pipeline_toolkit/amqp/writer.rb

Overview

The Writer provides functionality specific to asynchronous message publishing to the AMQP server.

Instance Attribute Summary

Attributes included from Abstract

#options

Instance Method Summary collapse

Methods included from Abstract

#bind_queue, #initialize, #initialize_channel, #initialize_connection, #initialize_exchange, #initialize_queue, #stop_connection

Instance Method Details

#initialize_queues ⇒ Object

Initializes the queues to publish the messages to.



28
29
30
31
32
33
34
# File 'lib/pipeline_toolkit/amqp/writer.rb', line 28

# Declares and binds every queue configured under +options[:queues]+.
#
# Logs a debug line, then walks the configured entries in order; for each
# entry the queue is initialized by name and then bound with the entry's
# routing key.
#
# NOTE(review): entries are destructured as (name, routing_key) pairs, so
# +options[:queues]+ is presumably a Hash or an Array of pairs -- confirm
# against the caller that builds the options.
#
# @return [Object] whatever +each+ returns for the configured collection
def initialize_queues
  DefaultLogger.debug("Amqp::Writer#initialize_queues")
  configured_queues = options[:queues]
  configured_queues.each do |queue_name, binding_key|
    initialize_queue(queue_name)
    bind_queue(binding_key)
  end
end

#initialize_writer ⇒ Object

Initializes the AMQP connection, channel and exchange used for publishing messages to the server.



18
19
20
21
22
23
# File 'lib/pipeline_toolkit/amqp/writer.rb', line 18

# Prepares the writer for publishing.
#
# Logs a debug line and then calls, in this order, +initialize_connection+,
# +initialize_channel+ and +initialize_exchange+ (all listed as methods
# included from Abstract).
#
# NOTE(review): the order presumably matters (a channel needs a connection,
# an exchange needs a channel) -- confirm against Abstract before reordering.
#
# @return [Object] whatever +initialize_exchange+ returns
def initialize_writer
  DefaultLogger.debug("Amqp::Writer#initialize_writer")
  initialize_connection
  initialize_channel
  initialize_exchange
end

#publish(message) ⇒ Object

Handles the publishing of messages into the AMQP server.

Parameters:

  • message (Hash)

    The message to publish



41
42
43
44
45
46
47
48
# File 'lib/pipeline_toolkit/amqp/writer.rb', line 41

# Publishes a message to the exchange held in +@exchange+.
#
# The whole message is encoded with +MessageCoder.encode+. When the message
# contains a +:routing_key+ entry (even one whose value is nil), that value
# is passed to the exchange as the +:key+ option; otherwise the encoded
# payload is published without options.
#
# @param message [Hash] the message to publish
# @return [Object] whatever +@exchange.publish+ returns
def publish(message)
  DefaultLogger.debug("Amqp::Writer#publish(message)")

  # No routing key supplied: publish the bare payload.
  return @exchange.publish(MessageCoder.encode(message)) unless message.key?(:routing_key)

  payload = MessageCoder.encode(message)
  @exchange.publish(payload, :key => message[:routing_key])
end

#queue_names ⇒ Object



50
51
52
# File 'lib/pipeline_toolkit/amqp/writer.rb', line 50

# Lists the names of the queues configured under +options[:queues]+.
#
# Each entry is destructured as a (name, second element) pair and only the
# name is kept; the second element is ignored here.
#
# @return [Array] the first element of every configured queue entry, in
#   iteration order
def queue_names
  configured_queues = options[:queues]
  configured_queues.map { |queue_name, _unused| queue_name }
end