Class: Schdlr
- Inherits:
-
Object
show all
- Defined in:
- lib/schdlr.rb,
lib/schdlr/job.rb,
lib/schdlr/version.rb,
lib/schdlr/dispatcher.rb,
lib/schdlr/queue/sqlite.rb
Defined Under Namespace
Classes: Dispatcher, Error, Job, JobFailed, Queue
Constant Summary
collapse
- VERSION =
"0.1.1"
Instance Method Summary
collapse
Constructor Details
#initialize(queue, **options) ⇒ Schdlr
Returns a new instance of Schdlr.
19
20
21
22
23
24
25
26
|
# File 'lib/schdlr.rb', line 19
# Builds a scheduler around the given queue.
#
# @param queue [Object] job store; the other methods of this class call
#   +set+, +tasks+, +fail+ and +del+ on it
# @param options [Hash] optional settings
# @option options [Numeric] :interval value passed to the timer as
#   +execution_interval+ (falls back to 1 when absent or falsy)
# @option options [Numeric] :timeout value passed to the timer as
#   +timeout_interval+ (falls back to 1 when absent or falsy)
def initialize(queue, **options)
  @queue = queue
  @dispatcher = Dispatcher.new

  # The timer body itself only prints the current time; this object is
  # attached as an observer of the timer right below.
  @timer = Concurrent::TimerTask.new(
    execution_interval: options[:interval] || 1,
    timeout_interval: options[:timeout] || 1
  ) { puts Time.now }
  @timer.add_observer(self)
end
|
Instance Method Details
#at(datetime, task) ⇒ Object
36
37
38
|
# File 'lib/schdlr.rb', line 36
# Hands a task to the queue together with the moment it is meant for.
#
# @param datetime [Object] point in time forwarded to the queue as-is
# @param task [Object] task description forwarded to the queue as-is
# @return [Object] whatever the queue's +set+ returns
def at(datetime, task)
  job_store = @queue
  job_store.set(datetime, task)
end
|
#now(task) ⇒ Object
40
41
42
|
# File 'lib/schdlr.rb', line 40
# Schedules a task for the current moment by delegating to #at with
# +Time.now+.
#
# @param task [Object] task description forwarded to #at
# @return [Object] whatever #at returns
def now(task)
  current_time = Time.now
  at(current_time, task)
end
|
#start! ⇒ Object
28
29
30
|
# File 'lib/schdlr.rb', line 28
# Starts the scheduler by calling +execute+ on the internal timer.
#
# @return [Object] whatever the timer's +execute+ returns
def start!
  timer = @timer
  timer.execute
end
|
#stop! ⇒ Object
32
33
34
|
# File 'lib/schdlr.rb', line 32
# Stops the scheduler by calling +shutdown+ on the internal timer.
#
# @return [Object] whatever the timer's +shutdown+ returns
def stop!
  timer = @timer
  timer.shutdown
end
|
#task_update(time, result, ex) ⇒ Object
44
45
46
47
48
49
50
51
52
|
# File 'lib/schdlr.rb', line 44
# Observer callback registered on each dispatched task in #update.
# Marks the job as failed when an error is reported, otherwise removes it
# from the queue.
#
# @param time [Object] unused
# @param result [Object] task result; its first element is used as the job id
# @param ex [Object, nil] error, if any; its +task+'s first element is used
#   as the job id
# @return [Object] whatever the queue's +del+ or +fail+ returns
def task_update(time, result, ex)
  # Success path: the job id is the leading element of the result.
  return @queue.del([*result].first) unless ex

  # NOTE(review): assumes every reported error responds to #task
  # (presumably JobFailed) - confirm against the dispatcher.
  failed_id = [*ex.task].first
  @queue.fail(failed_id)
end
|
#update(time, result, ex) ⇒ Object
54
55
56
57
58
59
60
61
|
# File 'lib/schdlr.rb', line 54
# Observer callback for the timer (this object is added as a timer observer
# in the constructor). Fetches the tasks the queue reports for +time+,
# dispatches each one asynchronously and subscribes #task_update to the
# outcome of every dispatch.
#
# @param time [Object] passed to the queue's +tasks+
# @param result [Object] unused
# @param ex [Object] unused
# @return [Array] one dispatch handle per task, in queue order
def update(time, result, ex)
  @queue.tasks(time).map do |spec|
    @dispatcher.async.dispatch(spec).tap do |pending|
      pending.add_observer(self, :task_update)
    end
  end
end
|