Class: Resque::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Helpers
Defined in:
lib/resque/scheduler.rb

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.muteObject

If set, produces no output



16
17
18
# File 'lib/resque/scheduler.rb', line 16

def mute
  @mute
end

.verboseObject

If true, logs more stuff…



13
14
15
# File 'lib/resque/scheduler.rb', line 13

def verbose
  @verbose
end

Class Method Details

.clear_schedule!Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler



140
141
142
143
144
# File 'lib/resque/scheduler.rb', line 140

def clear_schedule!
  rufus_scheduler.stop
  @rufus_scheduler = nil
  rufus_scheduler
end

.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object

Enqueues all delayed jobs for a timestamp



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/resque/scheduler.rb', line 93

def enqueue_delayed_items_for_timestamp(timestamp)
  item = nil
  begin
    handle_shutdown do
      if item = Resque.next_item_for_timestamp(timestamp)
        log "queuing #{item['class']} [delayed]"
        queue = item['queue'] || Resque.queue_from_class(constantize(item['class']))
        # Support custom job classes like job with status
        if (job_klass = item['custom_job_class']) && (job_klass != 'Resque::Job')
          # custom job classes not supporting the same API calls must implement the #schedule method
          constantize(job_klass).scheduled(queue, item['class'], *item['args'])
        else
          Resque::Job.create(queue, item['class'], *item['args'])
        end
      end
    end
  # continue processing until there are no more ready items in this timestamp
  end while !item.nil?
end

.enqueue_from_config(config) ⇒ Object

Enqueues a job based on a config hash



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/resque/scheduler.rb', line 120

def enqueue_from_config(config)
  args = config['args'] || config[:args]
  klass_name = config['class'] || config[:class]
  params = args.nil? ? [] : Array(args)
  queue = config['queue'] || config[:queue] || Resque.queue_from_class(constantize(klass_name))
  # Support custom job classes like job with status
  if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job')
    # custom job classes not supporting the same API calls must implement the #schedule method
    constantize(job_klass).scheduled(queue, klass_name, *params)
  else
    Resque::Job.create(queue, klass_name, *params)
  end        
end

.handle_delayed_itemsObject

Handles queueing delayed items



82
83
84
85
86
87
88
89
90
# File 'lib/resque/scheduler.rb', line 82

def handle_delayed_items
  item = nil
  begin
    if timestamp = Resque.next_delayed_timestamp
      enqueue_delayed_items_for_timestamp(timestamp)
    end
  # continue processing until there are no more ready timestamps
  end while !timestamp.nil?
end

.handle_shutdownObject



113
114
115
116
117
# File 'lib/resque/scheduler.rb', line 113

def handle_shutdown
  exit if @shutdown
  yield
  exit if @shutdown
end

.load_schedule!Object

Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/resque/scheduler.rb', line 53

def load_schedule!
  log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?

  Resque.schedule.each do |name, config|
    # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
    # required for the jobs to be scheduled.  If rails_env is missing, the
    # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
    # to.
    if config['rails_env'].nil? || rails_env_matches?(config)
      log! "Scheduling #{name} "
      if !config['cron'].nil? && config['cron'].length > 0
        rufus_scheduler.cron config['cron'] do
          log! "queuing #{config['class']} (#{name})"
          enqueue_from_config(config)
        end
      else
        log! "no cron found for #{config['class']} (#{name}) - skipping"
      end
    end
  end
end

.log(msg) ⇒ Object



164
165
166
167
# File 'lib/resque/scheduler.rb', line 164

def log(msg)
  # add "verbose" logic later
  log!(msg) if verbose
end

.log!(msg) ⇒ Object



160
161
162
# File 'lib/resque/scheduler.rb', line 160

def log!(msg)
  puts "#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{msg}" unless mute
end

.poll_sleepObject

Sleeps and returns true



147
148
149
150
151
152
# File 'lib/resque/scheduler.rb', line 147

def poll_sleep
  @sleeping = true
  handle_shutdown { sleep 5 }
  @sleeping = false
  true
end

.rails_env_matches?(config) ⇒ Boolean

Returns true if the given schedule config hash matches the current ENV

Returns:

  • (Boolean)


77
78
79
# File 'lib/resque/scheduler.rb', line 77

def rails_env_matches?(config)
  config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/,'').split(',').include?(ENV['RAILS_ENV'])
end

.register_signal_handlersObject

For all signals, set the shutdown flag and wait for current poll/enqueing to finish (should be almost istant). In the case of sleeping, exit immediately.



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/resque/scheduler.rb', line 39

def register_signal_handlers
  trap("TERM") { shutdown }
  trap("INT") { shutdown }
  
  begin
    trap('QUIT') { shutdown   }
    trap('USR1') { kill_child }
  rescue ArgumentError
    warn "Signals QUIT and USR1 not supported."
  end
end

.rufus_schedulerObject



134
135
136
# File 'lib/resque/scheduler.rb', line 134

def rufus_scheduler
  @rufus_scheduler ||= Rufus::Scheduler.start_new
end

.runObject

Schedule all jobs and continually look for delayed jobs (never returns)



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/resque/scheduler.rb', line 19

def run

  # trap signals
  register_signal_handlers

  # Load the schedule into rufus
  load_schedule!

  # Now start the scheduling part of the loop.
  loop do
    handle_delayed_items
    poll_sleep
  end

  # never gets here.
end

.shutdownObject

Sets the shutdown flag, exits if sleeping



155
156
157
158
# File 'lib/resque/scheduler.rb', line 155

def shutdown
  @shutdown = true
  exit if @sleeping
end