Class: Cuboid::Support::Database::Queue

Inherits:
Base
Defined in:
lib/cuboid/support/database/queue.rb

Overview

Flat-file Queue implementation

Behaves much like a Ruby Queue, but transparently serializes its entries and saves them to the file system, under the OS's temp directory, once the number of in-memory entries exceeds the specified #max_buffer_size.

It is useful when you want to reduce the memory footprint without refactoring any code, since it behaves just like a Ruby Queue implementation.
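The buffer-then-disk behaviour described above can be sketched with the standard library alone. This is a minimal illustration and not the Cuboid implementation: the class name `SpillQueueSketch`, the file naming scheme and the use of `Marshal` are all assumptions made for the example, and locking is left out entirely.

```ruby
require 'tmpdir'
require 'securerandom'

# Hypothetical stand-in, not the Cuboid class: keeps up to max_buffer_size
# objects in memory and writes every further object to its own temp file.
class SpillQueueSketch
    attr_reader :buffer, :disk

    def initialize( max_buffer_size = 100 )
        @max_buffer_size = max_buffer_size
        @buffer = []
        @disk   = []
    end

    def <<( obj )
        if @buffer.size < @max_buffer_size
            @buffer << obj
        else
            # Assumed serialization format and file name, for illustration only.
            path = File.join( Dir.tmpdir, "spill-sketch-#{SecureRandom.hex( 8 )}" )
            File.binwrite( path, Marshal.dump( obj ) )
            @disk << path
        end
        self
    end

    def pop
        return @buffer.shift if !@buffer.empty?

        path = @disk.shift
        return nil if !path

        obj = Marshal.load( File.binread( path ) )
        File.delete( path )
        obj
    end

    def size
        @buffer.size + @disk.size
    end
end

queue = SpillQueueSketch.new( 2 )
%w(a b c).each { |item| queue << item }   # "a" and "b" in memory, "c" on disk
queue.pop                                 # "a"
queue << 'd'                              # the buffer has room again, so "d" stays in memory
[queue.pop, queue.pop, queue.pop]         # "b", "d", "c"
```

The last line shows a consequence of serving the memory buffer before the disk, which is also how the `#pop` source on this page reads: once pushes and pops interleave around a full buffer, objects are not necessarily returned in strict insertion order.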

Constant Summary

DEFAULT_MAX_BUFFER_SIZE = 100

Default #max_buffer_size.

Constants inherited from Base

Base::DISK_SPACE_FILE

Methods inherited from Base

decrement_disk_space, disk_directory, disk_space, disk_space_file, disk_space_file_for, disk_space_for, increment_disk_space, reset, #serialize, #unserialize

Constructor Details

#initialize(options = {}) ⇒ Queue

Returns a new instance of Queue.

See Also:

  • Database::Base#initialize


# File 'lib/cuboid/support/database/queue.rb', line 33

def initialize( options = {} )
    super( options )

    @max_buffer_size = options[:max_buffer_size] || DEFAULT_MAX_BUFFER_SIZE

    @disk    = []
    @buffer  = []
    @waiting = []
    @mutex   = Mutex.new
end
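The option handling in the constructor relies on Ruby's `||` fallback, which only replaces `nil` and `false`. A small sketch of that expression in isolation, using a hypothetical module `QueueOptionsSketch` that is not part of Cuboid:

```ruby
# Hypothetical helper that isolates the fallback expression from #initialize.
module QueueOptionsSketch
    DEFAULT_MAX_BUFFER_SIZE = 100

    def self.max_buffer_size_for( options = {} )
        options[:max_buffer_size] || DEFAULT_MAX_BUFFER_SIZE
    end
end

QueueOptionsSketch.max_buffer_size_for                           # 100, the default
QueueOptionsSketch.max_buffer_size_for( max_buffer_size: 10 )    # 10
QueueOptionsSketch.max_buffer_size_for( max_buffer_size: nil )   # 100, nil falls through
QueueOptionsSketch.max_buffer_size_for( max_buffer_size: 0 )     # 0, since 0 is truthy in Ruby
```

Going by the `#<<` source on this page, a value of `0` would mean `@buffer.size < max_buffer_size` never holds, so every object would presumably be written to disk.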

Instance Attribute Details

#buffer ⇒ Array<Object> (readonly)

Returns Objects stored in the memory buffer.

Returns:

  • (Array<Object>)

    Objects stored in the memory buffer.



# File 'lib/cuboid/support/database/queue.rb', line 26

def buffer
  @buffer
end

#disk ⇒ Array<String> (readonly)

Returns Paths to files stored to disk.

Returns:

  • (Array<String>)

    Paths to files stored to disk.



# File 'lib/cuboid/support/database/queue.rb', line 30

def disk
  @disk
end

#max_buffer_size ⇒ Integer

Returns How many entries to keep in memory before starting to off-load to disk.

Returns:

  • (Integer)

    How many entries to keep in memory before starting to off-load to disk.



# File 'lib/cuboid/support/database/queue.rb', line 22

def max_buffer_size
  @max_buffer_size
end

Instance Method Details

#<<(obj) ⇒ Object Also known as: push, enq

Parameters:

  • obj (Object)

    Object to add to the queue.



# File 'lib/cuboid/support/database/queue.rb', line 54

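def <<( obj )
    synchronize do
        # Keep the object in memory while the buffer has room, otherwise
        # serialize it to disk and remember the resulting file path.
        if @buffer.size < max_buffer_size
            @buffer << obj
        else
            @disk << dump( obj )
        end

        # Wake up one waiting consumer, if any. Thread#wakeup raises
        # ThreadError for a dead thread, in which case try the next one.
        begin
            t = @waiting.shift
            t.wakeup if t
        rescue ThreadError
            retry
        end
    end
end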

#buffer_size ⇒ Object



# File 'lib/cuboid/support/database/queue.rb', line 103

def buffer_size
    @buffer.size
end

#clear ⇒ Object

Removes all objects from the queue.



# File 'lib/cuboid/support/database/queue.rb', line 120

def clear
    synchronize do
        @buffer.clear

        while !@disk.empty?
            path = @disk.pop
            next if !path
            delete_file path
        end
    end
end

#disk_size ⇒ Object



# File 'lib/cuboid/support/database/queue.rb', line 107

def disk_size
    @disk.size
end

#empty? ⇒ Bool

Returns `true` if the queue is empty, `false` otherwise.

Returns:

  • (Bool)

    `true` if the queue is empty, `false` otherwise.



# File 'lib/cuboid/support/database/queue.rb', line 113

def empty?
    synchronize do
        internal_empty?
    end
end

#free_buffer_size ⇒ Object



# File 'lib/cuboid/support/database/queue.rb', line 99

def free_buffer_size
    max_buffer_size - buffer_size
end

#num_waiting ⇒ Object



# File 'lib/cuboid/support/database/queue.rb', line 132

def num_waiting
    @waiting.size
end

#pop(non_block = false) ⇒ Object Also known as: deq, shift

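Removes an object from the queue and returns it. Blocks until an object is available, unless non_block is true, in which case a ThreadError is raised when the queue is empty.

Returns:

  • (Object)

    The object removed from the queue.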



# File 'lib/cuboid/support/database/queue.rb', line 75

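def pop( non_block = false )
    synchronize do
        loop do
            if internal_empty?
                raise ThreadError, 'queue empty' if non_block

                # Register as a waiter and sleep, releasing the mutex, until
                # #<< wakes this thread up; then re-check the queue.
                @waiting.push Thread.current
                @mutex.sleep
            else
                # Prefer in-memory objects, then fall back to the oldest file.
                return @buffer.shift if !@buffer.empty?
                return load_and_delete_file( @disk.shift )
            end
        end
    end
end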
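The wait and wake-up handshake between `#pop` and `#<<` can be tried in isolation with a purely in-memory stand-in. The sketch below assumes that the `synchronize` helper used above wraps `@mutex.synchronize`; the class name `BlockingQueueSketch` is hypothetical and the disk handling is deliberately left out.

```ruby
# Hypothetical in-memory stand-in that shows only the wait/wake-up handshake.
class BlockingQueueSketch
    def initialize
        @items   = []
        @waiting = []
        @mutex   = Mutex.new
    end

    def <<( obj )
        @mutex.synchronize do
            @items << obj

            begin
                t = @waiting.shift
                t.wakeup if t
            rescue ThreadError
                # The waiter has died in the meantime, try the next one.
                retry
            end
        end
        self
    end

    def pop( non_block = false )
        @mutex.synchronize do
            loop do
                if @items.empty?
                    raise ThreadError, 'queue empty' if non_block

                    # Mutex#sleep releases the lock while sleeping and
                    # re-acquires it before returning.
                    @waiting.push Thread.current
                    @mutex.sleep
                else
                    return @items.shift
                end
            end
        end
    end
end

queue    = BlockingQueueSketch.new
consumer = Thread.new { queue.pop }   # blocks while the queue is empty
queue << :job
consumer.value                        # :job
```

The `loop` matters because `Mutex#sleep` may return on a spurious wake-up, so the emptiness check is repeated every time the thread resumes.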

#size ⇒ Integer Also known as: length

Returns Size of the queue, the number of objects it currently holds.

Returns:

  • (Integer)

    Size of the queue, the number of objects it currently holds.



# File 'lib/cuboid/support/database/queue.rb', line 94

def size
    buffer_size + disk_size
end
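The size helpers on this page are plain arithmetic over the two stores. A worked example with made-up counts, using a hypothetical `QueueCountsSketch` class rather than a real queue:

```ruby
# Hypothetical value object; the numbers are illustrative, not measured.
class QueueCountsSketch
    attr_reader :max_buffer_size, :buffer_size, :disk_size

    def initialize( max_buffer_size, buffer_size, disk_size )
        @max_buffer_size = max_buffer_size
        @buffer_size     = buffer_size
        @disk_size       = disk_size
    end

    # Same expression as #free_buffer_size above.
    def free_buffer_size
        max_buffer_size - buffer_size
    end

    # Same expression as #size above.
    def size
        buffer_size + disk_size
    end
end

counts = QueueCountsSketch.new( 100, 100, 25 )
counts.size               # 125
counts.free_buffer_size   # 0
```

With a full buffer of 100 and 25 entries on disk, the queue reports 125 objects and no free buffer slots. If `max_buffer_size` were ever smaller than the current `buffer_size`, the subtraction would yield a negative number.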