Class: Cuboid::Support::Database::CategorizedQueue

Inherits:
Base
  • Object
show all
Defined in:
lib/cuboid/support/database/categorized_queue.rb

Overview

Author:

Constant Summary collapse

DEFAULT_MAX_BUFFER_SIZE =

Default #max_buffer_size.

100

Constants inherited from Base

Base::DISK_SPACE_FILE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

decrement_disk_space, disk_directory, disk_space, disk_space_file, disk_space_file_for, disk_space_for, increment_disk_space, reset, #serialize, #unserialize

Constructor Details

#initialize(options = {}, &block) ⇒ CategorizedQueue

Returns a new instance of CategorizedQueue.

See Also:

  • Database::Base#initialize


17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/cuboid/support/database/categorized_queue.rb', line 17

# Builds an empty queue.
#
# @param options [Hash]
#   Passed on to the parent initializer; `:max_buffer_size`, when truthy,
#   replaces DEFAULT_MAX_BUFFER_SIZE.
# @param block [Proc]
#   Stored in `@prefer`; `#<<` and `#pop` raise ArgumentError when it is
#   missing.
def initialize( options = {}, &block )
    super( options )

    # `||=` leaves any value already present in @categories untouched.
    @categories ||= {}

    @max_buffer_size = options[:max_buffer_size] || DEFAULT_MAX_BUFFER_SIZE
    @prefer          = block

    # Running totals across all categories.
    @disk_size   = 0
    @buffer_size = 0

    # Lock and the list of threads parked in #pop.
    @mutex   = Mutex.new
    @waiting = []
end

Instance Attribute Details

#max_buffer_size ⇒ Integer

Note:

Returns how many entries to keep in memory before starting to off-load to disk.

Returns:

  • (Integer)

    How many entries to keep in memory before starting to off-load to disk.



12
13
14
# File 'lib/cuboid/support/database/categorized_queue.rb', line 12

# Reader for `@max_buffer_size`, the buffer limit that `#<<` compares a
# category's in-memory buffer against before off-loading to disk.
#
# @return [Integer]
def max_buffer_size
  @max_buffer_size
end

#prefer ⇒ Object

Returns the value of attribute prefer.



14
15
16
# File 'lib/cuboid/support/database/categorized_queue.rb', line 14

# Reader for `@prefer`. The constructor stores its block there and `#pop`
# calls it with the current category names to choose a category.
#
# @return [Proc, nil]
def prefer
  @prefer
end

Instance Method Details

#<<(obj) ⇒ Object Also known as: push, enq

Parameters:

  • obj (Object)

    Object to add to the queue. Must respond to #category.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/cuboid/support/database/categorized_queue.rb', line 58

# Adds an object to the queue and wakes up one thread waiting in `#pop`.
#
# The object goes into its category's in-memory buffer while that buffer
# holds fewer than `max_buffer_size` entries; otherwise it is passed to
# `dump` and the result is appended to the category's disk list.
#
# @param obj [#category]
#   Object to add to the queue. Must respond to `#category`.
#
# @return [CategorizedQueue]
#   `self`, so that pushes can be chained.
#
# @raise [ArgumentError]
#   If no `prefer` block has been set, or `obj` does not respond to
#   `#category`.
def <<( obj )
    fail ArgumentError, 'Missing #prefer block.' if !@prefer

    if !obj.respond_to?( :category )
        fail ArgumentError, "#{obj.class} does not respond to #category."
    end

    synchronize do
        data = data_for( obj.category )

        # Counters are bumped only after the entry has actually been stored,
        # so a failing `dump` cannot leave them ahead of the real contents.
        if data[:buffer].size < max_buffer_size
            data[:buffer] << obj
            @buffer_size += 1
        else
            data[:disk] << dump( obj )
            @disk_size += 1
        end

        # Wake the first waiter that is still alive. Thread#wakeup raises
        # ThreadError for a dead thread, in which case the next one is tried.
        while (waiter = @waiting.shift)
            begin
                waiter.wakeup
                break
            rescue ThreadError
                next
            end
        end
    end

    self
end

#buffer_size ⇒ Object



144
145
146
# File 'lib/cuboid/support/database/categorized_queue.rb', line 144

# Reader for `@buffer_size`, the running count of objects held in the
# in-memory buffers (incremented by `#<<`, decremented by `#pop`, zeroed by
# `#clear`).
#
# @return [Integer]
def buffer_size
    @buffer_size
end

#categories ⇒ Object



39
40
41
# File 'lib/cuboid/support/database/categorized_queue.rb', line 39

# Lists the keys of the `@categories` hash, i.e. the category names the queue
# currently tracks.
#
# @return [Array]
#   The hash keys; `#data_for` stores them as strings.
def categories
    @categories.keys
end

#clear ⇒ Object

Removes all objects from the queue.



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/cuboid/support/database/categorized_queue.rb', line 161

# Removes all objects from the queue.
#
# For every category the in-memory buffer is emptied and each non-nil entry
# of its disk list is handed to `delete_file`. Both counters are then reset
# to zero. The category keys themselves are kept.
def clear
    synchronize do
        @categories.each_value do |entry|
            entry[:buffer].clear

            # Drain the disk list from the end, skipping blank slots.
            until entry[:disk].empty?
                stored = entry[:disk].pop
                delete_file( stored ) if stored
            end
        end

        @buffer_size = @disk_size = 0
    end
