Class: HotBunnies::Queue
- Inherits:
-
Object
- Object
- HotBunnies::Queue
- Includes:
- Hollywood
- Defined in:
- lib/euston-rabbitmq/hot_bunnies/queue.rb
Instance Attribute Summary collapse
-
#timeout ⇒ Object
writeonly
Sets the attribute timeout.
Instance Method Summary collapse
- #consumer(auto_ack = false) ⇒ Object
- #delivery_timeout ⇒ Object
- #safe_get ⇒ Object
- #safe_handle_message(reactive_message) ⇒ Object
- #safe_subscribe ⇒ Object
- #safe_subscribe_with_timeout(consumer, timeout = 500) ⇒ Object
Instance Attribute Details
#timeout=(value) ⇒ Object (writeonly)
Sets the attribute timeout
5 6 7 |
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 5

# Sets the attribute timeout.
#
# The stored value is what #delivery_timeout returns on later calls.
#
# @param value [Object] the value to store in @timeout
# @return [Object] the assigned value
def timeout=(value)
  @timeout = value
end
Instance Method Details
#consumer(auto_ack = false) ⇒ Object
7 8 9 10 11 |
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 7

# Builds a QueueingConsumer on @channel and registers it with
# @channel.basic_consume, passing @name (presumably the queue name -
# confirm against the enclosing class) and the auto_ack flag.
#
# @param auto_ack [Boolean] forwarded as the second argument of basic_consume
# @return [QueueingConsumer] the newly registered consumer
def consumer(auto_ack = false)
  queueing_consumer = QueueingConsumer.new(@channel)
  @channel.basic_consume(@name, auto_ack, queueing_consumer)
  queueing_consumer
end
#delivery_timeout ⇒ Object
13 14 15 |
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 13

# Returns the timeout set through #timeout=, falling back to 500 when
# @timeout is nil or false. The fallback is stored in @timeout.
#
# NOTE(review): the unit is not visible here; presumably milliseconds, as the
# value is handed to consumer.next_delivery - confirm.
#
# @return [Object] the configured timeout, or 500 by default
def delivery_timeout
  @timeout ||= 500
end
#safe_get ⇒ Object
17 18 19 20 |
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 17

# Fetches a message with `get :ack => true` and wraps it in a ReactiveMessage
# bound to @channel. The result of `get` is splatted into the constructor.
#
# NOTE(review): the local variable name was lost when this listing was
# extracted; `message` is a reconstruction - confirm against the repository.
#
# @return [ReactiveMessage, nil] the wrapped message, or nil when `get`
#   returned nil
def safe_get
  message = get(:ack => true)
  ReactiveMessage.new(@channel, *message) unless message.nil?
end
#safe_handle_message(reactive_message) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 22

# Decodes the body of a reactive message and dispatches it through `callback`.
#
# Flow, as far as the listing shows:
# * the body is parsed with parse_json; a failure fires
#   :message_decode_failed, acks the message and reports the error via
#   Safely.report!
# * on success :message_received fires and the message is acked
# * Euston::EventStore::ConcurrencyError causes a reject instead
# * any other error fires :message_failed
# * finally the headers are acked unless the message already reacted
#
# NOTE(review): every identifier containing "message" was stripped when this
# listing was extracted. The method name and parameter come from the method
# summary; the local `message` and the exact callback arguments are
# reconstructed from the remaining punctuation - confirm against the
# repository before relying on them.
#
# @param reactive_message [ReactiveMessage] the delivery to process
# @return [Object] result of the final conditional ack
def safe_handle_message(reactive_message)
  begin
    message = parse_json reactive_message.body

    begin
      callback :message_received, message
      reactive_message.ack
    rescue Euston::EventStore::ConcurrencyError
      reactive_message.reject #requeue
    rescue => e
      callback :message_failed, message, e, reactive_message
    end
  rescue => e
    # Reached when parse_json raises, or when a handler above raises again.
    callback :message_decode_failed, reactive_message.body, e
    reactive_message.ack
    Safely.report! e
  end

  reactive_message.headers.ack unless reactive_message.reacted?
end
#safe_subscribe ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 43

# Creates one consumer and keeps draining it with
# #safe_subscribe_with_timeout until Thread.current[:stop] is truthy.
# The flag is checked before every pass, and #delivery_timeout is re-read on
# every pass.
#
# @return [nil]
def safe_subscribe
  queue_consumer = consumer

  safe_subscribe_with_timeout(queue_consumer, delivery_timeout) until Thread.current[:stop]
end
#safe_subscribe_with_timeout(consumer, timeout = 500) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/euston-rabbitmq/hot_bunnies/queue.rb', line 51

# Pulls deliveries from the consumer until next_delivery returns nil or
# raises a NativeException. Each delivery is wrapped in a ReactiveMessage and
# handed to #safe_handle_message.
#
# A NativeException is stored in Thread.current[:exception] and ends the loop
# without re-raising.
#
# NOTE(review): the assignment target and the handler call were stripped when
# this listing was extracted. `reactive_message` and the
# `safe_handle_message` call are reconstructed from the method summary and
# the line count - confirm against the repository.
#
# @param consumer [#next_delivery] consumer to poll
# @param timeout [Integer] passed to next_delivery (presumably milliseconds -
#   confirm)
# @return [nil]
def safe_subscribe_with_timeout(consumer, timeout = 500)
  loop do
    delivery = nil

    begin
      delivery = consumer.next_delivery timeout
    rescue NativeException => e
      Thread.current[:exception] = e
      break
    end

    # nil means no delivery was returned for this poll.
    break if delivery.nil?

    headers = Headers.new @channel, nil, delivery.envelope, delivery.properties
    body = String.from_java_bytes(delivery.get_body)
    reactive_message = ReactiveMessage.new @channel, headers, body

    safe_handle_message reactive_message
  end
end