Class: FreeMessageQueue::LoadBalancedQueue
- Inherits: Object
  - Object
    - FreeMessageQueue::LoadBalancedQueue
- Defined in: lib/fmq/queues/load_balanced.rb
Overview
This queue addresses the case where multiple threads need to work on one queue at the same time. It distributes messages across several internal queues. It is not currently considered stable, so use it only for experiments.
queue_manager = FreeMessageQueue::QueueManager.new(true) do
  setup_queue "/fmq_test/test1", FreeMessageQueue::LoadBalancedQueue do |q|
    q.forward_to = ["/fmq_test/test1", "/fmq_test/test2"]
  end
end
Instance Attribute Summary
- #manager ⇒ Object
  QueueManager reference.
Instance Method Summary
- #bytes ⇒ Object
  Size of the queue in bytes.
- #clear ⇒ Object
  Delete all messages in all queues.
- #initialize(manager, queue_count = 5) ⇒ LoadBalancedQueue (constructor)
  A new instance of LoadBalancedQueue.
- #max_messages ⇒ Object
  The queue has no limit on the number of messages.
- #max_size ⇒ Object
  The queue has no limit on its total size.
- #poll ⇒ Object
  Return one message from one of the queues.
- #put(message) ⇒ Object
  Put a message into one of the queues.
- #size ⇒ Object
  Size of the queue: the sum of the sizes of all load-balanced queues.
Constructor Details
#initialize(manager, queue_count = 5) ⇒ LoadBalancedQueue
Returns a new instance of LoadBalancedQueue.
# File 'lib/fmq/queues/load_balanced.rb', line 35

def initialize(manager, queue_count = 5)
  @manager = manager
  @queues = []
  queue_count.times do
    @queues << SyncronizedQueue.new(manager)
  end
  @poll_queue = @put_queue = 0
  @semaphore = Mutex.new
end
Instance Attribute Details
#manager ⇒ Object
QueueManager reference.

# File 'lib/fmq/queues/load_balanced.rb', line 33

def manager
  @manager
end
Instance Method Details
#bytes ⇒ Object
Size of the queue in bytes.

# File 'lib/fmq/queues/load_balanced.rb', line 58

def bytes
  tmp_bytes = 0
  @queues.each { |q| tmp_bytes += q.bytes }
  return tmp_bytes
end
#clear ⇒ Object
Delete all messages in all queues.

# File 'lib/fmq/queues/load_balanced.rb', line 46

def clear
  @queues.each { |q| q.clear }
end
#max_messages ⇒ Object
The queue has no limit on the number of messages.

# File 'lib/fmq/queues/load_balanced.rb', line 75

def max_messages
  BaseQueue::INFINITE
end
#max_size ⇒ Object
The queue has no limit on its total size.

# File 'lib/fmq/queues/load_balanced.rb', line 80

def max_size
  BaseQueue::INFINITE
end
#poll ⇒ Object
Return one message from one of the queues.

# File 'lib/fmq/queues/load_balanced.rb', line 65

def poll
  @queues[next_poll_index].poll
end
#put(message) ⇒ Object
Put a message into one of the queues.

# File 'lib/fmq/queues/load_balanced.rb', line 70

def put(message)
  @queues[next_put_index].put(message)
end
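The `next_put_index` and `next_poll_index` helpers called by `#put` and `#poll` are not shown on this page. Given the `@put_queue`, `@poll_queue`, and `@semaphore` variables set up in the constructor, one plausible reading is a round-robin counter advanced under the mutex. The sketch below illustrates that idea only: `RoundRobinSketch` and the helper bodies are assumptions, not the FMQ source, and plain arrays stand in for the `SyncronizedQueue` instances.

```ruby
# Hypothetical stand-in for the documented class; not the FMQ implementation.
class RoundRobinSketch
  def initialize(queue_count = 5)
    # Plain arrays stand in for the sub-queues. They are not thread-safe
    # themselves; only the index selection below is guarded by the mutex.
    @queues = Array.new(queue_count) { [] }
    @poll_queue = @put_queue = 0
    @semaphore = Mutex.new
  end

  def put(message)
    @queues[next_put_index] << message
  end

  def poll
    @queues[next_poll_index].shift
  end

  private

  # Return the current put slot and advance the counter, wrapping around.
  def next_put_index
    @semaphore.synchronize do
      index = @put_queue
      @put_queue = (@put_queue + 1) % @queues.size
      index
    end
  end

  # Same idea for the poll side, with its own counter.
  def next_poll_index
    @semaphore.synchronize do
      index = @poll_queue
      @poll_queue = (@poll_queue + 1) % @queues.size
      index
    end
  end
end

sketch = RoundRobinSketch.new(3)
%w[a b c d].each { |m| sketch.put(m) }
polled = Array.new(4) { sketch.poll }
# polled => ["a", "b", "c", "d"]
```

With separate put and poll counters that start at the same slot, a single producer and a single consumer see messages in insertion order. Under concurrent access that ordering is not guaranteed.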
#size ⇒ Object
Size of the queue: the sum of the sizes of all load-balanced queues.

# File 'lib/fmq/queues/load_balanced.rb', line 51

def size
  size = 0
  @queues.each { |q| size += q.size }
  return size
end
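Both `#size` and `#bytes` aggregate a value over every internal queue. As a small illustration of that aggregation, the sketch below sums the two values over stand-in objects. `StubQueue` is a hypothetical class introduced here for the example, and `Enumerable#sum` assumes Ruby 2.4 or later.

```ruby
# Hypothetical stand-in exposing the two readers the aggregation relies on.
class StubQueue
  attr_reader :size, :bytes

  def initialize(size, bytes)
    @size = size
    @bytes = bytes
  end
end

queues = [StubQueue.new(2, 10), StubQueue.new(0, 0), StubQueue.new(3, 25)]

# Equivalent to the accumulate-in-a-loop form used by #size and #bytes.
total_size  = queues.sum(&:size)   # => 5
total_bytes = queues.sum(&:bytes)  # => 35
```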