Class: Qu::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/qu/scheduler.rb,
lib/qu/scheduler/version.rb

Constant Summary collapse

VERSION =
"0.1.1"
@@scheduled_jobs =
{}

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.dynamicObject

If set, will try to update the schedule in the loop



13
14
15
# File 'lib/qu/scheduler.rb', line 13

# Reader for the `dynamic` flag held in @dynamic.
#
# `run` consults this flag: when it is truthy the schedule is reloaded at
# startup and `update_schedule` is invoked on every pass of the polling loop.
# Returns whatever is currently stored in @dynamic.
def dynamic
  @dynamic
end

.poll_sleep_amountObject



24
25
26
# File 'lib/qu/scheduler.rb', line 24

# Number of seconds `poll_sleep` sleeps between polling passes.
#
# When no truthy value has been stored in @poll_sleep_amount, the default of
# 5 is stored and returned.
def poll_sleep_amount
  @poll_sleep_amount = 5 unless @poll_sleep_amount # seconds
  @poll_sleep_amount
end

Class Method Details

.clear_schedule!Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler



186
187
188
189
190
191
# File 'lib/qu/scheduler.rb', line 186

# Stops the current rufus scheduler, forgets every tracked job, and returns
# a replacement scheduler.
def clear_schedule!
  stale_scheduler = rufus_scheduler
  stale_scheduler.stop

  # Dropping the memoized instance makes the accessor build a new one below.
  @rufus_scheduler = nil
  @@scheduled_jobs = {}

  rufus_scheduler
end

.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object

Enqueues all delayed jobs for a timestamp



135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/qu/scheduler.rb', line 135

# Enqueues every delayed job the backend reports as ready for +timestamp+.
#
# Items are pulled from the backend one at a time. Each pull runs inside
# handle_shutdown (so a pending shutdown is honoured between items) and each
# enqueue runs inside handle_errors. Processing stops when the backend
# returns nil for the timestamp. Returns nil.
def enqueue_delayed_items_for_timestamp(timestamp)
  more_items = true
  while more_items
    handle_shutdown do
      job_config = Qu.backend.next_item_for_timestamp(timestamp)
      # Only a nil answer from the backend ends the loop.
      more_items = !job_config.nil?
      if job_config
        Qu.logger.debug("queuing #{job_config['klass']} [delayed]")
        handle_errors { enqueue_from_config(job_config) }
      end
    end
  end
end

.enqueue_from_config(job_config) ⇒ Object

Enqueues a job based on a config hash



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/qu/scheduler.rb', line 163

# Enqueues a job described by the +job_config+ hash.
#
# Without a 'custom_job_class' entry the payload built from the config is
# handed straight to the backend. With one, the named class is resolved and
# its static `scheduled(queue, class_name, *args)` method is called instead.
# Returns the value of whichever call performed the enqueue.
def enqueue_from_config(job_config)
  payload = Payload.new(job_config)
  custom_class_name = job_config['custom_job_class']
  return Qu.backend.enqueue(payload) unless custom_class_name

  # The custom job class API must offer a static "scheduled" method. If the
  # custom job class cannot be constantized (via a requeue call from the web,
  # perhaps), fall back to enqueuing through the backend.
  begin
    custom_class = Qu::Payload.new.send(:constantize, custom_class_name)
    custom_class.scheduled(payload.queue, custom_class_name, *payload.args)
  rescue NameError
    # Note that the custom job class name is what gets enqueued here.
    fallback = Payload.new(:klass => custom_class_name, :args => payload.args)
    Qu.backend.enqueue(fallback)
  end
end

.handle_delayed_items(at_time = nil) ⇒ Object

Handles queueing of delayed items. at_time - Time to start scheduling items (default: now).



123
124
125
126
127
128
129
130
131
132
# File 'lib/qu/scheduler.rb', line 123

# Enqueues all delayed items that are due.
#
# at_time - passed through unchanged to Qu.backend.next_delayed_timestamp
#           (defaults to nil).
#
# Keeps asking the backend for the next delayed timestamp and enqueues that
# timestamp's items until the backend returns nil. The process title is only
# updated when there is at least one timestamp to process. Returns nil.
def handle_delayed_items(at_time=nil)
  timestamp = Qu.backend.next_delayed_timestamp(at_time)
  return unless timestamp

  set_process_title "processing delayed jobs"
  until timestamp.nil?
    enqueue_delayed_items_for_timestamp(timestamp)
    timestamp = Qu.backend.next_delayed_timestamp(at_time)
  end
