Class: SuperSpreader::Spreader

Inherits:
Object
  • Object
show all
Defined in:
lib/super_spreader/spreader.rb

Instance Method Summary collapse

Constructor Details

#initialize(job_class, model_class, spread_tracker: nil) ⇒ Spreader

Returns a new instance of Spreader.



7
8
9
10
11
# File 'lib/super_spreader/spreader.rb', line 7

# Stores the job and model classes and picks the tracker to use.
#
# @param job_class [Object] kept as @job_class; also passed to the default tracker
# @param model_class [Object] kept as @model_class; also passed to the default tracker
# @param spread_tracker [Object, nil] tracker to use; when nil (or false), a
#   SpreadTracker is built from job_class and model_class instead
def initialize(job_class, model_class, spread_tracker: nil)
  @job_class, @model_class = job_class, model_class
  @spread_tracker = spread_tracker
  @spread_tracker ||= SpreadTracker.new(job_class, model_class)
end

Instance Method Details

#enqueue_spread(**opts) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/super_spreader/spreader.rb', line 36

# Enqueues one job per batch produced by #spread, starting from the tracker's
# current initial ID, then records where the next call should resume.
#
# @param opts [Hash] keyword arguments forwarded to #spread; any :initial_id
#   key is overridden with the tracker's value
# @return [Integer] 0 when the tracker's initial ID is zero; the unchanged
#   initial ID when #spread yields no batches; otherwise the ID just below the
#   last batch's :begin_id (also written back to the tracker)
def enqueue_spread(**opts)
  initial_id = @spread_tracker.initial_id
  return 0 if initial_id.zero?

  batches = spread(**opts.merge(initial_id: initial_id))
  # Nothing was scheduled (e.g. a zero duration), so there is no last batch to
  # advance from; leave the tracker where it is instead of failing on nil.
  return initial_id if batches.empty?

  batches.each do |batch|
    @job_class
      .set(wait_until: batch[:run_at])
      .perform_later(batch[:begin_id], batch[:end_id])
  end

  # Batches run from high IDs to low, so the next run resumes just below the
  # lowest ID that was enqueued.
  next_id = batches.last[:begin_id] - 1
  @spread_tracker.initial_id = next_id

  next_id
end

#spread(batch_size:, duration:, per_second:, initial_id:, begin_at: Time.now.utc) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/super_spreader/spreader.rb', line 13

# Plans batches of descending ID ranges, spaced evenly over a time window.
#
# Each batch is a Hash with :run_at, :begin_id and :end_id. Batches start at
# initial_id and walk downward batch_size IDs at a time, stopping when the
# window is used up or a batch's begin_id reaches 1.
#
# @param batch_size [Integer] number of IDs covered by each batch
# @param duration [Numeric] length of the window, in the same unit that is
#   added to begin_at (seconds when begin_at is a Time)
# @param per_second [Numeric] how many batches to schedule per unit of time
# @param initial_id [Integer] highest ID to include; nothing is planned when
#   it is zero or negative
# @param begin_at [Time] start of the window
# @return [Array<Hash>] the planned batches, highest IDs first
def spread(batch_size:, duration:, per_second:, initial_id:, begin_at: Time.now.utc)
  end_id = initial_id
  time_index = 0.0
  step = 0
  batches = []

  while time_index < duration
    break if end_id <= 0

    # Use floor to prevent subsecond times
    run_at = begin_at + time_index.floor
    begin_id = clamp(end_id - batch_size + 1)
    batches << {run_at: run_at, begin_id: begin_id, end_id: end_id}

    break if begin_id == 1

    end_id = begin_id - 1
    # Derive the offset from an integer step count rather than accumulating a
    # float: repeated addition drifts (ten additions of 0.1 give
    # 0.9999999999999999), which produced an extra batch and wrong seconds.
    step += 1
    time_index = step.fdiv(per_second)
  end

  batches
end