Class: Workerholic::JobScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/workerholic/job_scheduler.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ JobScheduler

Returns a new instance of JobScheduler.



6
7
8
9
# File 'lib/workerholic/job_scheduler.rb', line 6

def initialize(opts={})
  @sorted_set = opts[:sorted_set] || SortedSet.new
  @alive = true
end

Instance Attribute Details

#aliveObject

Returns the value of attribute alive.



4
5
6
# File 'lib/workerholic/job_scheduler.rb', line 4

def alive
  @alive
end

#scheduler_threadObject (readonly)

Returns the value of attribute scheduler_thread.



3
4
5
# File 'lib/workerholic/job_scheduler.rb', line 3

def scheduler_thread
  @scheduler_thread
end

#sorted_setObject (readonly)

Returns the value of attribute sorted_set.



3
4
5
# File 'lib/workerholic/job_scheduler.rb', line 3

def sorted_set
  @sorted_set
end

Instance Method Details

#enqueue_due_jobsObject



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/workerholic/job_scheduler.rb', line 29

def enqueue_due_jobs
  if job_due?
    while job_due?
      serialized_job, job_execution_time = sorted_set.peek
      job = JobSerializer.deserialize(serialized_job)
      queue = job.queue ? Queue.new(job.queue) : Queue.new

      queue.enqueue(serialized_job)

      sorted_set.remove(job_execution_time)
    end
  else
    sleep(2)
  end
end

#job_due?Boolean

Returns:

  • (Boolean)


17
18
19
20
21
22
23
# File 'lib/workerholic/job_scheduler.rb', line 17

def job_due?
  scheduled_job = sorted_set.peek
  return false unless scheduled_job

  job_execution_time = scheduled_job.last
  Time.now.to_f >= job_execution_time
end

#joinObject



49
50
51
# File 'lib/workerholic/job_scheduler.rb', line 49

def join
  scheduler_thread.join if scheduler_thread
end

#killObject



45
46
47
# File 'lib/workerholic/job_scheduler.rb', line 45

def kill
  self.alive = false
end

#schedule(serialized_job, score) ⇒ Object



25
26
27
# File 'lib/workerholic/job_scheduler.rb', line 25

def schedule(serialized_job, score)
  sorted_set.add(serialized_job, score)
end

#startObject



11
12
13
14
15
# File 'lib/workerholic/job_scheduler.rb', line 11

def start
  @scheduler_thread = Thread.new do
    enqueue_due_jobs while alive
  end
end