end

#data_for(category) ⇒ Object



43
44
45
46
47
48
# File 'lib/cuboid/support/database/categorized_queue.rb', line 43

# Returns the storage hash for a category, creating an empty one on first
# use.
#
# @param category [#to_s]
#   Category name; it is converted to a String before being used as a key.
#
# @return [Hash]
#   Hash with a `:disk` and a `:buffer` Array.
def data_for( category )
    key = category.to_s
    @categories[key] ||= { disk: [], buffer: [] }
end

#disk_size ⇒ Object



148
149
150
# File 'lib/cuboid/support/database/categorized_queue.rb', line 148

# Reader for `@disk_size`, the running count of entries kept in the disk
# lists (incremented by `#<<` and `#insert_to_disk`, decremented by `#pop`,
# zeroed by `#clear`).
#
# @return [Integer]
def disk_size
    @disk_size
end

#empty? ⇒ Bool

Returns `true` if the queue is empty, `false` otherwise.

Returns:

  • (Bool)

    `true` if the queue is empty, `false` otherwise.



154
155
156
157
158
# File 'lib/cuboid/support/database/categorized_queue.rb', line 154

# Runs the `internal_empty?` check inside `synchronize`.
#
# @return [Bool]
#   `true` if the queue is empty, `false` otherwise.
def empty?
    synchronize { internal_empty? }
end

#free_buffer_size ⇒ Object



140
141
142
# File 'lib/cuboid/support/database/categorized_queue.rb', line 140

# Difference between the configured buffer limit and the number of objects
# currently buffered.
#
# @return [Integer]
#   `max_buffer_size - buffer_size`.
def free_buffer_size
    max_buffer_size - buffer_size
end

#insert_to_disk(category, path) ⇒ Object



50
51
52
53
# File 'lib/cuboid/support/database/categorized_queue.rb', line 50

# Appends `path` to a category's disk list and counts it as one more disk
# entry. Nothing is written here; the path is stored as given.
#
# @param category [#to_s]
#   Category the entry belongs to.
# @param path [Object]
#   Value to append to the category's disk list.
#
# @return [Integer]
#   The updated disk entry count.
def insert_to_disk( category, path )
    disk_entries = data_for( category )[:disk]
    disk_entries.push( path )
    @disk_size += 1
end

#num_waiting ⇒ Object



178
179
180
# File 'lib/cuboid/support/database/categorized_queue.rb', line 178

# Number of threads currently recorded in `@waiting`, the list `#pop` adds
# the current thread to before sleeping on an empty queue.
#
# @return [Integer]
def num_waiting
    @waiting.length
end

#pop(non_block = false) ⇒ Object Also known as: deq, shift

Removes an object from the queue and returns it.

Returns:

  • (Object)

    Removes an object from the queue and returns it.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/cuboid/support/database/categorized_queue.rb', line 89

# Removes an object from the queue and returns it.
#
# The `prefer` block is called with the current category names and chooses
# the category to serve. If that category holds nothing, the remaining
# categories are tried from the last key of `@categories` to the first.
# Within the chosen category a buffered object is returned before any disk
# entry; disk entries go through `load_and_delete_file`.
#
# @param non_block [Bool]
#   When `true`, raise instead of waiting if the queue is empty.
#
# @return [Object]
#   The removed object.
#
# @raise [ArgumentError]
#   If no `prefer` block has been set.
# @raise [ThreadError]
#   If the queue is empty and `non_block` is `true`.
# @raise [RuntimeError]
#   If `internal_empty?` reports entries but no category holds any.
def pop( non_block = false )
    fail ArgumentError, 'Missing #prefer block.' if !@prefer

    synchronize do
        loop do
            if internal_empty?
                raise ThreadError, 'queue empty' if non_block
                @waiting.push Thread.current
                @mutex.sleep
                next
            end

            # Keys are stored as strings, so normalise the preferred name the
            # same way before looking it up.
            preferred = @prefer.call( @categories.keys ).to_s

            # Preferred category first, then every other one, newest first.
            lookup_order = [preferred] + (@categories.keys - [preferred]).reverse

            # Plain hash reads only: searching must never create an entry for
            # a category that was merely asked about.
            category = lookup_order.find do |name|
                entry = @categories[name]
                entry && (entry[:buffer].any? || entry[:disk].any?)
            end

            # Previously this situation spun forever while holding the lock.
            if !category
                fail 'Queue is not empty but no category holds any entries.'
            end

            data = @categories[category]

            if data[:buffer].any?
                @buffer_size -= 1
                return data[:buffer].shift
            end

            @disk_size -= 1
            return load_and_delete_file( data[:disk].shift )
        end
    end
end

#size ⇒ Integer Also known as: length

Returns the size of the queue, i.e. the number of objects it currently holds.

Returns:

  • (Integer)

    Size of the queue, the number of objects it currently holds.



135
136
137
# File 'lib/cuboid/support/database/categorized_queue.rb', line 135

# Total number of objects the queue holds: the buffered count plus the disk
# count.
#
# @return [Integer]
#   `buffer_size + disk_size`.
def size
    buffer_size + disk_size
end