Class: SuperSpreader::Spreader
- Inherits:
-
Object
- Object
- SuperSpreader::Spreader
- Defined in:
- lib/super_spreader/spreader.rb
Instance Method Summary
- #enqueue_spread(**opts) ⇒ Object
-
#initialize(job_class, model_class, spread_tracker: nil) ⇒ Spreader
constructor
A new instance of Spreader.
- #spread(batch_size:, duration:, per_second:, initial_id:, begin_at: Time.now.utc) ⇒ Object
Constructor Details
#initialize(job_class, model_class, spread_tracker: nil) ⇒ Spreader
Returns a new instance of Spreader.
7 8 9 10 11 |
# File 'lib/super_spreader/spreader.rb', line 7

# Builds a spreader for the given job and model classes.
#
# @param job_class [Class] job class that #enqueue_spread schedules
# @param model_class [Class] model class handed to the default tracker
# @param spread_tracker [Object, nil] tracker to use; when omitted (or falsy)
#   a SpreadTracker is built from job_class and model_class
def initialize(job_class, model_class, spread_tracker: nil)
  # Resolve the tracker up front so the assignments below are plain copies.
  spread_tracker ||= SpreadTracker.new(job_class, model_class)

  @job_class = job_class
  @model_class = model_class
  @spread_tracker = spread_tracker
end
Instance Method Details
#enqueue_spread(**opts) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/super_spreader/spreader.rb', line 36

# Plans the next run of batches and enqueues one job per batch.
#
# The starting ID is read from the spread tracker and passed to #spread
# together with +opts+. Each planned batch is scheduled on the job class at
# its +:run_at+ time with +(begin_id, end_id)+ as arguments, and the ID just
# below the last batch is written back to the tracker.
#
# @param opts [Hash] keyword arguments forwarded to #spread; any
#   +:initial_id+ given here is overridden by the tracker's value
# @return [Integer] the ID the next call will start from: 0 when the
#   tracker's ID is already zero, or the unchanged starting ID when #spread
#   planned no batches
def enqueue_spread(**opts)
  initial_id = @spread_tracker.initial_id
  return 0 if initial_id.zero?

  batches = spread(**opts.merge(initial_id: initial_id))

  # Nothing was planned (e.g. a non-positive duration), so there is no last
  # batch to read from; leave the tracker where it is instead of failing on nil.
  return initial_id if batches.empty?

  batches.each do |batch|
    @job_class
      .set(wait_until: batch[:run_at])
      .perform_later(batch[:begin_id], batch[:end_id])
  end

  # The next run resumes immediately below the lowest ID enqueued here.
  next_id = batches.last[:begin_id] - 1
  @spread_tracker.initial_id = next_id

  next_id
end
#spread(batch_size:, duration:, per_second:, initial_id:, begin_at: Time.now.utc) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/super_spreader/spreader.rb', line 13

# Plans batches of descending ID ranges spaced out over time.
#
# Starting at +initial_id+ and walking downward, each batch spans up to
# +batch_size+ IDs. Batches are given start times at a rate of +per_second+
# per second beginning at +begin_at+. Planning stops when the next slot would
# fall at or beyond +duration+ seconds, or when the IDs run out.
#
# @param batch_size [Integer] maximum number of IDs per batch
# @param duration [Numeric] length of the scheduling window, in seconds
# @param per_second [Numeric] number of batches to schedule per second
# @param initial_id [Integer] highest ID to include; batches count down from it
# @param begin_at [Time] time of the first slot (defaults to the current UTC time)
# @return [Array<Hash>] one hash per batch with +:run_at+, +:begin_id+ and
#   +:end_id+ keys, in scheduling order
def spread(batch_size:, duration:, per_second:, initial_id:, begin_at: Time.now.utc)
  end_id = initial_id
  batches = []
  slot = 0
  time_index = 0.0

  while time_index < duration
    break if end_id <= 0

    # Use floor to prevent subsecond times
    run_at = begin_at + time_index.floor
    # clamp is defined elsewhere in the class; presumably it raises values
    # below 1 up to 1 (the check against 1 below relies on that) - confirm.
    begin_id = clamp(end_id - batch_size + 1)
    batches << {run_at: run_at, begin_id: begin_id, end_id: end_id}

    # The batch reached ID 1, so there is nothing left below it.
    break if begin_id == 1

    end_id = begin_id - 1

    # Derive the offset from the slot number instead of repeatedly adding
    # 1.0 / per_second. Accumulated rounding error (ten additions of 0.1 sum
    # to 0.9999999999999999) would otherwise squeeze an extra batch into the
    # window and delay the rollover to the next whole second.
    slot += 1
    time_index = slot.fdiv(per_second)
  end

  batches
end