Class: FreeMessageQueue::LoadBalancedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fmq/queues/load_balanced.rb

Overview

This queue is an approach to the issue that you want to have multiple threads at one queue at a time. Currently this is not considered to be a stable queue. Just use it for experimental things.

queue_manager = FreeMessageQueue::QueueManager.new(true) do
  setup_queue "/fmq_test/test1", FreeMessageQueue::LoadBalancedQueue do |q|
    q.forward_to = ["/fmq_test/test1", "/fmq_test/test2"]
  end
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(manager, queue_count = 5) ⇒ LoadBalancedQueue

Returns a new instance of LoadBalancedQueue.



35
36
37
38
39
40
41
42
43
# File 'lib/fmq/queues/load_balanced.rb', line 35

def initialize(manager, queue_count = 5)
  @manager = manager
  @queues = []
  queue_count.times do
    @queues << SyncronizedQueue.new(manager)
  end
  @poll_queue = @put_queue = 0
  @semaphore = Mutex.new
end

Instance Attribute Details

#managerObject

QueueManager refrence



33
34
35
# File 'lib/fmq/queues/load_balanced.rb', line 33

def manager
  @manager
end

Instance Method Details

#bytesObject

size of queue in bytes



58
59
60
61
62
# File 'lib/fmq/queues/load_balanced.rb', line 58

def bytes
  tmp_bytes = 0
  @queues.each { |q| tmp_bytes += q.bytes }
  return tmp_bytes
end

#clearObject

delete all messages in all queues



46
47
48
# File 'lib/fmq/queues/load_balanced.rb', line 46

def clear
  @queues.each { |q| q.clear }
end

#max_messagesObject

queue has infinite count



75
76
77
# File 'lib/fmq/queues/load_balanced.rb', line 75

def max_messages
  BaseQueue::INFINITE
end

#max_sizeObject

queue has infinite messages



80
81
82
# File 'lib/fmq/queues/load_balanced.rb', line 80

def max_size
  BaseQueue::INFINITE
end

#pollObject

Return one message from one of the queues



65
66
67
# File 'lib/fmq/queues/load_balanced.rb', line 65

def poll
  @queues[next_poll_index].poll
end

#put(message) ⇒ Object

Put an item to one of the queues



70
71
72
# File 'lib/fmq/queues/load_balanced.rb', line 70

def put(message)
  @queues[next_put_index].put(message)
end

#sizeObject

size of the queue is sum of size of all load balanced queues



51
52
53
54
55
# File 'lib/fmq/queues/load_balanced.rb', line 51

def size
  size = 0
  @queues.each { |q| size += q.size }
  return size 
end