Class: ActiveMessaging::Adapters::Amqp::Connection
- Inherits:
-
Object
- Object
- ActiveMessaging::Adapters::Amqp::Connection
show all
- Includes:
- ActiveMessaging::Adapter
- Defined in:
- lib/activemessaging/adapters/amqp.rb
Defined Under Namespace
Classes: InvalidExchangeType
Constant Summary
collapse
- SERVER_RETRY_MAX_ATTEMPTS =
10
- DEFAULT_QUEUE_CONFIG =
{
:durable => true,
:auto_delete => false,
:exclusive => true
}
Instance Method Summary
collapse
included, #logger
Constructor Details
#initialize(config = {}) ⇒ Connection
Returns a new instance of Connection.
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/activemessaging/adapters/amqp.rb', line 39
def initialize config = {}
@connect_options = {
:user => config[:user] || 'guest',
:pass => config[:pass] || 'guest',
:host => config[:host] || 'localhost',
:port => config[:port] || (config[:ssl] ? 5671 : 5672),
:vhost => config[:vhost] || nil,
:ssl => config[:ssl] || false,
:ssl_verify => config[:ssl_verify] || OpenSSL::SSL::VERIFY_PEER,
}
@debug = config[:debug].to_i rescue 0
Carrot.logging = true unless @debug < 5
@auto_generated_queue = false
unless config[:queue_name]
@queue_name = Digest::MD5.hexdigest Time.now.to_s
@auto_generated_queue = true
else
@queue_name = config[:queue_name]
end
@queue_config = DEFAULT_QUEUE_CONFIG
unless @auto_generated_queue
@queue_config.merge!({
:durable => !!config[:queue_durable],
:auto_delete => !!config[:queue_auto_delete],
:exclusive => !!config[:queue_exclusive]
})
end
end
|
Instance Method Details
#disconnect(headers = {}) ⇒ Object
140
141
142
|
# File 'lib/activemessaging/adapters/amqp.rb', line 140
def disconnect(={})
@client.stop
end
|
#receive(options = {}) ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/activemessaging/adapters/amqp.rb', line 82
def receive(options={})
while true
message = queue.pop(:ack => true)
unless message.nil?
message = AmqpMessage.decode(message).stamp_received! unless message.nil?
message.delivery_tag = queue.delivery_tag
puts "RECEIVE: #{message.inspect}" if @debug
return message
end
sleep 0.2
end
end
|
#received(message, headers = {}) ⇒ Object
72
73
74
75
|
# File 'lib/activemessaging/adapters/amqp.rb', line 72
def received message, = {}
puts "Received Message - ACK'ing with delivery_tag '#{message.[:delivery_tag]}'" if @debug > 0
client.server.send_frame(::Carrot::AMQP::Protocol::Basic::Ack.new(:delivery_tag => message.[:delivery_tag]))
end
|
#send(queue_name, data, headers = {}) ⇒ Object
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/activemessaging/adapters/amqp.rb', line 95
def send queue_name, data, = {}
[:routing_key] ||= queue_name
message = AmqpMessage.new({:headers => , :data => data}, queue_name)
if @debug > 0
puts "Sending the following message: "; pp message
end
begin
exchange(*exchange_info()).publish(message.stamp_sent!.encode, :key => [:routing_key])
rescue ::Carrot::AMQP::Server::ServerDown
retry_attempts = retry_attempts.nil? ? 1 : retry_attempts + 1
sleep(retry_attempts * 0.25)
retry unless retry_attempts >= SERVER_RETRY_MAX_ATTEMPTS
raise e
end
end
|
#subscribe(queue_name, headers = {}, subId = nil) ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/activemessaging/adapters/amqp.rb', line 113
def subscribe queue_name, = {}, subId = nil
if @debug > 1
puts "Begin Subscribe Request:"
puts " Queue Name: #{queue_name.inspect}"
puts " Headers: #{.inspect}"
puts " subId: #{subId.inspect}"
puts " EXCH INFO: #{exchange_info().inspect}"
puts "End Subscribe Request."
end
routing_key = [:routing_key] || queue_name
queue.bind(exchange(*exchange_info()), :key => routing_key)
end
|
#unreceive(message, headers = {}) ⇒ Object
77
78
79
80
|
# File 'lib/activemessaging/adapters/amqp.rb', line 77
def unreceive message, = {}
puts "Un-Receiving Message - REJECTing with delivery_tag '#{message.[:delivery_tag]}'" if @debug > 0
client.server.send_frame(::Carrot::AMQP::Protocol::Basic::Reject.new(:delivery_tag => message.[:delivery_tag]))
end
|
#unsubscribe(queue_name, headers = {}, subId = nil) ⇒ Object
127
128
129
130
131
132
133
134
135
136
137
138
|
# File 'lib/activemessaging/adapters/amqp.rb', line 127
def unsubscribe(queue_name, ={}, subId=nil)
if @debug > 1
puts "Begin UNsubscribe Request:"
puts " Queue Name: #{queue_name.inspect}"
puts " Headers: #{.inspect}"
puts " subId: #{subId.inspect}"
puts "End UNsubscribe Request."
end
routing_key = [:routing_key] || queue_name
queue.unbind(exchange(*exchange_info()), :key => routing_key)
end
|