Class: RabbitMQClient::Queue
- Inherits:
-
Object
- Object
- RabbitMQClient::Queue
- Defined in:
- lib/jessica/rabbitmq_client.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #bind(exchange, opts) ⇒ Object
-
#initialize(name, channel, opts = {}) ⇒ Queue
constructor
A new instance of Queue.
- #loop_subscribe(&block) ⇒ Object
- #match_opts(opts) ⇒ Object
- #purge ⇒ Object
-
#reactive_loop_subscribe(opts = {}, &block) ⇒ Object
The block takes one argument, a ReactiveMessage.
- #retrieve(opts = {}) ⇒ Object
- #subscribe(&block) ⇒ Object
- #unbind(exchange = nil, opts = {}) ⇒ Object
Constructor Details
#initialize(name, channel, opts = {}) ⇒ Queue
Returns a new instance of Queue.
80 81 82 83 84 85 86 87 88 |
# File 'lib/jessica/rabbitmq_client.rb', line 80

# Builds the queue wrapper and declares the queue on the given channel.
#
# @param name [Object] queue name, forwarded to +queue_declare+
# @param channel [Object] channel used for this and all later broker calls
# @param opts [Hash] optional +:durable+, +:auto_delete+ and +:args+ entries
# @return [Queue] self
def initialize(name, channel, opts = {})
  @name = name
  @channel = channel
  # Keep the options exactly as supplied (possibly nil); match_opts compares
  # against these raw values.
  @durable, @auto_delete, @args = opts.values_at(:durable, :auto_delete, :args)
  @bindings = {}

  # Queues declared through this class are never exclusive.
  exclusive = false
  # Normalise both flags so the declare call always receives real booleans;
  # previously only auto_delete was coerced and durable could be nil.
  @channel.queue_declare(name, @durable || false, exclusive, @auto_delete || false, @args)
  self
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
78 79 80 |
# File 'lib/jessica/rabbitmq_client.rb', line 78

# Read-only accessor for the queue name.
#
# @return [Object] the current value of @name
def name
  @name
end
Instance Method Details
#bind(exchange, opts) ⇒ Object
96 97 98 99 100 101 |
# File 'lib/jessica/rabbitmq_client.rb', line 96

# Binds this queue to an exchange and records the binding locally.
#
# @param exchange [Object] must respond to +name+
# @param opts [Hash] the routing key is read from +:routing_key+
# @return [Queue] self, so calls can be chained
def bind(exchange, opts)
  routing_key = opts[:routing_key]
  # Bindings are tracked as routing key => exchange name.
  @bindings.store(routing_key, exchange.name)
  @channel.queue_bind(@name, exchange.name, routing_key)
  self
end
#loop_subscribe(&block) ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/jessica/rabbitmq_client.rb', line 140

# Registers a QueueingConsumer on the queue and processes deliveries in an
# endless loop, acknowledging each one after the block has run.
#
# What the block receives depends on its arity:
#   1 - the message body
#   2 - the message body and the delivery properties
#   3 - the message body, the delivery properties and the envelope
# For any other arity the block is not called, but the delivery is still
# acknowledged.
#
# @yield [message_body, properties, envelope] see the arity table above
# @return does not return under normal operation
def loop_subscribe(&block)
  auto_ack = false
  consumer = QueueingConsumer.new(@channel)
  @channel.basic_consume(@name, auto_ack, consumer)
  loop do
    begin
      delivery = consumer.next_delivery
      message_body = String.from_java_bytes(delivery.get_body)
      case block.arity
      when 1
        block.call message_body
      when 2
        block.call message_body, delivery.get_properties
      when 3
        block.call message_body, delivery.get_properties, delivery.get_envelope
      end
      # Consumer is registered with auto_ack = false, so ack explicitly.
      @channel.basic_ack(delivery.get_envelope.get_delivery_tag, false)
    rescue InterruptedException
      # Interruptions are ignored on purpose; the loop simply continues.
    end
  end
end
#match_opts(opts) ⇒ Object
90 91 92 93 |
# File 'lib/jessica/rabbitmq_client.rb', line 90

# Reports whether the given options describe the same declaration this queue
# was created with.
#
# An omitted (nil) flag and an explicit +false+ are treated as the same
# setting, so option hashes that differ only in that respect compare equal.
#
# @param opts [Hash] may contain +:durable+, +:auto_delete+ and +:args+
# @return [Boolean] true when all three settings agree
def match_opts(opts)
  durable, auto_delete, args = opts.values_at(:durable, :auto_delete, :args)
  (@durable || false) == (durable || false) &&
    (@auto_delete || false) == (auto_delete || false) &&
    @args == args
end
#purge ⇒ Object
184 185 186 |
# File 'lib/jessica/rabbitmq_client.rb', line 184

# Asks the channel to purge this queue.
#
# @return whatever the channel's +queue_purge+ call returns
def purge
  queue_name = @name
  @channel.queue_purge(queue_name)
