Class: Oxidized::Jobs
- Inherits:
-
Array
- Object
- Array
- Oxidized::Jobs
- Defined in:
- lib/oxidized/jobs.rb
Constant Summary collapse
- AVERAGE_DURATION =
initially presume nodes take 5s to complete
5
- MAX_INTER_JOB_GAP =
add job if more than X from last job started
300
Instance Attribute Summary collapse
-
#interval ⇒ Object
Returns the value of attribute interval.
-
#max ⇒ Object
Returns the value of attribute max.
-
#want ⇒ Object
Returns the value of attribute want.
Instance Method Summary collapse
- #duration(last) ⇒ Object
- #increment ⇒ Object
-
#initialize(max, use_max_threads, interval, nodes) ⇒ Jobs
constructor
A new instance of Jobs.
- #new_count ⇒ Object
- #push(arg) ⇒ Object
- #work ⇒ Object
Constructor Details
#initialize(max, use_max_threads, interval, nodes) ⇒ Jobs
Returns a new instance of Jobs.
7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/oxidized/jobs.rb', line 7 def initialize(max, use_max_threads, interval, nodes) @max = max @use_max_threads = use_max_threads # Set interval to 1 if interval is 0 (=disabled) so we don't break # the 'ceil' function @interval = interval.zero? ? 1 : interval @nodes = nodes @last = Time.now.utc @durations = Array.new @nodes.size, AVERAGE_DURATION duration AVERAGE_DURATION super() end |
Instance Attribute Details
#interval ⇒ Object
Returns the value of attribute interval.
5 6 7 |
# File 'lib/oxidized/jobs.rb', line 5 def interval @interval end |
#max ⇒ Object
Returns the value of attribute max.
5 6 7 |
# File 'lib/oxidized/jobs.rb', line 5 def max @max end |
#want ⇒ Object
Returns the value of attribute want.
5 6 7 |
# File 'lib/oxidized/jobs.rb', line 5 def want @want end |
Instance Method Details
#duration(last) ⇒ Object
25 26 27 28 29 30 31 32 33 34 |
# File 'lib/oxidized/jobs.rb', line 25 def duration(last) if @durations.size > @nodes.size @durations.slice! @nodes.size...@durations.size elsif @durations.size < @nodes.size @durations.fill AVERAGE_DURATION, @durations.size...@nodes.size end @durations.push(last).shift @duration = @durations.inject(:+).to_f / @nodes.size # rolling average new_count end |
#increment ⇒ Object
47 48 49 50 51 52 53 |
# File 'lib/oxidized/jobs.rb', line 47 def increment # Increments the job count if safe to do so, which means: # a) less threads running than the total amount of nodes # b) we want less than the max specified number of threads @want = [(@want + 1), @nodes.size, @max].min end |
#new_count ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/oxidized/jobs.rb', line 36 def new_count @want = if @use_max_threads @max else ((@nodes.size * @duration) / @interval).ceil end @want = 1 if @want < 1 @want = @nodes.size if @want > @nodes.size @want = @max if @want > @max end |
#push(arg) ⇒ Object
20 21 22 23 |
# File 'lib/oxidized/jobs.rb', line 20 def push(arg) @last = Time.now.utc super end |
#work ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/oxidized/jobs.rb', line 55 def work # if a) we want less or same amount of threads as we now running # and b) we want less threads running than the total amount of nodes # and c) there is more than MAX_INTER_JOB_GAP since last one was started # then we want one more thread (rationale is to fix hanging thread causing HOLB) return unless @want <= size && @want < @nodes.size return unless @want <= size increment if (Time.now.utc - @last) > MAX_INTER_JOB_GAP end |