Class: QueueingRabbit::Client::Bunny

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/queueing_rabbit/client/bunny.rb

Defined Under Namespace

Classes: Metadata

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



33
34
35
# File 'lib/queueing_rabbit/client/bunny.rb', line 33

# Reader for the @connection instance variable.
#
# @return [Object] whatever is currently stored in @connection
def connection
  @connection
end

Class Method Details

.connect ⇒ Object



40
41
42
43
# File 'lib/queueing_rabbit/client/bunny.rb', line 40

# Builds a client around a new ::Bunny object created from
# QueueingRabbit.amqp_uri and the hash returned by connection_options.
#
# @return [Object] the result of calling new with the ::Bunny object
def self.connect
  bunny_session = ::Bunny.new(QueueingRabbit.amqp_uri, connection_options)
  new(bunny_session)
end

.connection_options ⇒ Object



35
36
37
38
# File 'lib/queueing_rabbit/client/bunny.rb', line 35

# Collects the connection settings read from QueueingRabbit.
#
# @return [Hash] :connection_timeout taken from QueueingRabbit.tcp_timeout and
#   :heartbeat taken from QueueingRabbit.heartbeat
def self.connection_options
  timeout = QueueingRabbit.tcp_timeout
  beat = QueueingRabbit.heartbeat

  { :connection_timeout => timeout, :heartbeat => beat }
end

Instance Method Details

#begin_worker_loop ⇒ Object



113
114
115
116
117
118
119
120
121
122
# File 'lib/queueing_rabbit/client/bunny.rb', line 113

# Runs the blocking worker loop until @continue_worker_loop becomes falsy.
#
# The optional block is yielded to once, before @actions_queue and
# @continue_worker_loop are (re)initialised. On every pass the loop runs each
# action queued so far exactly once and then sleeps for one second.
#
# @yield optional callback invoked before the loop state is set up
# @return [nil]
def begin_worker_loop
  yield if block_given?
  @actions_queue = []
  @continue_worker_loop = true
  # We may need to add signal handling here
  while @continue_worker_loop
    # Take the queued actions out of the queue before running them. They used
    # to stay in @actions_queue and were therefore invoked again on every
    # pass. Actions queued while this batch runs wait for the next pass.
    @actions_queue.shift(@actions_queue.size).each(&:call)
    sleep 1
  end
end

#bind_queue(queue, exchange, options = {}) ⇒ Object



59
60
61
# File 'lib/queueing_rabbit/client/bunny.rb', line 59

# Binds +queue+ to +exchange+ by delegating to queue.bind.
#
# @param queue [Object] object responding to #bind
# @param exchange [Object] passed through as the first argument of queue.bind
# @param options [Hash] passed through unchanged as the second argument
# @return [Object] whatever queue.bind returns
def bind_queue(queue, exchange, options = {})
  queue.bind(exchange, options)
end

#close ⇒ Object



95
96
97
98
99
# File 'lib/queueing_rabbit/client/bunny.rb', line 95

# Closes the connection, runs the optional block, then clears the flag that
# begin_worker_loop checks on every pass.
#
# The order matters: the block is yielded to only after @connection.close has
# returned, and @continue_worker_loop is set to false last, so an exception
# raised by either earlier step leaves the flag untouched.
#
# @yield optional callback invoked after @connection.close returns
# @return [false] the value assigned to @continue_worker_loop
def close
  @connection.close
  yield if block_given?
  @continue_worker_loop = false
end

#define_exchange(channel = nil, name = '', options = {}) {|exchange| ... } ⇒ Object

Yields:

  • (exchange)


68
69
70
71
72
73
74
75
76
77
78
# File 'lib/queueing_rabbit/client/bunny.rb', line 68

# Looks up or declares an exchange on +channel+.
#
# When +options+ contains :type, the method named by that type is invoked on
# the channel with +name+ and the remaining options. Without :type the
# channel's default exchange is used and +name+ / +options+ are ignored.
# The caller's +options+ hash is not modified.
#
# @param channel [Object] object responding to #default_exchange and to the
#   method named by options[:type]
# @param name [String] exchange name, used only when a type is given
# @param options [Hash] exchange options; :type is removed from the copy
#   before the rest is forwarded
# @yield [exchange] the resulting exchange, when a block is given
# @return [Object] the exchange
def define_exchange(channel = nil, name = '', options = {})
  settings = options.dup
  kind = settings.delete(:type)

  exchange =
    if kind
      channel.send(kind.to_sym, name, settings)
    else
      channel.default_exchange
    end

  yield exchange if block_given?
  exchange
end

#define_queue(channel, name, options = {}) {|queue| ... } ⇒ Object

Yields:

  • (queue)


53
54
55
56
57
# File 'lib/queueing_rabbit/client/bunny.rb', line 53

# Obtains a queue from +channel+ via channel.queue, using the stringified
# +name+ and the given +options+.
#
# @param channel [Object] object responding to #queue
# @param name [#to_s] queue name; converted with to_s before use
# @param options [Hash] forwarded unchanged to channel.queue
# @yield [queue] the queue, when a block is given
# @return [Object] the queue returned by channel.queue
def define_queue(channel, name, options = {})
  channel.queue(name.to_s, options).tap do |declared|
    yield declared if block_given?
  end
