Class: QueueingRabbit::Client::Bunny
- Inherits:
-
Object
- Object
- QueueingRabbit::Client::Bunny
- Includes:
- Logging
- Defined in:
- lib/queueing_rabbit/client/bunny.rb
Defined Under Namespace
Classes: Metadata
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Class Method Summary collapse
- .connect ⇒ Object
- .connection_options ⇒ Object
Instance Method Summary collapse
- #begin_worker_loop ⇒ Object
- #bind_queue(queue, exchange, options = {}) ⇒ Object
- #close ⇒ Object
- #define_exchange(channel = nil, name = '', options = {}) {|exchange| ... } ⇒ Object
- #define_queue(channel, name, options = {}) {|queue| ... } ⇒ Object
- #enqueue(exchange, payload, options = {}) ⇒ Object (also: #publish)
- #listen_queue(queue, options = {}) ⇒ Object
- #next_tick(&block) ⇒ Object
- #open? ⇒ Boolean
- #open_channel(options = {}) {|ch, nil| ... } ⇒ Object
- #purge_queue(queue) ⇒ Object
- #queue_size(queue) ⇒ Object
- #wait_while_for(proc, seconds, interval = 0.5) ⇒ Object
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
33 34 35 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 33
#
# Reader for the connection object this client wraps.
#
# @return [Object] the value of the @connection instance variable
def connection
  @connection
end
Class Method Details
.connect ⇒ Object
40 41 42 43 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 40
#
# Builds a client around a new ::Bunny session created from
# QueueingRabbit.amqp_uri and the class-level connection options.
#
# NOTE(review): the second argument to ::Bunny.new was lost in the rendered
# listing; it is restored here as `connection_options` (the sibling class
# method defined just above this one in the file) - confirm against the
# original source.
#
# @return [Object] a new client instance wrapping the Bunny session
def self.connect
  new(::Bunny.new(QueueingRabbit.amqp_uri, connection_options))
end
.connection_options ⇒ Object
35 36 37 38 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 35
#
# Options hash built from the QueueingRabbit configuration.
#
# @return [Hash] :connection_timeout taken from QueueingRabbit.tcp_timeout
#   and :heartbeat taken from QueueingRabbit.heartbeat
def self.connection_options
  {
    :connection_timeout => QueueingRabbit.tcp_timeout,
    :heartbeat => QueueingRabbit.heartbeat
  }
end
Instance Method Details
#begin_worker_loop ⇒ Object
113 114 115 116 117 118 119 120 121 122 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 113
#
# Runs the worker loop: yields once (if a block is given), then, until
# @continue_worker_loop is cleared, runs every action queued in
# @actions_queue and sleeps for one second between passes.
#
# Each queued action is removed from the queue before it is called, so it
# runs exactly once. (Iterating the queue without removing entries would
# re-run every action on every pass and let the queue grow without bound.)
#
# @yield optional setup block, called before the loop state is initialised
# @return [nil] once the loop has been stopped
def begin_worker_loop
  yield if block_given?

  @actions_queue = []
  @continue_worker_loop = true

  # We may need to add signal handling here
  while @continue_worker_loop
    # Actions appended while draining are picked up in the same pass.
    @actions_queue.shift.call until @actions_queue.empty?
    sleep 1
  end
end
#bind_queue(queue, exchange, options = {}) ⇒ Object
59 60 61 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 59
#
# Binds a queue to an exchange by delegating to queue.bind.
#
# @param queue [Object] object responding to #bind
# @param exchange [Object] exchange passed through to queue.bind
# @param options [Hash] passed through to queue.bind unchanged
# @return [Object] whatever queue.bind returns
def bind_queue(queue, exchange, options = {})
  queue.bind(exchange, options)
end
#close ⇒ Object
95 96 97 98 99 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 95
#
# Closes the wrapped connection, yields (if a block is given), and then
# clears @continue_worker_loop so a running worker loop stops.
#
# @yield optional callback invoked after the connection has been closed
# @return [false] the value assigned to @continue_worker_loop
def close
  @connection.close
  yield if block_given?
  @continue_worker_loop = false
end
#define_exchange(channel = nil, name = '', options = {}) {|exchange| ... } ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 68
#
# Resolves an exchange on the given channel. When options contains :type,
# the channel method of that name is called with the exchange name and the
# remaining options; otherwise the channel's default exchange is used.
#
# @param channel [Object] channel responding to the exchange-type method
#   and to #default_exchange (the nil default is not usable as-is)
# @param name [String] exchange name
# @param options [Hash] exchange options; :type selects the channel method
#   and is removed before the rest are forwarded
# @yield [exchange] the resolved exchange, if a block is given
# @return [Object] the resolved exchange
def define_exchange(channel = nil, name = '', options = {})
  # Work on a copy so the caller's hash keeps its :type entry.
  options = options.dup
  type = options.delete(:type)

  exchange = if type
               # public_send keeps a :type value from reaching private methods.
               channel.public_send(type.to_sym, name, options)
             else
               channel.default_exchange
             end

  yield exchange if block_given?

  exchange
