Module: PipelineToolkit::Amqp::Abstract

Included in:
Reader, Writer
Defined in:
lib/pipeline_toolkit/amqp/abstract.rb

Overview

Provides abstract base functionality shared by Reader and Writer.

Instance Attribute Details

#options ⇒ Object (readonly)

Getter for the options hash passed to the constructor.



# File 'lib/pipeline_toolkit/amqp/abstract.rb', line 14

def options
  @options
end

Instance Method Details

#bind_queue(routing_key = '') ⇒ Object

Binds the queue to an exchange.

Parameters:

  • routing_key (String) (defaults to: '')

    Specifies the routing key for the binding. How the key is used to route messages depends on the exchange type. If options[:queue] contains a colon (for example "name:key"), the part after the last colon replaces this argument.



# File 'lib/pipeline_toolkit/amqp/abstract.rb', line 94

def bind_queue(routing_key = '')
  routing_key = options[:queue].split(":").last if options[:queue] && options[:queue].include?(":")
  DefaultLogger.debug("Amqp::Abstract#bind_queue(routing_key = '')")
  unless routing_key.nil? || routing_key.empty?
    @queue.bind(@exchange, :key => routing_key)
  else
    @queue.bind(@exchange)
  end
end
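
The routing-key selection in the listing above can be tried without a broker. This is a minimal sketch, not part of the library: `resolve_routing_key` is a hypothetical helper that mirrors only the first line of `bind_queue`, assuming `options[:queue]` may be written as `name:key`.

```ruby
# Hypothetical helper (not part of PipelineToolkit). It mirrors how bind_queue
# picks the routing key: a "name:key" value in options[:queue] overrides the
# routing_key argument; otherwise the argument is used unchanged.
def resolve_routing_key(options, routing_key = '')
  queue = options[:queue]
  routing_key = queue.split(':').last if queue && queue.include?(':')
  routing_key
end

resolve_routing_key({ queue: 'events:errors' }, 'ignored') # => "errors"
resolve_routing_key({ queue: 'events' }, 'audit')          # => "audit"
resolve_routing_key({})                                    # => ""
```

An empty result corresponds to the `else` branch of `bind_queue`, where the queue is bound to the exchange without a `:key`.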

#initialize(options = {}) ⇒ Object

Initializes a new instance of either the Reader or the Writer.

Parameters:

  • options (Hash) (defaults to: {})

    The options hash

Options Hash (options):

  • :host (String) — default: 'localhost'

    The address of the AMQP server.

  • :port (Integer) — default: 5672

    The port of the AMQP server.

  • :user (String) — default: 'guest'

    The username as defined by the AMQP server.

  • :pass (String) — default: 'guest'

    The password for the associated :user as defined by the AMQP server.

  • :vhost (String) — default: '/'

    The virtual host as defined by the AMQP server.

  • :type (String) — default: 'fanout'

    The exchange type (direct, fanout or topic).

  • :exchange (String) — default: ''

    The exchange name.

  • :durable (Boolean) — default: false

    If set to true, the exchange and the queue are declared as durable.

  • :passive (Boolean) — default: false

    If set to true, the server will not create the exchange or queue if it does not already exist.

  • :ack (Boolean) — default: true

    If set to false, the server does not expect acknowledgments for messages.

  • :env (String) — default: 'development'

    The environment to run in (development or production).



# File 'lib/pipeline_toolkit/amqp/abstract.rb', line 32

def initialize(options = {})
  super(options)

  @options = options
end
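
The constructor shown above stores the hash exactly as given, so where the documented defaults are applied is not visible in this file. As a sketch under that assumption, a caller could build a complete options hash by merging overrides into the documented defaults. `DOCUMENTED_DEFAULTS` and `amqp_options` are hypothetical names, not part of the library.

```ruby
# Hypothetical constant collecting the defaults listed in the options table.
DOCUMENTED_DEFAULTS = {
  host: 'localhost',
  port: 5672,
  user: 'guest',
  pass: 'guest',
  vhost: '/',
  type: 'fanout',
  exchange: '',
  durable: false,
  passive: false,
  ack: true,
  env: 'development'
}.freeze

# Hypothetical helper: returns a new hash in which the caller's values
# take precedence over the documented defaults.
def amqp_options(overrides = {})
  DOCUMENTED_DEFAULTS.merge(overrides)
end

opts = amqp_options(host: 'mq.example.com', durable: true)
opts[:host]    # => "mq.example.com"
opts[:port]    # => 5672
opts[:durable] # => true
```

A hash built this way could then be passed to whichever class includes this module; how Reader and Writer are constructed is not shown here.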

#initialize_channel ⇒ Object

Returns a new channel. A channel is a bidirectional virtual connection between the client and the AMQP server.



# File 'lib/pipeline_toolkit/amqp/abstract.rb', line 59

def initialize_channel
  DefaultLogger.debug("Amqp::Abstract#initialize_channel")
  @channel = MQ.new(@connection)
end

#initialize_connection ⇒ Object

Create a new connection to the AMQP server.



# File 'lib/pipeline_toolkit/amqp/abstract.rb', line 41

def initialize_connection
  DefaultLogger.debug("Amqp::Abstract#initialize_connection")
  @connection = AMQP.connect(options.select_keys(:host, :port, :user, :pass, :vhost))
end
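
`select_keys` is not a core Ruby `Hash` method, so it is presumably an extension defined elsewhere in the toolkit. The sketch below shows the filtering it appears to perform, written as a standalone function with the same name purely for illustration; the real implementation may differ.

```ruby
# Hypothetical stand-in for the select_keys extension: returns a new hash
# containing only the listed keys that are actually present.
def select_keys(hash, *keys)
  hash.select { |key, _value| keys.include?(key) }
end

options = { host: 'localhost', port: 5672, type: 'fanout', queue: 'events' }

# Only connection-related settings survive; exchange and queue settings are dropped.
connection_settings = select_keys(options, :host, :port, :user, :pass, :vhost)
# => { host: "localhost", port: 5672 }
```

Keys that are absent from the options hash (here `:user`, `:pass` and `:vhost`) are simply omitted from the result rather than set to `nil`.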

#initialize_exchange ⇒ Object

Defines, initializes and returns an Exchange that acts as the ingress point for all published messages.



# File 'lib/pipeline_toolkit/amqp/abstract.rb', line 68

def initialize_exchange
  DefaultLogger.debug("Amqp::Abstract#initialize_exchange")
  # declare an exchange on the channel
  @exchange = MQ::Exchange.new(@channel, options[:type], options[:exchange], :durable => options[:durable], :passive => options[:passive])
rescue MQ::Error => e # rescued here because main thread does not seem to see it
  DefaultLogger.error "#{e.class.name}: #{e.message}\n" << e.backtrace.join("\n")
  raise e
end

#initialize_queue(name = '') ⇒ Object

Defines, initializes and returns a Queue that stores and forwards messages.

Parameters:

  • name (String) (defaults to: '')

    The name to use for the queue



# File 'lib/pipeline_toolkit/amqp/abstract.rb', line 82

def initialize_queue(name = '')
  DefaultLogger.debug("Amqp::Abstract#initialize_queue(name = '')")
  name = options[:queue] if name.nil? || name.empty?
  @queue = MQ::Queue.new(@channel, name, :durable => options[:durable], :passive => options[:passive])
end
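
The name fallback in the listing above can be illustrated in isolation. This is a minimal sketch; `resolve_queue_name` is a hypothetical helper that mirrors only the second line of `initialize_queue`.

```ruby
# Hypothetical helper (not part of PipelineToolkit). An explicit name wins;
# a nil or empty name falls back to options[:queue].
def resolve_queue_name(options, name = '')
  name = options[:queue] if name.nil? || name.empty?
  name
end

resolve_queue_name({ queue: 'events' })          # => "events"
resolve_queue_name({ queue: 'events' }, 'audit') # => "audit"
resolve_queue_name({ queue: 'events:errors' })   # => "events:errors"
resolve_queue_name({})                           # => nil
```

As the third call shows, the fallback uses the `:queue` option verbatim: a value written as `name:key` becomes the queue name in full, colon included, while `bind_queue` takes only the part after the last colon as the routing key.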

#stop_connection ⇒ Object

Gracefully shuts down the AMQP connection. Calls the given block, if provided, once the connection has closed.



# File 'lib/pipeline_toolkit/amqp/abstract.rb', line 49

def stop_connection
  # NB: Next tick seems to give it enough time to receive and send outstanding acks. But I don't
  # really understand timing, so keep an eye on it.
  EM.next_tick { @connection.close { yield if block_given? } }
end