Class: RabbitJobs::Scheduler
- Inherits:
-
Object
- Object
- RabbitJobs::Scheduler
- Includes:
- MainLoop
- Defined in:
- lib/rabbit_jobs/scheduler.rb
Instance Attribute Summary collapse
-
#process_name ⇒ Object
Returns the value of attribute process_name.
-
#schedule ⇒ Object
Returns the value of attribute schedule.
Instance Method Summary collapse
-
#clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one.
- #load_default_schedule ⇒ Object
-
#load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance.
-
#publish_from_config(config) ⇒ Object
Publish a job based on a config hash.
-
#rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current ENV.
- #rufus_scheduler ⇒ Object
- #setup_job_schedule(name, config) ⇒ Object
- #startup ⇒ Object
-
#work(time = 0) ⇒ Object
Subscribes to channel and working on jobs.
Methods included from MainLoop
#log_daemon_error, #main_loop, #shutdown, #shutdown!
Instance Attribute Details
#process_name ⇒ Object
Returns the value of attribute process_name.
6 7 8 |
# File 'lib/rabbit_jobs/scheduler.rb', line 6 def process_name @process_name end |
#schedule ⇒ Object
Returns the value of attribute schedule.
6 7 8 |
# File 'lib/rabbit_jobs/scheduler.rb', line 6 def schedule @schedule end |
Instance Method Details
#clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler
64 65 66 67 68 |
# File 'lib/rabbit_jobs/scheduler.rb', line 64 def clear_schedule! rufus_scheduler.stop @rufus_scheduler = nil rufus_scheduler end |
#load_default_schedule ⇒ Object
13 14 15 16 17 18 19 20 |
# File 'lib/rabbit_jobs/scheduler.rb', line 13 def load_default_schedule if defined?(Rails) file = Rails.root.join('config/schedule.yml') if file.file? @schedule = HashWithIndifferentAccess.new(YAML.load_file(file)) end end end |
#load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/rabbit_jobs/scheduler.rb', line 24 def load_schedule! @schedule ||= load_default_schedule raise "You should setup a schedule or place it in config/schedule.yml" unless schedule schedule.each do |name, config| # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as # required for the jobs to be scheduled. If rails_env is missing, the # job should be scheduled regardless of what ENV['RAILS_ENV'] is set # to. if config['rails_env'].nil? || rails_env_matches?(config) setup_job_schedule(name, config) end end end |
#publish_from_config(config) ⇒ Object
Publish a job based on a config hash
46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/rabbit_jobs/scheduler.rb', line 46 def publish_from_config(config) args = config[:args] || [] klass_name = config[:class] params = [args].flatten RabbitJobs.logger.info "publishing #{config} at #{Time.now}" RabbitJobs.publish_to(config[:queue], klass_name, *params) rescue RabbitJobs.logger.warn "Failed to publish #{klass_name}:\n #{$!}\n params = #{params.inspect}" RabbitJobs.logger.warn $!.inspect end |
#rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current ENV
41 42 43 |
# File 'lib/rabbit_jobs/scheduler.rb', line 41 def rails_env_matches?(config) config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/,'').split(',').include?(ENV['RAILS_ENV']) end |
#rufus_scheduler ⇒ Object
58 59 60 |
# File 'lib/rabbit_jobs/scheduler.rb', line 58 def rufus_scheduler @rufus_scheduler ||= Rufus::Scheduler.start_new end |
#setup_job_schedule(name, config) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/rabbit_jobs/scheduler.rb', line 102 def setup_job_schedule(name, config) interval_defined = false %w(cron every).each do |interval_type| if config[interval_type].present? RabbitJobs.logger.info "queueing #{config['class']} (#{name})" rufus_scheduler.send(interval_type, config[interval_type], blocking: true) do publish_from_config(config) end interval_defined = true end end unless interval_defined RabbitJobs.logger.warn "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping" end end |
#startup ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/rabbit_jobs/scheduler.rb', line 89 def startup # Fix buffering so we can `rake rj:work > resque.log` and # get output from the child in there. $stdout.sync = true @shutdown = false Signal.trap('TERM') { shutdown } Signal.trap('INT') { shutdown! } true end |
#work(time = 0) ⇒ Object
Subscribes to channel and working on jobs
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/rabbit_jobs/scheduler.rb', line 71 def work(time = 0) begin return false unless startup $0 = self.process_name || "rj_scheduler" RabbitJobs.logger.info "Started." load_schedule! return main_loop(time) rescue log_daemon_error($!) end true end |