Class: ActiveMessaging::Adapters::Amqp::Connection

Inherits:
Object
  • Object
show all
Includes:
ActiveMessaging::Adapter
Defined in:
lib/activemessaging/adapters/amqp.rb

Defined Under Namespace

Classes: InvalidExchangeType

Constant Summary collapse

# Number of ServerDown failures #send tolerates before it stops retrying.
SERVER_RETRY_MAX_ATTEMPTS = 10
# Queue options that #initialize starts from when building @queue_config.
DEFAULT_QUEUE_CONFIG = { :durable => true, :auto_delete => false, :exclusive => true }

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(config = {}) ⇒ Connection

Returns a new instance of Connection.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/activemessaging/adapters/amqp.rb', line 39

# Captures connection and queue settings from +config+. Nothing in this
# method talks to the broker; it only stores values in instance variables.
#
# Connection keys (all optional): :user, :pass, :host, :port, :vhost, :ssl,
# :ssl_verify. The port defaults to 5671 when :ssl is set, 5672 otherwise.
#
# Queue keys (all optional):
# * :queue_name        - name of the queue; when absent a name is generated.
# * :queue_durable, :queue_auto_delete, :queue_exclusive - override the
#   matching entry of DEFAULT_QUEUE_CONFIG. Only honoured for named queues,
#   and only for the keys that are actually supplied.
#
# :debug is coerced to an Integer verbosity level (0 when missing).
#
# @param config [Hash] adapter settings keyed by Symbol
# @return [Connection]
def initialize(config = {})
  @connect_options = {
    :user  => config[:user]  || 'guest',
    :pass  => config[:pass]  || 'guest',
    :host  => config[:host]  || 'localhost',
    :port  => config[:port]  || (config[:ssl] ? 5671 : 5672),
    :vhost => config[:vhost] || nil,
    :ssl   => config[:ssl]   || false,
    :ssl_verify => config[:ssl_verify] || OpenSSL::SSL::VERIFY_PEER,
  }

  # Values that do not respond to #to_i fall back to "no debugging".
  @debug = config[:debug].to_i rescue 0

  Carrot.logging = true unless @debug < 5

  @auto_generated_queue = !config[:queue_name]
  @queue_name =
    if @auto_generated_queue
      # Time#to_s only has one-second resolution, so connections created in
      # the same second used to share a queue name. Mix in sub-second time,
      # the process id and this object's id to keep generated names distinct.
      Digest::MD5.hexdigest("#{Time.now.to_f}-#{Process.pid}-#{object_id}")
    else
      config[:queue_name]
    end

  # Collect only the options the caller actually provided, so that an
  # unspecified option keeps its default instead of being forced to false.
  overrides = {}
  unless @auto_generated_queue
    {
      :durable     => :queue_durable,
      :auto_delete => :queue_auto_delete,
      :exclusive   => :queue_exclusive
    }.each do |option, config_key|
      value = config[config_key]
      overrides[option] = !!value unless value.nil?
    end
  end

  # Hash#merge returns a new Hash; its result must be kept (it was previously
  # discarded, so overrides never took effect). Building a fresh Hash also
  # keeps this instance from aliasing the shared DEFAULT_QUEUE_CONFIG.
  @queue_config = DEFAULT_QUEUE_CONFIG.merge(overrides)
end

Instance Method Details

#disconnect(headers = {}) ⇒ Object



140
141
142
# File 'lib/activemessaging/adapters/amqp.rb', line 140

# Stops the underlying client, if one has been assigned to @client.
#
# Calling this when @client is nil is a no-op rather than a NoMethodError.
#
# @param headers [Hash] unused
# @return whatever the client's #stop returns, or nil when there is no client
def disconnect(headers={})
  @client.stop if @client
end

#receive(options = {}) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/activemessaging/adapters/amqp.rb', line 82

# Blocks until the queue yields a message, then returns it decoded.
#
# Polls +queue.pop+ with :ack => true, sleeping 0.2 seconds after every
# empty poll. The raw payload is decoded with AmqpMessage.decode, stamped via
# #stamp_received!, and given the queue's current delivery tag.
#
# @param options [Hash] unused
# @return the decoded message
def receive(options={})
  raw = nil
  sleep 0.2 while (raw = queue.pop(:ack => true)).nil?

  message = AmqpMessage.decode(raw).stamp_received!
  message.delivery_tag = queue.delivery_tag
  # @debug is an Integer and 0 is truthy in Ruby, so test the level
  # explicitly (matching the other methods) instead of `if @debug`.
  puts "RECEIVE: #{message.inspect}" if @debug > 0
  message
end

#received(message, headers = {}) ⇒ Object



72
73
74
75
# File 'lib/activemessaging/adapters/amqp.rb', line 72

