Class: HotBunnies::Queue::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/hot_bunnies/queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(channel, queue_name, options = {}) ⇒ Subscription

Returns a new instance of Subscription.



62
63
64
65
66
67
68
# File 'lib/hot_bunnies/queue.rb', line 62

def initialize(channel, queue_name, options={})
  @channel    = channel
  @queue_name = queue_name
  @ack        = options.fetch(:ack, false)

  @cancelled  = java.util.concurrent.atomic.AtomicBoolean.new(false)
end

Instance Method Details

#active?Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/hot_bunnies/queue.rb', line 99

def active?
  !@cancelled.get && !@subscriber.nil? && !@subscriber.consumer_tag.nil?
end

#cancelObject



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/hot_bunnies/queue.rb', line 86

def cancel
  raise 'Can\'t cancel: the subscriber haven\'t received an OK yet' if !self.active?
  @channel.basic_cancel(@subscriber.consumer_tag)

  # RabbitMQ Java client won't clear consumer_tag from cancelled consumers,
  # so we have to do this. Sharing consumers
  # between threads in general is a can of worms but someone somewhere
  # will almost certainly do it, so. MK.
  @cancelled.set(true)

  maybe_shutdown_executor
end

#each(options = {}, &block) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/hot_bunnies/queue.rb', line 70

def each(options={}, &block)
  raise 'The subscription already has a message listener' if @subscriber
  if options.fetch(:blocking, true)
    run(&block)
  else
    if options[:executor]
      @shut_down_executor = false
      @executor = options[:executor]
    else
      @shut_down_executor = true
      @executor = java.util.concurrent.Executors.new_single_thread_executor
    end
    @executor.submit { run(&block) }
  end
end

#shutdown!Object



103
104
105
106
107
# File 'lib/hot_bunnies/queue.rb', line 103

def shutdown!
  if @executor && @shut_down_executor
    @executor.shutdown_now
  end
end