Class: QueueingRabbit::Client::AMQP
- Inherits:
-
Object
- Object
- QueueingRabbit::Client::AMQP
show all
- Extended by:
- Callbacks, Logging
- Includes:
- Logging
- Defined in:
- lib/queueing_rabbit/client/amqp.rb
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
-
#begin_worker_loop ⇒ Object
-
#bind_queue(queue, exchange, options = {}) ⇒ Object
-
#close ⇒ Object
-
#define_exchange(channel, name = '', options = {}) ⇒ Object
-
#define_queue(channel, queue_name, options = {}) ⇒ Object
-
#enqueue(exchange, payload, options = {}) ⇒ Object
(also: #publish)
-
#listen_queue(queue, options = {}, &block) ⇒ Object
-
#next_tick(&block) ⇒ Object
-
#open? ⇒ Boolean
-
#open_channel(options = {}) ⇒ Object
-
#purge_queue(queue) ⇒ Object
-
#queue_size(queue) ⇒ Object
-
#wait_while_for(proc, period, _ = nil, &block) ⇒ Object
Methods included from Callbacks
callback, define_callback
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
13
14
15
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 13
# Reader for the connection object stored in +@connection+.
#
# @return [Object, nil] the stored connection, or nil when none was assigned
def connection
  @connection
end
|
Class Method Details
.connect ⇒ Object
40
41
42
43
44
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 40
# Builds a client around a freshly requested AMQP connection.
#
# First makes sure the EventMachine reactor is up, then asks +::AMQP.connect+
# for a connection using +QueueingRabbit.amqp_uri+ and +connection_options+,
# and finally wraps that connection in a new instance.
#
# @return [Object] whatever +new+ returns for the connection
def self.connect
  ensure_event_machine_is_running
  broker_connection = ::AMQP.connect(QueueingRabbit.amqp_uri, connection_options)
  new(broker_connection)
end
|
.connection_options ⇒ Object
34
35
36
37
38
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 34
# Assembles the options hash handed to +::AMQP.connect+.
#
# Values are read from +QueueingRabbit.tcp_timeout+ and
# +QueueingRabbit.heartbeat+; the TCP failure handler is whatever
# +callback(:on_tcp_failure)+ returns.
#
# @return [Hash] with keys :timeout, :heartbeat and :on_tcp_connection_failure
def self.connection_options
  settings = {}
  settings[:timeout] = QueueingRabbit.tcp_timeout
  settings[:heartbeat] = QueueingRabbit.heartbeat
  settings[:on_tcp_connection_failure] = callback(:on_tcp_failure)
  settings
end
|
.ensure_event_machine_is_running ⇒ Object
46
47
48
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 46
# Starts the EventMachine reactor via +run_event_machine+ unless
# +EM.reactor_running?+ already reports it as running.
#
# @return [Object, nil] the result of +run_event_machine+, or nil when the
#   reactor was already running
def self.ensure_event_machine_is_running
  return if EM.reactor_running?

  run_event_machine
end
|
.join_event_machine_thread ⇒ Object
70
71
72
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 70
# Joins the thread stored in +@event_machine_thread+, if one is set.
#
# @return [Thread, nil] the result of +join+, or nil when no thread is stored
def self.join_event_machine_thread
  return unless @event_machine_thread

  @event_machine_thread.join
end
|
.run_event_machine ⇒ Object
50
51
52
53
54
55
56
57
58
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 50
# Runs the EventMachine reactor on a new thread and waits for it to start.
#
# The thread is remembered in +@event_machine_thread+. Inside +EM.run+ the
# +:event_machine_started+ event is triggered on +QueueingRabbit+. The calling
# thread then delegates to +wait_for_event_machine_to_start+.
#
# @return [Object] the result of +wait_for_event_machine_to_start+
def self.run_event_machine
  @event_machine_thread = Thread.new do
    EM.run { QueueingRabbit.trigger_event(:event_machine_started) }
  end

  wait_for_event_machine_to_start
end
|
.wait_for_event_machine_to_start ⇒ Object
60
61
62
63
64
65
66
67
68
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 60
# Blocks until +EM.reactor_running?+ is truthy, polling every 0.5 seconds,
# for at most 5 seconds.
#
# On timeout the failure is reported through +fatal+ and re-raised as a
# QueueingRabbitError carrying the same description.
#
# @return [nil] once the reactor is running
# @raise [QueueingRabbitError] when the reactor is not running within 5 seconds
def self.wait_for_event_machine_to_start
  Timeout.timeout(5) do
    loop do
      break if EM.reactor_running?

      sleep 0.5
    end
  end
rescue Timeout::Error
  description = "wait timeout exceeded while starting up EventMachine"
  fatal description
  raise QueueingRabbitError, description
end
|
Instance Method Details
#begin_worker_loop ⇒ Object
153
154
155
156
157
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 153
# Enters +EM.run+ and, inside it, yields to the caller's block when one is
# given.
#
# @yield invoked inside the +EM.run+ block
# @return [Object] whatever +EM.run+ returns
def begin_worker_loop
  EM.run { yield if block_given? }
end
|
#bind_queue(queue, exchange, options = {}) ⇒ Object
94
95
96
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 94
# Binds +queue+ to +exchange+ by delegating to +queue.bind+.
#
# @param queue [Object] must respond to +bind(exchange, options)+
# @param exchange [Object] passed through unchanged
# @param options [Hash] passed through unchanged
# @return [Object] whatever +queue.bind+ returns
def bind_queue(queue, exchange, options = {})
  queue.bind(exchange, options)
end
|
#close ⇒ Object
78
79
80
81
82
83
84
85
86
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 78
# Disconnects from the broker.
#
# Logs an info message, then calls +connection.disconnect+ with a block that
# first yields to the caller's block (when given) and then stops EventMachine
# if its reactor is still running.
#
# @yield invoked inside the disconnect block, before the reactor is stopped
# @return [Object] whatever +connection.disconnect+ returns
def close
  info "closing AMQP broker connection..."

  connection.disconnect do
    yield if block_given?
    next unless EM.reactor_running?

    EM.stop
  end
end
|
#define_exchange(channel, name = '', options = {}) ⇒ Object
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 112
# Resolves an exchange on +channel+ and hands it to the optional block.
#
# The +:type+ entry is taken out of a copy of +options+, so the caller's hash
# is left untouched. When a type other than default is given, the public
# channel method of that name is called with +name+, the remaining options and
# a block that yields the exchange. Otherwise +channel.default_exchange+ is
# used, and +name+ and the remaining options are ignored.
#
# @param channel [Object] must respond to +default_exchange+ and to one public
#   method per exchange type (presumably an AMQP::Channel — confirm)
# @param name [String] exchange name for non-default exchanges
# @param options [Hash] exchange options; +:type+ may be a Symbol or String
# @yield [exchange] the resolved exchange
# @return [Object] the channel method's return value, or the default exchange
# @raise [NoMethodError] when +:type+ does not name a public channel method
def define_exchange(channel, name = '', options = {})
  options = options.dup
  type = options.delete(:type)
  # Normalize once so that 'default' and :default are treated alike.
  kind = type && type.to_sym
  with_exchange = proc do |exchange, _|
    yield exchange if block_given?
  end

  if kind && kind != :default
    # public_send keeps a mistyped :type from reaching private/Kernel methods.
    channel.public_send(kind, name, options, &with_exchange)
  else
    channel.default_exchange.tap(&with_exchange)
  end
end
|
#define_queue(channel, queue_name, options = {}) ⇒ Object
88
89
90
91
92
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 88
# Declares a queue on +channel+ via +channel.queue+, using the string form of
# +queue_name+, and yields the queue to the caller's block when one is given.
#
# @param channel [Object] must respond to +queue(name, options, &block)+
# @param queue_name [#to_s] converted with +to_s+ before use
# @param options [Hash] passed through unchanged
# @yield [queue] the queue handed to the block by +channel.queue+
# @return [Object] whatever +channel.queue+ returns
def define_queue(channel, queue_name, options = {})
  name = queue_name.to_s
  channel.queue(name, options) { |declared| yield declared if block_given? }
end
|
#enqueue(exchange, payload, options = {}) ⇒ Object
Also known as:
publish
126
127
128
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 126
# Publishes +payload+ by delegating to +exchange.publish+.
#
# @param exchange [Object] must respond to +publish(payload, options)+
# @param payload [Object] passed through unchanged
# @param options [Hash] passed through unchanged
# @return [Object] whatever +exchange.publish+ returns
def enqueue(exchange, payload, options = {})
  exchange.publish(payload, options)
end
|
#listen_queue(queue, options = {}, &block) ⇒ Object
98
99
100
101
102
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 98
# Subscribes to +queue+ and forwards every delivery to the caller's block.
#
# +queue.subscribe+ hands its block (metadata, payload); the caller's block
# receives them in the opposite order: payload first, then metadata.
#
# @param queue [Object] must respond to +subscribe(options, &block)+
# @param options [Hash] passed through unchanged
# @yield [payload, metadata] for each delivery
# @return [Object] whatever +queue.subscribe+ returns
def listen_queue(queue, options = {}, &block)
  queue.subscribe(options) { |headers, body| yield body, headers }
end
|
#next_tick(&block) ⇒ Object
141
142
143
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 141
# Hands +block+ to +EM.next_tick+.
#
# @return [Object] whatever +EM.next_tick+ returns
def next_tick(&block)
  EM.next_tick(&block)
end
|
#open? ⇒ Boolean
74
75
76
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 74
# Tells whether the client is usable: the EventMachine reactor must be running
# and +@connection+ must report itself as open.
#
# @return [Boolean] the result of +EM.reactor_running?+ combined with
#   +@connection.open?+
def open?
  reactor_alive = EM.reactor_running?
  reactor_alive && @connection.open?
end
|
#open_channel(options = {}) ⇒ Object
104
105
106
107
108
109
110
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 104
# Opens a new channel on the current connection.
#
# Inside the block given to +::AMQP::Channel.new+ the channel is put into
# confirm mode when +options[:use_publisher_confirms]+ is truthy, the
# class-level +:on_channel_error+ callback is installed as its error handler,
# and the channel is then yielded to the caller's block. The block is optional,
# matching the other methods of this client that yield only when a block is
# given.
#
# @param options [Hash] passed to +::AMQP::Channel.new+; the
#   +:use_publisher_confirms+ key is also read here
# @yield [channel, open_ok] the two values handed over by +::AMQP::Channel.new+
# @return [Object] whatever +::AMQP::Channel.new+ returns
def open_channel(options = {})
  ::AMQP::Channel.new(connection, nil, options) do |channel, open_ok|
    channel.confirm_select if options[:use_publisher_confirms]
    channel.on_error(&self.class.callback(:on_channel_error))
    # Guarded so a blockless call does not raise LocalJumpError.
    yield channel, open_ok if block_given?
  end
end
|
#purge_queue(queue) ⇒ Object
135
136
137
138
139
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 135
# Purges +queue+ via +queue.purge+ and yields to the caller's block, when one
# is given, from inside the block passed to +purge+.
#
# @param queue [Object] must respond to +purge+ with a block
# @yield invoked from the block handed to +queue.purge+
# @return [Object] whatever +queue.purge+ returns
def purge_queue(queue)
  queue.purge { yield if block_given? }
end
|
#queue_size(queue) ⇒ Object
131
132
133
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 131
# Not supported by this client: always raises.
#
# @param queue [Object] ignored
# @raise [NotImplementedError] on every call
def queue_size(queue)
  raise NotImplementedError
end
|
#wait_while_for(proc, period, _ = nil, &block) ⇒ Object
145
146
147
148
149
150
151
|
# File 'lib/queueing_rabbit/client/amqp.rb', line 145
# Runs +block+ either right away or after a delay, depending on +proc+.
#
# +proc+ is called once. If it returns a truthy value, +block+ is scheduled
# with +EM.add_timer+ to fire after +period+; otherwise +block+ is called
# immediately. The third positional argument is accepted and ignored.
#
# @param proc [#call] condition, evaluated exactly once
# @param period [Numeric] delay passed to +EM.add_timer+
# @return [Object] the value of +EM.add_timer+ or of +block.call+
def wait_while_for(proc, period, _ = nil, &block)
  return block.call unless proc.call

  EM.add_timer(period, &block)
end
|