# Acknowledges +message+ by sending a Basic::Ack frame that carries the
# delivery tag found in the message headers.
#
# @param message object whose #headers holds a :delivery_tag entry
# @param headers [Hash] unused
def received(message, headers = {})
  if @debug > 0
    puts "Received Message - ACK'ing with delivery_tag '#{message.headers[:delivery_tag]}'"
  end

  server = client.server
  ack_frame = ::Carrot::AMQP::Protocol::Basic::Ack.new(:delivery_tag => message.headers[:delivery_tag])
  server.send_frame(ack_frame)
end

#send(queue_name, data, headers = {}) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/activemessaging/adapters/amqp.rb', line 95

# Publishes +data+ to the exchange described by +headers+.
#
# The routing key is headers[:routing_key], falling back to +queue_name+.
# The payload is wrapped in an AmqpMessage, stamped via #stamp_sent! and
# encoded before being published.
#
# When the server is reported down, publishing is retried with a linearly
# growing pause (0.25s, 0.5s, ...) until SERVER_RETRY_MAX_ATTEMPTS failures
# have occurred, at which point the ServerDown error is re-raised.
#
# @param queue_name [String] destination name, also the default routing key
# @param data payload handed to AmqpMessage
# @param headers [Hash] message headers; the caller's Hash is not modified
# @raise [Carrot::AMQP::Server::ServerDown] once the retry budget is used up
def send(queue_name, data, headers = {})
  # Work on a copy when filling in the default routing key: writing into the
  # caller's Hash would make a reused headers Hash stick to the first queue.
  headers = headers.merge(:routing_key => queue_name) unless headers[:routing_key]
  message = AmqpMessage.new({:headers => headers, :data => data}, queue_name)

  if @debug > 0
    puts "Sending the following message: "; pp message
  end

  failures = 0
  begin
    exchange(*exchange_info(headers)).publish(message.stamp_sent!.encode, :key => headers[:routing_key])
  rescue ::Carrot::AMQP::Server::ServerDown
    failures += 1
    # A bare `raise` re-raises the rescued ServerDown; the previous `raise e`
    # referenced an undefined local and surfaced as a NameError instead.
    raise if failures >= SERVER_RETRY_MAX_ATTEMPTS
    sleep(failures * 0.25)
    retry
  end
end

#subscribe(queue_name, headers = {}, subId = nil) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/activemessaging/adapters/amqp.rb', line 113

# Binds the connection's queue to the exchange described by +headers+.
#
# The binding key is headers[:routing_key], or +queue_name+ when that header
# is absent. With @debug above 1 the request details are printed first.
#
# @param queue_name [String] default binding key
# @param headers [Hash] passed to exchange_info; may carry :routing_key
# @param subId only echoed in the debug output
def subscribe(queue_name, headers = {}, subId = nil)
  if @debug > 1
    show = lambda { |label, value| puts "#{label}#{value.inspect}" }
    puts "Begin Subscribe Request:"
    show.call("    Queue Name: ", queue_name)
    show.call("       Headers: ", headers)
    show.call("         subId: ", subId)
    show.call("     EXCH INFO: ", exchange_info(headers))
    puts "End Subscribe Request."
  end

  binding_key = headers[:routing_key] || queue_name
  bound_queue = queue
  target_exchange = exchange(*exchange_info(headers))
  bound_queue.bind(target_exchange, :key => binding_key)
end

#unreceive(message, headers = {}) ⇒ Object



77
78
79
80
# File 'lib/activemessaging/adapters/amqp.rb', line 77

# Rejects +message+ by sending a Basic::Reject frame that carries the
# delivery tag found in the message headers.
#
# @param message object whose #headers holds a :delivery_tag entry
# @param headers [Hash] unused
def unreceive(message, headers = {})
  if @debug > 0
    puts "Un-Receiving Message - REJECTing with delivery_tag '#{message.headers[:delivery_tag]}'"
  end

  server = client.server
  reject_frame = ::Carrot::AMQP::Protocol::Basic::Reject.new(:delivery_tag => message.headers[:delivery_tag])
  server.send_frame(reject_frame)
end

#unsubscribe(queue_name, headers = {}, subId = nil) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/activemessaging/adapters/amqp.rb', line 127

# Removes the binding between the connection's queue and the exchange
# described by +headers+.
#
# The binding key is headers[:routing_key], or +queue_name+ when that header
# is absent. With @debug above 1 the request details are printed first.
#
# @param queue_name [String] default binding key
# @param headers [Hash] passed to exchange_info; may carry :routing_key
# @param subId only echoed in the debug output
def unsubscribe(queue_name, headers = {}, subId = nil)
  if @debug > 1
    show = lambda { |label, value| puts "#{label}#{value.inspect}" }
    puts "Begin UNsubscribe Request:"
    show.call("    Queue Name: ", queue_name)
    show.call("    Headers:    ", headers)
    show.call("    subId:      ", subId)
    puts "End UNsubscribe Request."
  end

  binding_key = headers[:routing_key] || queue_name
  bound_queue = queue
  target_exchange = exchange(*exchange_info(headers))
  bound_queue.unbind(target_exchange, :key => binding_key)
end