Class: Baton::Channel

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/baton/channel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=

Constructor Details

#initializeChannel

Public: Initialize a Channel. It creates an AMQP connection, a channel, an input and an output exchange and finally attaches the handle_channel_exception callback to the on_error event on the channel.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/baton/channel.rb', line 13

def initialize

  @connection_options = Baton.configuration.connection_opts
  @amqp_hosts = Baton.configuration.amqp_host_list

  logger.info "Connecting to AMQP host: #{@connection_options[:host]}"

  @connection   = AMQP.connect(@connection_options)
  @channel      = AMQP::Channel.new(@connection)
  @channel.auto_recovery = true

  # Not everything needs an input exchange, default to the "" exchange if there isn't
  # one defined in the config (monitors for example)
  Baton.configuration.exchange = '' if Baton.configuration.exchange.nil?

  # Create the exchanges
  @exchange_in  = channel.direct(Baton.configuration.exchange)
  @exchange_out = channel.direct(Baton.configuration.exchange_out)

  # Attach callbacks for error handling
  @connection.on_tcp_connection_loss(&method(:handle_tcp_failure))
  @connection.on_skipped_heartbeats(&method(:handle_tcp_failure))
  @channel.on_error(&method(:handle_channel_exception))
end

Instance Attribute Details

#amqp_hostsObject

Returns the value of attribute amqp_hosts.



8
9
10
# File 'lib/baton/channel.rb', line 8

def amqp_hosts
  @amqp_hosts
end

#channelObject

Returns the value of attribute channel.



8
9
10
# File 'lib/baton/channel.rb', line 8

def channel
  @channel
end

#connectionObject

Returns the value of attribute connection.



8
9
10
# File 'lib/baton/channel.rb', line 8

def connection
  @connection
end

#connection_optionsObject

Returns the value of attribute connection_options.



8
9
10
# File 'lib/baton/channel.rb', line 8

def connection_options
  @connection_options
end

#exchange_inObject

Returns the value of attribute exchange_in.



8
9
10
# File 'lib/baton/channel.rb', line 8

def exchange_in
  @exchange_in
end

#exchange_outObject

Returns the value of attribute exchange_out.



8
9
10
# File 'lib/baton/channel.rb', line 8

def exchange_out
  @exchange_out
end

Instance Method Details

#add_consumer(consumer) ⇒ Object

Public: creates a consumer manager with a consumer attached and starts listening to messages.

consumer - An instance of Baton::Consumer. it will typically be a extension of Baton::Consumer (e.g. Baton::DeployConsumer).

Examples

add_consumer(consumer)

Returns nothing.



50
51
52
# File 'lib/baton/channel.rb', line 50

def add_consumer(consumer)
  Baton::ConsumerManager.new(consumer, channel, exchange_in, exchange_out).start
end

#get_new_amqp_host(amqp_hosts) ⇒ Object

Public: Provides a new AMQP hostname

amqp_hosts - An array of hostnames for your AMQP servers

Returns a string of an AMQP hostname.



71
72
73
# File 'lib/baton/channel.rb', line 71

def get_new_amqp_host(amqp_hosts)
  amqp_hosts[Kernel.rand(amqp_hosts.size)]
end

#handle_channel_exception(channel, channel_close) ⇒ Object

Public: Callback to handle errors on an AMQP channel.

channel - An AMQP channel channel_close -

Returns nothing.



61
62
63
# File 'lib/baton/channel.rb', line 61

def handle_channel_exception(channel, channel_close)
  logger.error "Channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
end

#handle_tcp_failure(connection, settings) ⇒ Object

Public: Callback to handle TCP connection loss

connection - An AMQP Connection settings - Current AMQP settings (see amq-client/lib/amq/client/settings.rb and lib/amq/client/async/adapter.rb)

Returns nothing.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/baton/channel.rb', line 82

def handle_tcp_failure(connection, settings)

  logger.info("Connection to AMQP lost. Finding new host..")

  if @amqp_hosts.size == 1
    logger.info("Only a single host.. reconnecting")
    connection.reconnect(false, 10)
    return
  end

  current_host = settings[:host]
  new_host = get_new_amqp_host(@amqp_hosts)

  while new_host == current_host
    new_host = get_new_amqp_host(@amqp_hosts)
  end

  settings[:host] = new_host

  logger.info ("Reconnecting to AMPQ host: #{new_host}")

  connection.reconnect_to(settings)
end