end

.handle_errorsObject



154
155
156
157
158
159
160
# File 'lib/qu/scheduler.rb', line 154

# Runs the given block and logs, rather than raises, an error it produces.
#
# Only StandardError and ScriptError (e.g. LoadError, NotImplementedError)
# are treated as job failures. Process-level exceptions such as SystemExit,
# SignalException/Interrupt and NoMemoryError are deliberately NOT rescued,
# so `exit` and signals can still terminate the scheduler while a block is
# running.
#
# Returns the block's value, or the logger call's return value on failure.
def handle_errors
  yield
rescue StandardError, ScriptError => e
  Qu.logger.fatal("#{e.class.name}: #{e.message}")
end

.handle_shutdownObject



148
149
150
151
152
# File 'lib/qu/scheduler.rb', line 148

# Runs the given block, exiting the process instead if a shutdown is pending.
#
# The @shutdown flag is checked both before and after the block, so a
# shutdown requested while the block was running takes effect as soon as the
# block finishes. Returns nil when no shutdown is pending.
def handle_shutdown
  if @shutdown
    exit
  end

  yield

  if @shutdown
    exit
  end
end

.load_schedule!Object

Pulls the schedule from Qu.schedule and loads it into the rufus scheduler instance



85
86
87
# File 'lib/qu/scheduler.rb', line 85

# Loads the schedule by delegating to the configured backend's
# load_schedule! and returns that call's result.
def load_schedule!
  backend = Qu.backend
  backend.load_schedule!
end

.load_schedule_job(name, config) ⇒ Object

Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/qu/scheduler.rb', line 90

# Registers one named schedule entry with the rufus scheduler and records
# the resulting job in @@scheduled_jobs under +name+.
#
# name   - key under which the job is stored.
# config - schedule hash; this method reads 'rails_env', 'cron', 'every' and
#          'class' from it.
#
# The first of 'cron' / 'every' that is present and non-empty decides how the
# job is scheduled. When neither is usable a warning is logged and nothing is
# scheduled.
def load_schedule_job(name, config)
  # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as required
  # for the job to be scheduled. If rails_env is missing, the job is
  # scheduled regardless of what ENV['RAILS_ENV'] is set to.
  return unless config['rails_env'].nil? || rails_env_matches?(config)

  Qu.logger.info("Scheduling #{name}")

  interval_types = %w{cron every}
  chosen_type = interval_types.find do |candidate|
    spec = config[candidate]
    !spec.nil? && spec.length > 0
  end

  if chosen_type
    @@scheduled_jobs[name] = rufus_scheduler.send(chosen_type, config[chosen_type]) do
      Qu.logger.info("queueing #{config['class']} (#{name})")
      handle_errors { enqueue_from_config(config) }
    end
    nil
  else
    Qu.logger.warn("no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping")
  end
end

.poll_sleepObject

Sleeps and returns true



208
209
210
211
212
213
# File 'lib/qu/scheduler.rb', line 208

# Sleeps for poll_sleep_amount seconds and returns true.
#
# @sleeping is raised for the duration of the sleep; `shutdown` reads that
# flag to decide whether it may exit immediately.
def poll_sleep
  @sleeping = true
  handle_shutdown do
    sleep(poll_sleep_amount)
  end
  @sleeping = false

  true
end


73
74
75
76
77
78
79
80
81
# File 'lib/qu/scheduler.rb', line 73

# Logs one header line followed by one line per rufus job, each showing the
# job's `t` and `last` values separated by a tab.
#
# Does nothing (and returns nil) when there is no rufus scheduler.
def print_schedule
  return unless rufus_scheduler

  Qu.logger.info("Scheduling Info\tLast Run")
  rufus_scheduler.all_jobs.each do |_job_id, job|
    Qu.logger.info("#{job.t}\t#{job.last}")
  end
end

.rails_env_matches?(config) ⇒ Boolean

Returns true if the given schedule config hash matches the current ENV

Returns:

  • (Boolean)


117
118
119
# File 'lib/qu/scheduler.rb', line 117

# Tells whether ENV['RAILS_ENV'] is one of the environments listed in
# config['rails_env'].
#
# config['rails_env'] is a comma-separated list; all whitespace in it is
# ignored. Returns true or false when both values are present; otherwise
# returns the missing (nil/false) value itself.
def rails_env_matches?(config)
  wanted = config['rails_env']
  return wanted unless wanted

  current = ENV['RAILS_ENV']
  return current unless current

  wanted.gsub(/\s/,'').split(',').include?(current)