end

#enqueue(exchange, payload, options = {}) ⇒ Object Also known as: publish



63
64
65
# File 'lib/queueing_rabbit/client/bunny.rb', line 63

# Publishes +payload+ through +exchange+ by delegating to exchange.publish.
#
# @param exchange [Object] object responding to #publish
# @param payload [Object] passed through as the first argument of exchange.publish
# @param options [Hash] passed through unchanged as the second argument
# @return [Object] whatever exchange.publish returns
def enqueue(exchange, payload, options = {})
  exchange.publish(payload, options)
end

#listen_queue(queue, options = {}) ⇒ Object



84
85
86
87
88
# File 'lib/queueing_rabbit/client/bunny.rb', line 84

# Subscribes to +queue+ and hands every delivery to the caller's block.
#
# For each delivery the block receives the payload and a Metadata object built
# from the queue's channel, the delivery info and the message properties.
# The block is yielded to unconditionally, so callers are expected to pass one.
#
# @param queue [Object] object responding to #subscribe and #channel
# @param options [Hash] forwarded unchanged to queue.subscribe
# @yield [payload, metadata] once per delivery
# @return [Object] whatever queue.subscribe returns
def listen_queue(queue, options = {})
  queue.subscribe(options) do |info, props, body|
    meta = Metadata.new(queue.channel, info, props)
    yield body, meta
  end
end

#next_tick(&block) ⇒ Object



105
106
107
108
109
110
111
# File 'lib/queueing_rabbit/client/bunny.rb', line 105

# Schedules +block+ for the worker loop, or runs it right away.
#
# While @continue_worker_loop is truthy the block is appended to
# @actions_queue; otherwise it is called immediately.
#
# @return [Object] the block's result when it is run immediately, otherwise
#   @actions_queue
def next_tick(&block)
  return block.call unless @continue_worker_loop

  @actions_queue << block
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/queueing_rabbit/client/bunny.rb', line 101

# Reports whether the connection is open by asking @connection.open?.
#
# @return [Boolean] the answer given by @connection.open?
def open?
  @connection.open?
end

#open_channel(options = {}) {|ch, nil| ... } ⇒ Object

Yields:

  • (ch, nil)


45
46
47
48
49
50
51
# File 'lib/queueing_rabbit/client/bunny.rb', line 45

# Creates a channel on the connection and applies the requested settings.
#
# @param options [Hash] recognised keys:
#   :prefetch — when truthy, passed to ch.prefetch;
#   :use_publisher_confirms — when truthy, ch.confirm_select is called
# @yield [ch, nil] the new channel and a nil second argument, when a block is
#   given
# @return [Object] the channel returned by connection.create_channel
def open_channel(options = {})
  ch = connection.create_channel
  ch.prefetch(options[:prefetch]) if options[:prefetch]
  ch.confirm_select if options[:use_publisher_confirms]
  # The channel is also returned, so a block is optional; an unconditional
  # yield would raise LocalJumpError for callers that only use the return
  # value.
  yield(ch, nil) if block_given?
  ch
end

#purge_queue(queue) ⇒ Object



90
91
92
93
# File 'lib/queueing_rabbit/client/bunny.rb', line 90

# Purges +queue+ and then runs the optional block.
#
# @param queue [Object] object responding to #purge
# @yield optional callback invoked after queue.purge returns
# @return [Object, nil] the block's result, or nil when no block is given
def purge_queue(queue)
  queue.purge
  return unless block_given?

  yield
end

#queue_size(queue) ⇒ Object



80
81
82
# File 'lib/queueing_rabbit/client/bunny.rb', line 80

# Reads the :message_count entry from queue.status.
#
# @param queue [Object] object whose #status result supports [] lookup
# @return [Object] the value stored under :message_count (presumably an
#   Integer message count — confirm against the queue implementation)
def queue_size(queue)
  queue.status[:message_count]
end

#wait_while_for(proc, seconds, interval = 0.5) ⇒ Object



124
125
126
127
128
129
130
131
132
133
# File 'lib/queueing_rabbit/client/bunny.rb', line 124

# Polls +proc+ until it returns a falsy value or +seconds+ have elapsed.
#
# Returns early, without yielding, as soon as proc.call is falsy. If the
# deadline passes while proc.call is still truthy, the optional block is
# yielded to as a timeout callback.
#
# @param proc [#call] condition to poll; waiting continues while it is truthy
# @param seconds [Numeric] maximum time to wait; fractions are honoured
# @param interval [Numeric] pause between polls, passed to sleep
# @yield invoked once when the deadline passes, if a block is given
# @return [Object, nil] the block's result on timeout, otherwise nil
def wait_while_for(proc, seconds, interval = 0.5)
  # Keep sub-second precision. Truncating the clock to whole seconds let the
  # wait end up to almost a full second early and ignored fractional values
  # of +seconds+.
  deadline = Time.now + seconds

  while Time.now < deadline
    return unless proc.call
    sleep interval
  end

  yield if block_given?
end