Class: Que::JobBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/que/job_buffer.rb

Defined Under Namespace

Classes: PriorityQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(maximum_size:, priorities:) ⇒ JobBuffer

Since we use a mutex, which is not reentrant, we have to be a little careful to not call a method that locks the mutex when we’ve already locked it. So, as a general rule, public methods handle locking the mutex when necessary, while private methods handle the actual underlying data changes. This lets us reuse those private methods without running into locking issues.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/que/job_buffer.rb', line 18

def initialize(
  maximum_size:,
  priorities:
)
  @maximum_size = Que.assert(Integer, maximum_size)
  Que.assert(maximum_size >= 0) { "maximum_size for a JobBuffer must be at least zero!" }

  @stop  = false
  @array = []
  @mutex = Mutex.new

  @priority_queues = Hash[
    # Make sure that priority = nil sorts highest.
    priorities.sort_by{|p| p || MAXIMUM_PRIORITY}.map do |p|
      [p, PriorityQueue.new(priority: p, job_buffer: self)]
    end
  ].freeze
end

Instance Attribute Details

#maximum_sizeObject (readonly)

Returns the value of attribute maximum_size.



9
10
11
# File 'lib/que/job_buffer.rb', line 9

def maximum_size
  @maximum_size
end

#priority_queuesObject (readonly)

Returns the value of attribute priority_queues.



9
10
11
# File 'lib/que/job_buffer.rb', line 9

def priority_queues
  @priority_queues
end

Instance Method Details

#accept?(metajobs) ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/que/job_buffer.rb', line 74

def accept?(metajobs)
  metajobs.sort!

  sync do
    return [] if _stopping?

    start_index = _buffer_space
    final_index = metajobs.length - 1

    return metajobs if start_index > final_index
    index_to_lose = @array.length - 1

    start_index.upto(final_index) do |index|
      if index_to_lose >= 0 && (metajobs[index] <=> @array[index_to_lose]) < 0
        return metajobs if index == final_index
        index_to_lose -= 1
      else
        return metajobs.slice(0...index)
      end
    end

    []
  end
end

#available_prioritiesObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/que/job_buffer.rb', line 107

def available_priorities
  hash = {}
  lowest_priority = true

  priority_queues.reverse_each do |priority, pq|
    count = pq.waiting_count

    if lowest_priority
      count += buffer_space
      lowest_priority = false
    end

    hash[priority || MAXIMUM_PRIORITY] = count if count > 0
  end

  hash
end

#buffer_spaceObject



125
126
127
# File 'lib/que/job_buffer.rb', line 125

def buffer_space
  sync { _buffer_space }
end

#clearObject



142
143
144
# File 'lib/que/job_buffer.rb', line 142

def clear
  sync { pop(_size) }
end

#job_available?(priority) ⇒ Boolean

Returns:

  • (Boolean)


150
151
152
# File 'lib/que/job_buffer.rb', line 150

def job_available?(priority)
  (job = @array.first) && job.priority_sufficient?(priority)
end

#push(*metajobs) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/que/job_buffer.rb', line 37

def push(*metajobs)
  Que.internal_log(:job_buffer_push, self) do
    {
      maximum_size:  maximum_size,
      ids:           metajobs.map(&:id),
      current_queue: to_a,
    }
  end

  sync do
    return metajobs if _stopping?

    @array.concat(metajobs).sort!

    # Relying on the hash's contents being sorted, here.
    priority_queues.reverse_each do |_, pq|
      pq.populate do
        _shift_job(pq.priority)
      end
    end

    # If we passed the maximum buffer size, drop the lowest sort keys and
    # return their ids to be unlocked.
    overage = -_buffer_space
    pop(overage) if overage > 0
  end
end

#shift(priority = nil) ⇒ Object



65
66
67
68
# File 'lib/que/job_buffer.rb', line 65

def shift(priority = nil)
  queue = priority_queues.fetch(priority) { raise Error, "not a permitted priority! #{priority}" }
  queue.pop || shift_job(priority)
end

#shift_job(priority = nil) ⇒ Object



70
71
72
# File 'lib/que/job_buffer.rb', line 70

def shift_job(priority = nil)
  sync { _shift_job(priority) }
end

#sizeObject



129
130
131
# File 'lib/que/job_buffer.rb', line 129

def size
  sync { _size }
end

#stopObject



137
138
139
140
# File 'lib/que/job_buffer.rb', line 137

def stop
  sync { @stop = true }
  priority_queues.each_value(&:stop)
end

#stopping?Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/que/job_buffer.rb', line 146

def stopping?
  sync { _stopping? }
end

#to_aObject



133
134
135
# File 'lib/que/job_buffer.rb', line 133

def to_a
  sync { @array.dup }
end

#waiting_countObject



99
100
101
102
103
104
105
# File 'lib/que/job_buffer.rb', line 99

def waiting_count
  count = 0
  priority_queues.each_value do |pq|
    count += pq.waiting_count
  end
  count
end