Class: Cuboid::Support::Database::Queue
- Defined in:
- lib/cuboid/support/database/queue.rb
Overview
Flat-file Scheduler implementation
Behaves pretty much like a Ruby Scheduler however it transparently serializes and saves its entries to the file-system under the OS’s temp directory after a specified #max_buffer_size (for in-memory entries) has been exceeded.
It’s pretty useful when you want to reduce memory footprint without having to refactor any code since it behaves just like a Ruby Scheduler implementation.
Constant Summary collapse
- DEFAULT_MAX_BUFFER_SIZE =
Default #max_buffer_size.
100
Constants inherited from Base
Instance Attribute Summary collapse
-
#buffer ⇒ Array<Object>
readonly
Objects stored in the memory buffer.
-
#disk ⇒ Array<String>
readonly
Paths to files stored to disk.
-
#max_buffer_size ⇒ Integer
How many entries to keep in memory before starting to off-load to disk.
Instance Method Summary collapse
- #<<(obj) ⇒ Object (also: #push, #enq)
- #buffer_size ⇒ Object
-
#clear ⇒ Object
Removes all objects from the queue.
- #disk_size ⇒ Object
-
#empty? ⇒ Bool
‘true` if the queue if empty, `false` otherwise.
- #free_buffer_size ⇒ Object
-
#initialize(options = {}) ⇒ Queue
constructor
A new instance of Queue.
- #num_waiting ⇒ Object
-
#pop(non_block = false) ⇒ Object
(also: #deq, #shift)
Removes an object from the queue and returns it.
-
#size ⇒ Integer
(also: #length)
Size of the queue, the number of objects it currently holds.
Methods inherited from Base
decrement_disk_space, disk_directory, disk_space, disk_space_file, disk_space_file_for, disk_space_for, increment_disk_space, reset, #serialize, #unserialize
Constructor Details
#initialize(options = {}) ⇒ Queue
Returns a new instance of Queue.
33 34 35 36 37 38 39 40 41 42 |
# File 'lib/cuboid/support/database/queue.rb', line 33 def initialize( = {} ) super( ) @max_buffer_size = [:max_buffer_size] || DEFAULT_MAX_BUFFER_SIZE @disk = [] @buffer = [] @waiting = [] @mutex = Mutex.new end |
Instance Attribute Details
#buffer ⇒ Array<Object> (readonly)
Returns Objects stored in the memory buffer.
26 27 28 |
# File 'lib/cuboid/support/database/queue.rb', line 26 def buffer @buffer end |
#disk ⇒ Array<String> (readonly)
Returns Paths to files stored to disk.
30 31 32 |
# File 'lib/cuboid/support/database/queue.rb', line 30 def disk @disk end |
#max_buffer_size ⇒ Integer
Defaults to DEFAULT_MAX_BUFFER_SIZE.
Returns How many entries to keep in memory before starting to off-load to disk.
22 23 24 |
# File 'lib/cuboid/support/database/queue.rb', line 22 def max_buffer_size @max_buffer_size end |
Instance Method Details
#<<(obj) ⇒ Object Also known as: push, enq
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/cuboid/support/database/queue.rb', line 54 def <<( obj ) synchronize do if @buffer.size < max_buffer_size @buffer << obj else @disk << dump( obj ) end begin t = @waiting.shift t.wakeup if t rescue ThreadError retry end end end |
#buffer_size ⇒ Object
103 104 105 |
# File 'lib/cuboid/support/database/queue.rb', line 103 def buffer_size @buffer.size end |
#clear ⇒ Object
Removes all objects from the queue.
120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/cuboid/support/database/queue.rb', line 120 def clear synchronize do @buffer.clear while !@disk.empty? path = @disk.pop next if !path delete_file path end end end |
#disk_size ⇒ Object
107 108 109 |
# File 'lib/cuboid/support/database/queue.rb', line 107 def disk_size @disk.size end |
#empty? ⇒ Bool
Returns ‘true` if the queue if empty, `false` otherwise.
113 114 115 116 117 |
# File 'lib/cuboid/support/database/queue.rb', line 113 def empty? synchronize do internal_empty? end end |
#free_buffer_size ⇒ Object
99 100 101 |
# File 'lib/cuboid/support/database/queue.rb', line 99 def free_buffer_size max_buffer_size - buffer_size end |
#num_waiting ⇒ Object
132 133 134 |
# File 'lib/cuboid/support/database/queue.rb', line 132 def num_waiting @waiting.size end |
#pop(non_block = false) ⇒ Object Also known as: deq, shift
Returns Removes an object from the queue and returns it.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/cuboid/support/database/queue.rb', line 75 def pop( non_block = false ) synchronize do loop do if internal_empty? raise ThreadError, 'queue empty' if non_block @waiting.push Thread.current @mutex.sleep else return @buffer.shift if !@buffer.empty? return load_and_delete_file( @disk.shift ) end end end end |
#size ⇒ Integer Also known as: length
Returns Size of the queue, the number of objects it currently holds.
94 95 96 |
# File 'lib/cuboid/support/database/queue.rb', line 94 def size buffer_size + disk_size end |