Class: HotBunnies::Queue::Subscription
- Inherits:
-
Object
- Object
- HotBunnies::Queue::Subscription
- Defined in:
- lib/hot_bunnies/queue.rb
Instance Method Summary collapse
- #active? ⇒ Boolean
- #cancel ⇒ Object
- #each(options = {}, &block) ⇒ Object
-
#initialize(channel, queue_name, options = {}) ⇒ Subscription
constructor
A new instance of Subscription.
- #shutdown! ⇒ Object
Constructor Details
#initialize(channel, queue_name, options = {}) ⇒ Subscription
Returns a new instance of Subscription.
62 63 64 65 66 67 68 |
# File 'lib/hot_bunnies/queue.rb', line 62 def initialize(channel, queue_name, ={}) @channel = channel @queue_name = queue_name @ack = .fetch(:ack, false) @cancelled = java.util.concurrent.atomic.AtomicBoolean.new(false) end |
Instance Method Details
#active? ⇒ Boolean
99 100 101 |
# File 'lib/hot_bunnies/queue.rb', line 99 def active? !@cancelled.get && !@subscriber.nil? && !@subscriber.consumer_tag.nil? end |
#cancel ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/hot_bunnies/queue.rb', line 86 def cancel raise 'Can\'t cancel: the subscriber haven\'t received an OK yet' if !self.active? @channel.basic_cancel(@subscriber.consumer_tag) # RabbitMQ Java client won't clear consumer_tag from cancelled consumers, # so we have to do this. Sharing consumers # between threads in general is a can of worms but someone somewhere # will almost certainly do it, so. MK. @cancelled.set(true) maybe_shutdown_executor end |
#each(options = {}, &block) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/hot_bunnies/queue.rb', line 70 def each(={}, &block) raise 'The subscription already has a message listener' if @subscriber if .fetch(:blocking, true) run(&block) else if [:executor] @shut_down_executor = false @executor = [:executor] else @shut_down_executor = true @executor = java.util.concurrent.Executors.new_single_thread_executor end @executor.submit { run(&block) } end end |
#shutdown! ⇒ Object
103 104 105 106 107 |
# File 'lib/hot_bunnies/queue.rb', line 103 def shutdown! if @executor && @shut_down_executor @executor.shutdown_now end end |