end
#define_queue(channel, name, options = {}) {|queue| ... } ⇒ Object
53 54 55 56 57 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 53
#
# Declares a queue on the channel via channel.queue, using the stringified
# name and the given options.
#
# @param channel [Object] object responding to #queue
# @param name [#to_s] queue name
# @param options [Hash] passed through to channel.queue unchanged
# @yield [queue] the declared queue, if a block is given
# @return [Object] the declared queue
def define_queue(channel, name, options = {})
  queue = channel.queue(name.to_s, options)

  yield queue if block_given?

  queue
end
#enqueue(exchange, payload, options = {}) ⇒ Object Also known as: publish
63 64 65 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 63
#
# Publishes a payload to the exchange by delegating to exchange.publish.
#
# @param exchange [Object] object responding to #publish
# @param payload [Object] message body passed through unchanged
# @param options [Hash] passed through to exchange.publish unchanged
# @return [Object] whatever exchange.publish returns
def enqueue(exchange, payload, options = {})
  exchange.publish(payload, options)
end
#listen_queue(queue, options = {}) ⇒ Object
84 85 86 87 88 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 84
#
# Subscribes to the queue and, for every delivery, yields the payload
# together with a Metadata object built from the queue's channel, the
# delivery info and the message properties.
#
# @param queue [Object] object responding to #subscribe and #channel
# @param options [Hash] passed through to queue.subscribe unchanged
# @yield [payload, metadata] called once per delivered message
# @return [Object] whatever queue.subscribe returns
def listen_queue(queue, options = {})
  queue.subscribe(options) do |delivery_info, properties, payload|
    yield payload, Metadata.new(queue.channel, delivery_info, properties)
  end
end
#next_tick(&block) ⇒ Object
105 106 107 108 109 110 111 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 105
#
# Schedules a block: while @continue_worker_loop is set the block is
# appended to @actions_queue, otherwise it is called immediately.
#
# @param block [Proc] the action to run
# @return [Object] the actions queue when the block was queued, otherwise
#   the block's own return value
def next_tick(&block)
  if @continue_worker_loop
    @actions_queue << block
  else
    block.call
  end
end
#open? ⇒ Boolean
101 102 103 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 101
#
# Reports whether the wrapped connection is open, as answered by
# @connection.open?.
#
# @return [Boolean]
def open?
  @connection.open?
end
#open_channel(options = {}) {|ch, nil| ... } ⇒ Object
45 46 47 48 49 50 51 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 45
#
# Creates a channel on the connection and configures it from options.
#
# @param options [Hash] :prefetch, when set, is passed to ch.prefetch;
#   :use_publisher_confirms, when truthy, triggers ch.confirm_select
# @yield [ch, nil] the new channel followed by nil, if a block is given
# @return [Object] the new channel
def open_channel(options = {})
  ch = connection.create_channel
  ch.prefetch(options[:prefetch]) if options[:prefetch]
  ch.confirm_select if options[:use_publisher_confirms]

  # Guarded like the other methods here, so calling without a block just
  # returns the channel instead of raising LocalJumpError.
  yield ch, nil if block_given?

  ch
end
#purge_queue(queue) ⇒ Object
90 91 92 93 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 90
#
# Purges the queue via queue.purge and then yields, if a block is given.
#
# @param queue [Object] object responding to #purge
# @yield optional callback invoked after the purge
# @return [Object, nil] the block's value, or nil when no block is given
def purge_queue(queue)
  queue.purge
  yield if block_given?
end
#queue_size(queue) ⇒ Object
80 81 82 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 80
#
# Reads the :message_count entry from queue.status.
#
# @param queue [Object] object whose #status returns a hash-like value
# @return [Object] the value stored under :message_count
def queue_size(queue)
  queue.status[:message_count]
end
#wait_while_for(proc, seconds, interval = 0.5) ⇒ Object
124 125 126 127 128 129 130 131 132 133 |
# File 'lib/queueing_rabbit/client/bunny.rb', line 124
#
# Polls proc until it returns a falsy value or the timeout elapses. If proc
# turns falsy in time the method returns without yielding; if the timeout
# elapses first, the block (when given) is yielded.
#
# The deadline is compared using full-precision Time values. Truncating to
# whole seconds would let the timeout drift by up to a second and would not
# honour fractional values of seconds.
#
# @param proc [#call] condition to poll; waiting continues while it is truthy
# @param seconds [Numeric] maximum time to wait
# @param interval [Numeric] pause between polls, in seconds
# @yield called only when the timeout elapses while proc is still truthy
# @return [Object, nil] nil when proc turned falsy or no block was given,
#   otherwise the block's value
def wait_while_for(proc, seconds, interval = 0.5)
  deadline = Time.now + seconds

  while Time.now < deadline
    return unless proc.call

    sleep interval
  end

  yield if block_given?
end