Class: Schdlr

Inherits:
Object
  • Object
show all
Defined in:
lib/schdlr.rb,
lib/schdlr/job.rb,
lib/schdlr/version.rb,
lib/schdlr/dispatcher.rb,
lib/schdlr/queue/sqlite.rb

Defined Under Namespace

Classes: Dispatcher, Error, Job, JobFailed, Queue

Constant Summary collapse

VERSION =
"0.1.1"

Instance Method Summary collapse

Constructor Details

#initialize(queue, **options) ⇒ Schdlr

Returns a new instance of Schdlr.



19
20
21
22
23
24
25
26
# File 'lib/schdlr.rb', line 19

def initialize(queue, **options)
  @queue = queue
  @dispatcher = Dispatcher.new
  execution_interval = options[:interval] || 1
  timeout_interval = options[:timeout] || 1
  @timer = Concurrent::TimerTask.new(execution_interval: execution_interval, timeout_interval: timeout_interval) { puts Time.now }
  @timer.add_observer(self)
end

Instance Method Details

#at(datetime, task) ⇒ Object



36
37
38
# File 'lib/schdlr.rb', line 36

def at(datetime, task)
  @queue.set(datetime, task)
end

#now(task) ⇒ Object



40
41
42
# File 'lib/schdlr.rb', line 40

def now(task)
  at(Time.now, task)
end

#start!Object



28
29
30
# File 'lib/schdlr.rb', line 28

def start!
  @timer.execute
end

#stop!Object



32
33
34
# File 'lib/schdlr.rb', line 32

def stop!
  @timer.shutdown
end

#task_update(time, result, ex) ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/schdlr.rb', line 44

def task_update(time, result, ex)
  if ex
    id, _, _, _ = *ex.task
    @queue.fail(id)
  else
    id, _, _, _ = *result
    @queue.del(id)
  end
end

#update(time, result, ex) ⇒ Object



54
55
56
57
58
59
60
61
# File 'lib/schdlr.rb', line 54

def update(time, result, ex)
  tasks = @queue.tasks(time)
  tasks.collect do |task_spec|
    ivar = @dispatcher.async.dispatch(task_spec)
    ivar.add_observer(self, :task_update)
    ivar
  end
end