Class: RabbitMQClient::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/jessica/rabbitmq_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, channel, opts = {}) ⇒ Queue

Returns a new instance of Queue.



80
81
82
83
84
85
86
87
88
# File 'lib/jessica/rabbitmq_client.rb', line 80

def initialize(name, channel, opts={})
  @name = name
  @durable,@auto_delete,@args = opts.values_at(:durable,:auto_delete,:args)
  @channel = channel
  exclusive = false
  @bindings = {}
  @channel.queue_declare(name, @durable, exclusive, @auto_delete || false, @args)
  self
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



78
79
80
# File 'lib/jessica/rabbitmq_client.rb', line 78

def name
  @name
end

Instance Method Details

#bind(exchange, opts) ⇒ Object



96
97
98
99
100
101
# File 'lib/jessica/rabbitmq_client.rb', line 96

def bind(exchange, opts)
  key = opts[:routing_key]
  @bindings[key] = exchange.name
  @channel.queue_bind(@name, exchange.name, key)
  self
end

#loop_subscribe(&block) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/jessica/rabbitmq_client.rb', line 140

def loop_subscribe(&block)
  auto_ack = false
  consumer = QueueingConsumer.new(@channel)
  @channel.basic_consume(@name, auto_ack, consumer)
  loop do
    begin
      delivery = consumer.next_delivery
      message_body = String.from_java_bytes(delivery.get_body)
      case block.arity
      when 1
        block.call message_body
      when 2
        properties = delivery.get_properties
        block.call message_body, properties
      when 3
        properties = delivery.get_properties
        envelope = delivery.get_envelope
        block.call message_body, properties, envelope
      end
      @channel.basic_ack(delivery.get_envelope.get_delivery_tag, false)
    rescue InterruptedException => ie
      #next
    end
  end
end

#match_opts(opts) ⇒ Object



90
91
92
93
# File 'lib/jessica/rabbitmq_client.rb', line 90

def match_opts(opts)
  dur,aut,arg = opts.values_at(:durable,:auto_delete,:args)
  @durable == dur && @auto_delete == aut && @args == arg
end

#purgeObject



184
185
186
# File 'lib/jessica/rabbitmq_client.rb', line 184

def purge
  @channel.queue_purge(@name)
end

#reactive_loop_subscribe(opts = {}, &block) ⇒ Object

block take 1 arg, a ReactiveMessage



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/jessica/rabbitmq_client.rb', line 166

def reactive_loop_subscribe(opts={}, &block) #block take 1 arg, a ReactiveMessage
  auto_ack = opts[:auto_ack] || false
  consumer = QueueingConsumer.new(@channel)
  @channel.basic_consume(@name, auto_ack, consumer)
  loop do
    begin
      delivery = consumer.next_delivery
      message_body = String.from_java_bytes(delivery.get_body)
      remsg = ReactiveMessage.new(@channel, delivery, message_body)
      block.call remsg
      @channel.basic_ack(delivery.envelope.delivery_tag, false) if remsg.should_acknowledge?
    rescue InterruptedException => ie
      #next
    end
    # next (above) is only necessary if you don't want any code here executed
  end
end

#retrieve(opts = {}) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/jessica/rabbitmq_client.rb', line 114

def retrieve(opts={})
  message_body = nil
  no_ack = opts[:no_ack] ? opts[:no_ack] : false
  response = @channel.basic_get(@name, no_ack)
  if response
    props = response.props
    message_body = String.from_java_bytes(response.body)
    delivery_tag = response.envelope.delivery_tag
    @channel.basic_ack(delivery_tag, false)
  end
  unless opts.empty?
    resp = {}
    resp[:message_body] = message_body
    resp[:properties] = props unless opts[:properties].nil?
    resp[:envelope] = response.envelope unless opts[:envelope].nil?
    resp
  else
    message_body
  end
end

#subscribe(&block) ⇒ Object



135
136
137
138
# File 'lib/jessica/rabbitmq_client.rb', line 135

def subscribe(&block)
  auto_ack = false
  @channel.basic_consume(@name, auto_ack, QueueConsumer.new(@channel, block))
end

#unbind(exchange = nil, opts = {}) ⇒ Object



103
104
105
106
107
108
109
110
111
112
# File 'lib/jessica/rabbitmq_client.rb', line 103

def unbind(exchange=nil, opts={})
  if exchange && opts.size > 0
    key = opts[:routing_key]
    @channel.queue_unbind(@name, exchange.name, key)
  else
    @bindings.each do |key, exch|
      @channel.queue_unbind(@name, exch, key)
    end
  end
end