Class: SerialScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/serial_scheduler.rb,
lib/serial_scheduler/version.rb

Defined Under Namespace

Classes: Producer

Constant Summary collapse

VERSION =
"0.2.4"

Instance Method Summary collapse

Constructor Details

#initialize(logger: Logger.new($stdout), error_handler: ->(e) { raise e }) ⇒ SerialScheduler

Returns a new instance of SerialScheduler.



46
47
48
49
50
51
52
# File 'lib/serial_scheduler.rb', line 46

# Builds a scheduler that has no producers registered and is not stopped.
#
# @param logger [#info] receives log messages; defaults to a Logger writing to $stdout
# @param error_handler [#call] callable kept in @error_handler; the default
#   re-raises whatever it is given
def initialize(logger: Logger.new($stdout), error_handler: ->(e) { raise e })
  # scheduling state: nothing registered yet, no stop requested
  @stopped = false
  @producers = []

  # collaborators supplied by the caller
  @error_handler = error_handler
  @logger = logger
end

Instance Method Details

#add(*args, **kwargs, &block) ⇒ Object

Registers a job: builds a Producer from the given arguments and adds it to the producers that #run schedules.



55
56
57
# File 'lib/serial_scheduler.rb', line 55

# Builds a Producer from the given arguments and registers it with this scheduler.
#
# Positional arguments, keyword arguments and the block are all forwarded
# unchanged to Producer.new.
#
# @return [Array] the list of registered producers, including the new one
def add(*args, **kwargs, &block)
  producer = Producer.new(*args, **kwargs, &block)
  @producers.push(producer)
end

#run ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/serial_scheduler.rb', line 59

# Runs the scheduling loop until a stop has been requested.
#
# Every registered producer is started with the current time. The loop then
# repeatedly picks the producer with the smallest `next` value, waits until
# that time is reached (re-checking once per second so a stop request is
# noticed), advances the producer with `next!` and hands it to
# execute_in_fork.
#
# @return [nil] once the loop is left because the stop flag is set
def run
  started_at = Time.now.to_i
  @producers.each { |producer| producer.start(started_at) }

  loop do
    now = Time.now.to_i
    upcoming = @producers.min_by(&:next)
    delay = [upcoming.next - now, 0].max # overdue jobs run without waiting
    deadline = Time.at(now + delay)

    if delay > 0
      @logger.info(message: "Waiting to start job", job: upcoming.name, in: delay, at: deadline.to_s)
      # re-check the clock every second instead of one long sleep: long waits drift by .3%
      sleep 1 until @stopped || Time.now >= deadline
    end
    break if @stopped

    upcoming.next!
    execute_in_fork(upcoming)
  end
end

#stop ⇒ Object



84
85
86
# File 'lib/serial_scheduler.rb', line 84

# Requests a stop by setting the flag that #run checks while waiting for the
# next job and before executing it.
def stop
  @stopped = true
end