Class: Gandalf::Scheduler
- Inherits:
-
Object
- Object
- Gandalf::Scheduler
- Includes:
- DataMapper::Resource
- Defined in:
- lib/gandalf/scheduler.rb
Overview
A magical scheduler
Instance Attribute Summary collapse
-
#redis ⇒ Object
Returns the value of attribute redis.
Instance Method Summary collapse
- #current_workload ⇒ Object
- #execute ⇒ Object
-
#job_distribution(workload, jobs) ⇒ Object
Recursively calculates next job distribution TODO Find a formula.
- #jobs_per_interval ⇒ Object
- #new_jobs ⇒ Object
- #push_jobs(jobs, workload) ⇒ Object
-
#run ⇒ Object
Executes jobs using a scheduler.
-
#setup(options = {}) ⇒ Object
Sets workers’ queues with Redis connection object.
Instance Attribute Details
#redis ⇒ Object
Returns the value of attribute redis.
17 18 19 |
# File 'lib/gandalf/scheduler.rb', line 17 def redis @redis end |
Instance Method Details
#current_workload ⇒ Object
72 73 74 75 76 |
# File 'lib/gandalf/scheduler.rb', line 72 def current_workload workload = {} workers.each { |worker| workload[worker.id] = worker.jobs_to_do } workload end |
#execute ⇒ Object
49 50 51 52 53 54 55 56 |
# File 'lib/gandalf/scheduler.rb', line 49 def execute jobs = new_jobs self.last_job_id = jobs.last.id save new_loads = job_distribution(current_workload, jobs.count) push_jobs(jobs, new_loads) end |
#job_distribution(workload, jobs) ⇒ Object
Recursively calculates next job distribution TODO Find a formula
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/gandalf/scheduler.rb', line 87 def job_distribution(workload, jobs) workload = workload.clone distribution = Hash.new(0) jobs.times do min_index = workload.min_by{|k, v| v}.first workload[min_index] += 1 distribution[min_index] += 1 end distribution end |
#jobs_per_interval ⇒ Object
58 59 60 |
# File 'lib/gandalf/scheduler.rb', line 58 def jobs_per_interval seed_count / (1440 * 60 / interval) end |
#new_jobs ⇒ Object
62 63 64 65 66 67 68 69 70 |
# File 'lib/gandalf/scheduler.rb', line 62 def new_jobs jobs = @Seed.all(:id.gte => last_job_id, :include_update => true, :limit => jobs_per_interval) if jobs.length < jobs_per_interval jobs += @Seed.all(:limit => jobs_per_interval - jobs.length) end jobs end |
#push_jobs(jobs, workload) ⇒ Object
78 79 80 81 82 83 |
# File 'lib/gandalf/scheduler.rb', line 78 def push_jobs(jobs, workload) workload.each do |worker_id, wload| worker = workers.get(worker_id) worker.push(jobs.slice!(0,wload)) end end |
#run ⇒ Object
Executes jobs using a scheduler
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/gandalf/scheduler.rb', line 31 def run scheduler = Rufus::Scheduler.start_new scheduler.every interval do execute end scheduler.every 10*interval do # TODO Use dm-aggregates when the bug gets fixed. self.seed_count = repository.adapter.query("SELECT COUNT(*) FROM #{seed_table} WHERE include_update = 1").first.to_i save end def scheduler.handle_exception(job, exception) puts exception raise exception end end |
#setup(options = {}) ⇒ Object
Sets workers’ queues with Redis connection object.
20 21 22 23 24 25 26 27 28 |
# File 'lib/gandalf/scheduler.rb', line 20 def setup( = {}) @redis = Redis.new(:host => self.redis_host, :db => self.redis_db_id) if [:seed_class] @Seed = [:seed_class] else @Seed = Seed end workers.each { |worker| worker.setup(:redis => @redis) } end |