end
#reactive_loop_subscribe(opts = {}, &block) ⇒ Object
The block takes one argument, a ReactiveMessage.
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/jessica/rabbitmq_client.rb', line 166

# Registers a QueueingConsumer on the queue and, in an endless loop, wraps
# every delivery in a ReactiveMessage and hands it to the block.
#
# The block takes one argument, a ReactiveMessage. After the block returns,
# the delivery is acknowledged when the message reports
# +should_acknowledge?+ and the consumer is not in auto-ack mode.
#
# @param opts [Hash] +:auto_ack+ (default false) is passed to +basic_consume+
# @yield [remsg] the ReactiveMessage built for the delivery
# @return does not return under normal operation
def reactive_loop_subscribe(opts = {}, &block)
  auto_ack = opts[:auto_ack] || false
  consumer = QueueingConsumer.new(@channel)
  @channel.basic_consume(@name, auto_ack, consumer)
  loop do
    begin
      delivery = consumer.next_delivery
      message_body = String.from_java_bytes(delivery.get_body)
      remsg = ReactiveMessage.new(@channel, delivery, message_body)
      block.call remsg
      # With auto_ack the delivery is already acknowledged by the broker;
      # acknowledging it again would be an error, so only ack in manual mode.
      if !auto_ack && remsg.should_acknowledge?
        @channel.basic_ack(delivery.envelope.delivery_tag, false)
      end
    rescue InterruptedException
      # Interruptions are ignored on purpose; the loop simply continues.
    end
  end
end
#retrieve(opts = {}) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/jessica/rabbitmq_client.rb', line 114

# Fetches at most one message from the queue with +basic_get+.
#
# When a message is returned and +:no_ack+ is not set, the message is
# acknowledged right away.
#
# @param opts [Hash]
#   +:no_ack+     - passed to +basic_get+ (default false); when truthy no
#                   explicit acknowledgement is sent
#   +:properties+ - when present (non-nil), include +:properties+ in the result
#   +:envelope+   - when present (non-nil), include +:envelope+ in the result
# @return [String, nil] the message body (nil if nothing was fetched) when
#   +opts+ is empty
# @return [Hash] when +opts+ is non-empty: always has +:message_body+, plus
#   +:properties+ / +:envelope+ if requested; values are nil when nothing
#   was fetched
def retrieve(opts = {})
  no_ack = opts[:no_ack] || false
  response = @channel.basic_get(@name, no_ack)

  message_body = nil
  props = nil
  envelope = nil
  if response
    props = response.props
    envelope = response.envelope
    message_body = String.from_java_bytes(response.body)
    # In no_ack mode the broker has already settled the message, so an
    # explicit ack must not be sent.
    @channel.basic_ack(envelope.delivery_tag, false) unless no_ack
  end

  return message_body if opts.empty?

  resp = {}
  resp[:message_body] = message_body
  resp[:properties] = props unless opts[:properties].nil?
  # envelope stays nil when nothing was fetched instead of failing on a nil
  # response.
  resp[:envelope] = envelope unless opts[:envelope].nil?
  resp
end
#subscribe(&block) ⇒ Object
135 136 137 138 |
# File 'lib/jessica/rabbitmq_client.rb', line 135

# Registers the block as a consumer of this queue by wrapping it in a
# QueueConsumer and passing that to +basic_consume+ with auto-ack disabled.
#
# @yield handed to QueueConsumer together with the channel
# @return whatever the channel's +basic_consume+ call returns
def subscribe(&block)
  consumer = QueueConsumer.new(@channel, block)
  @channel.basic_consume(@name, false, consumer)
end
#unbind(exchange = nil, opts = {}) ⇒ Object
103 104 105 106 107 108 109 110 111 112 |
# File 'lib/jessica/rabbitmq_client.rb', line 103

# Removes bindings of this queue.
#
# With an exchange AND non-empty opts, removes the single binding identified
# by the exchange and +opts[:routing_key]+. In every other case (including
# an exchange passed without opts) removes every binding recorded by #bind.
#
# The local binding table is kept in sync: a removed binding is forgotten, so
# a later call does not try to unbind it again.
#
# @param exchange [Object, nil] must respond to +name+ when given
# @param opts [Hash] +:routing_key+ selects the binding to remove
# @return the channel's +queue_unbind+ result for a single binding, or a Hash
#   (routing key => exchange name) of the bindings that were removed
def unbind(exchange = nil, opts = {})
  if exchange && !opts.empty?
    key = opts[:routing_key]
    result = @channel.queue_unbind(@name, exchange.name, key)
    # Only drop the tracked entry if it really refers to this exchange.
    @bindings.delete(key) if @bindings[key] == exchange.name
    result
  else
    # Iterate over a snapshot so the table can be cleared afterwards and the
    # caller still receives the bindings that were removed.
    released = @bindings.dup
    released.each do |key, exch|
      @channel.queue_unbind(@name, exch, key)
    end
    @bindings.clear
    released
  end
end