Class: Cuboid::Support::Database::CategorizedQueue
- Defined in:
- lib/cuboid/support/database/categorized_queue.rb
Overview
Constant Summary collapse
- DEFAULT_MAX_BUFFER_SIZE =
Default #max_buffer_size.
100
Constants inherited from Base
Instance Attribute Summary collapse
-
#max_buffer_size ⇒ Integer
How many entries to keep in memory before starting to off-load to disk.
-
#prefer ⇒ Object
Returns the value of attribute prefer.
Instance Method Summary collapse
- #<<(obj) ⇒ Object (also: #push, #enq)
- #buffer_size ⇒ Object
- #categories ⇒ Object
-
#clear ⇒ Object
Removes all objects from the queue.
- #data_for(category) ⇒ Object
- #disk_size ⇒ Object
-
#empty? ⇒ Bool
`true` if the queue is empty, `false` otherwise.
- #free_buffer_size ⇒ Object
-
#initialize(options = {}, &block) ⇒ CategorizedQueue
constructor
A new instance of CategorizedQueue.
- #insert_to_disk(category, path) ⇒ Object
- #num_waiting ⇒ Object
-
#pop(non_block = false) ⇒ Object
(also: #deq, #shift)
Removes an object from the queue and returns it.
-
#size ⇒ Integer
(also: #length)
Size of the queue, the number of objects it currently holds.
Methods inherited from Base
decrement_disk_space, disk_directory, disk_space, disk_space_file, disk_space_file_for, disk_space_for, increment_disk_space, reset, #serialize, #unserialize
Constructor Details
#initialize(options = {}, &block) ⇒ CategorizedQueue
Returns a new instance of CategorizedQueue.
17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 17

# Builds an empty categorized queue.
#
# @param [Hash] options
#   Forwarded as-is to the superclass constructor.
# @option options [Integer] :max_buffer_size (DEFAULT_MAX_BUFFER_SIZE)
#   How many entries to keep in memory before starting to off-load to disk.
# @param [Proc] block
#   Stored as {#prefer}; {#pop} calls it with the known category names to
#   pick the category to serve first. {#<<} and {#pop} raise `ArgumentError`
#   when it is missing.
def initialize( options = {}, &block )
    super( options )

    @prefer          = block
    @max_buffer_size = options[:max_buffer_size] || DEFAULT_MAX_BUFFER_SIZE

    # `||=` leaves any value that is already set untouched.
    # NOTE(review): presumably the superclass may pre-populate it -- confirm.
    @categories ||= {}

    # Threads blocked in #pop, woken up one at a time by #<<.
    @waiting = []
    @mutex   = Mutex.new

    # Running totals across all categories.
    @buffer_size = 0
    @disk_size   = 0
end
Instance Attribute Details
#max_buffer_size ⇒ Integer
Defaults to DEFAULT_MAX_BUFFER_SIZE.
Returns how many entries to keep in memory before starting to off-load to disk.
12 13 14 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 12

# @return [Integer]
#   How many entries to keep in memory before starting to off-load to disk
#   (the current value of `@max_buffer_size`).
def max_buffer_size
    @max_buffer_size
end
#prefer ⇒ Object
Returns the value of attribute prefer.
14 15 16 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 14

# @return [Object]
#   The current value of `@prefer`.
def prefer
    @prefer
end
Instance Method Details
#<<(obj) ⇒ Object Also known as: push, enq
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 58

# Adds `obj` to the storage of the category it reports via `#category`.
#
# The object stays in memory while that category's buffer holds fewer than
# {#max_buffer_size} entries; otherwise the result of `dump( obj )` is
# appended to the category's disk list instead.
#
# @param [#category] obj
#
# @raise [ArgumentError]
#   When no {#prefer} block has been set, or when `obj` does not respond to
#   `#category`.
def <<( obj )
    fail ArgumentError, 'Missing #prefer block.' unless @prefer

    unless obj.respond_to?( :category )
        fail ArgumentError, "#{obj.class} does not respond to #category."
    end

    synchronize do
        storage = data_for( obj.category )

        if storage[:buffer].size >= max_buffer_size
            # Buffer for this category is full, off-load the object.
            @disk_size += 1
            storage[:disk] << dump( obj )
        else
            @buffer_size += 1
            storage[:buffer] << obj
        end

        # Wake up one thread blocked in #pop; if waking it raises
        # ThreadError, move on to the next waiter.
        begin
            waiter = @waiting.shift
            waiter.wakeup if waiter
        rescue ThreadError
            retry
        end
    end
end
#buffer_size ⇒ Object
144 145 146 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 144

# @return [Integer]
#   The `@buffer_size` counter: incremented when an object is stored in a
#   memory buffer, decremented when one is popped from it.
def buffer_size
    @buffer_size
end
#categories ⇒ Object
39 40 41 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 39

# @return [Array]
#   Names of all categories currently tracked, in insertion order.
def categories
    @categories.each_key.to_a
end
#clear ⇒ Object
Removes all objects from the queue.
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 161

# Removes all objects from the queue.
#
# Empties every category's memory buffer, passes each off-loaded path to
# `delete_file` and resets both size counters. The categories themselves
# remain registered.
def clear
    synchronize do
        @categories.values.each do |storage|
            storage[:buffer].clear

            paths = storage[:disk]
            until paths.empty?
                path = paths.pop
                # Skip blank entries, there is nothing to delete for them.
                delete_file( path ) if path
            end
        end

        @buffer_size = 0
        @disk_size   = 0
    end
end
#data_for(category) ⇒ Object
43 44 45 46 47 48 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 43

# Returns the storage of `category`, creating an empty one on first access.
#
# @param [#to_s] category
#   Category name; it is stringified before being used as the key.
#
# @return [Hash]
#   `{ disk: Array, buffer: Array }`
def data_for( category )
    key      = category.to_s
    existing = @categories[key]
    return existing if existing

    @categories[key] = {
        disk:   [],
        buffer: []
    }
end
#disk_size ⇒ Object
148 149 150 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 148

# @return [Integer]
#   The `@disk_size` counter: incremented when an entry is added to a
#   category's disk list, decremented when one is popped from it.
def disk_size
    @disk_size
end
#empty? ⇒ Bool
Returns `true` if the queue is empty, `false` otherwise.
154 155 156 157 158 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 154

# @return [Bool]
#   `true` if the queue is empty, `false` otherwise; the check is the
#   result of `internal_empty?`, evaluated inside `synchronize`.
def empty?
    synchronize { internal_empty? }
end
#free_buffer_size ⇒ Object
140 141 142 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 140

# @return [Integer]
#   {#max_buffer_size} minus {#buffer_size}.
def free_buffer_size
    capacity = max_buffer_size
    capacity - buffer_size
end
#insert_to_disk(category, path) ⇒ Object
50 51 52 53 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 50

# Appends `path` to the disk list of `category` and counts it towards
# {#disk_size}.
#
# @param [#to_s] category
# @param [Object] path
#   Entry to append. NOTE(review): presumably the location of an already
#   dumped object -- confirm against callers.
#
# @return [Integer]
#   The updated disk size.
def insert_to_disk( category, path )
    paths = data_for( category )[:disk]
    paths.push( path )

    @disk_size += 1
end
#num_waiting ⇒ Object
178 179 180 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 178

# @return [Integer]
#   Number of entries in the list of threads waiting on the queue.
def num_waiting
    @waiting.length
end
#pop(non_block = false) ⇒ Object Also known as: deq, shift
Removes an object from the queue and returns it.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 89

# Removes an object from the queue and returns it.
#
# The {#prefer} block is called with the known category names and its
# result is served first; when that category holds nothing, the remaining
# categories are tried from the most recently added one backwards. Within a
# category, buffered objects are returned before off-loaded ones.
#
# @param [Bool] non_block
#   `true` to raise instead of waiting when nothing can be popped.
#
# @return [Object]
#
# @raise [ArgumentError]
#   When no {#prefer} block has been set.
# @raise [ThreadError]
#   When nothing can be popped and `non_block` is `true`.
def pop( non_block = false )
    fail ArgumentError, 'Missing #prefer block.' if !@prefer

    synchronize do
        loop do
            data = internal_empty? ? nil : next_available_data

            if !data
                raise ThreadError, 'queue empty' if non_block

                # Park this thread until #<< wakes it up, then re-check.
                @waiting.push Thread.current
                @mutex.sleep
                next
            end

            if data[:buffer].any?
                @buffer_size -= 1
                return data[:buffer].shift
            end

            @disk_size -= 1
            return load_and_delete_file( data[:disk].shift )
        end
    end
end

# Finds the storage {#pop} should take its next object from.
#
# Categories are only looked up, never created, so a preference for an
# unknown (or `nil`) category does not register a bogus empty category.
#
# @return [Hash, nil]
#   Storage of the first candidate category that has data, `nil` if none.
def next_available_data
    preferred = @prefer.call( @categories.keys ).to_s

    # Preferred category first, then the rest starting from the last one.
    candidates = [preferred] + (@categories.keys - [preferred]).reverse

    candidates.each do |name|
        data = @categories[name]
        next if !data

        return data if data[:buffer].any? || data[:disk].any?
    end

    nil
end
private :next_available_data
#size ⇒ Integer Also known as: length
Returns the size of the queue, i.e. the number of objects it currently holds.
135 136 137 |
# File 'lib/cuboid/support/database/categorized_queue.rb', line 135

# @return [Integer]
#   Size of the queue: {#buffer_size} plus {#disk_size}.
def size
    in_memory = buffer_size
    on_disk   = disk_size

    in_memory + on_disk
end