end

.register_signal_handlersObject

For all signals, set the shutdown flag and wait for the current poll/enqueuing to finish (should be almost instant). In the case of sleeping, exit immediately.



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/qu/scheduler.rb', line 60

# Installs the scheduler's signal handlers.
#
# TERM, INT and QUIT call `shutdown`, USR1 calls `print_schedule` and USR2
# calls `reload_schedule!`. If trapping QUIT/USR1/USR2 raises ArgumentError,
# a warning is printed and the remaining handlers in that group are skipped.
def register_signal_handlers
  %w[TERM INT].each do |signal_name|
    trap(signal_name) { shutdown }
  end

  begin
    trap('QUIT') { shutdown }
    trap('USR1') { print_schedule }
    trap('USR2') { reload_schedule! }
  rescue ArgumentError
    warn "Signals QUIT, USR1 and USR2 not supported."
  end
end

.reload_schedule!Object



193
194
195
196
197
# File 'lib/qu/scheduler.rb', line 193

# Replaces the running schedule: updates the process title, clears the
# current schedule via clear_schedule!, then loads it again via
# load_schedule!. Returns the result of load_schedule!.
def reload_schedule!
  set_process_title("reloading schedule")
  clear_schedule!
  load_schedule!
end

.rufus_schedulerObject



180
181
182
# File 'lib/qu/scheduler.rb', line 180

# Returns the memoized Rufus::Scheduler, starting a new one through
# Rufus::Scheduler.start_new the first time (or after @rufus_scheduler has
# been reset to nil).
def rufus_scheduler
  return @rufus_scheduler if @rufus_scheduler

  @rufus_scheduler = Rufus::Scheduler.start_new
end

.runObject

Schedule all jobs and continually look for delayed jobs (never returns)



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/qu/scheduler.rb', line 29

# Main entry point: schedules all jobs, then polls for delayed jobs in an
# endless loop.
#
# Each pass handles delayed items, refreshes the schedule when `dynamic` is
# set, and then sleeps via poll_sleep. Errno::EAGAIN and Errno::ECONNRESET
# raised during a pass are reported with `warn` and the loop carries on.
def run
  set_process_title "starting"
  register_signal_handlers

  # With `dynamic` set the schedule is reloaded; otherwise it is loaded normally.
  dynamic ? reload_schedule! : load_schedule!

  # Polling loop.
  loop do
    begin
      handle_delayed_items
      update_schedule if dynamic
    rescue Errno::EAGAIN, Errno::ECONNRESET => error
      warn error.message
    end

    poll_sleep
  end
end

.scheduled_jobsObject

the Rufus::Scheduler jobs that are scheduled



20
21
22
# File 'lib/qu/scheduler.rb', line 20

# Returns the hash held in the @@scheduled_jobs class variable.
#
# load_schedule_job stores each scheduled Rufus::Scheduler job in this hash
# under the schedule entry's name, and unschedule_job removes entries from it.
def scheduled_jobs
  @@scheduled_jobs
end

.set_process_title(string) ⇒ Object



221
222
223
224
# File 'lib/qu/scheduler.rb', line 221

# Logs +string+ at info level and puts it into the process title ($0),
# prefixed with "qu-scheduler-<VERSION>: ". Returns the new title.
def set_process_title(string)
  Qu.logger.info(string)

  title = "qu-scheduler-#{Qu::Scheduler::VERSION}: #{string}"
  $0 = title
end

.shutdownObject

Sets the shutdown flag, exits if sleeping



216
217
218
219
# File 'lib/qu/scheduler.rb', line 216

# Requests a shutdown by raising the @shutdown flag.
#
# If the scheduler is currently sleeping (@sleeping is set) the process exits
# right away; otherwise the flag is left for handle_shutdown to act on.
# Returns nil when it does not exit.
def shutdown
  @shutdown = true
  return unless @sleeping

  exit
end

.unschedule_job(name) ⇒ Object



199
200
201
202
203
204
205
# File 'lib/qu/scheduler.rb', line 199

# Unschedules the job registered under +name+ and removes it from
# @@scheduled_jobs.
#
# Does nothing and returns nil when no job is registered under that name;
# otherwise returns the entry removed from the hash.
def unschedule_job(name)
  job = scheduled_jobs[name]
  return unless job

  Qu.logger.debug("Removing schedule #{name}")
  job.unschedule
  @@scheduled_jobs.delete(name)
end