Class: Sidekiq::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Util
Defined in:
lib/sidekiq/scheduler.rb

Constant Summary collapse

RUFUS_METADATA_KEYS =
%w(description at cron every in interval enabled)

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.dynamicObject

Set to update the schedule at runtime, once per given time period.



28
29
30
# File 'lib/sidekiq/scheduler.rb', line 28

# Reader for the +dynamic+ setting.
#
# load_schedule! checks this value: when it is truthy the schedule is
# reloaded via Sidekiq.reload_schedule! and update_schedule is registered
# to run every +dynamic_every+.
#
# @return [Object, nil] whatever was stored in @dynamic (nil when unset)
def dynamic
  instance_variable_get(:@dynamic)
end

.dynamic_everyObject

Set the period at which the schedule is dynamically updated at runtime.



31
32
33
# File 'lib/sidekiq/scheduler.rb', line 31

# Reader for the +dynamic_every+ setting.
#
# load_schedule! passes this value to rufus_scheduler.every as the period
# between update_schedule runs (only when +dynamic+ is truthy).
#
# @return [Object, nil] whatever was stored in @dynamic_every (nil when unset)
def dynamic_every
  instance_variable_get(:@dynamic_every)
end

.enabledObject

Set to enable or disable the scheduler.



25
26
27
# File 'lib/sidekiq/scheduler.rb', line 25

# Reader for the +enabled+ setting.
#
# load_schedule! and reload_schedule! only do their work when this value is
# truthy; otherwise they just log 'SidekiqScheduler is disabled'.
#
# @return [Object, nil] whatever was stored in @enabled (nil when unset)
def enabled
  instance_variable_get(:@enabled)
end

.listened_queues_onlyObject

Set to schedule jobs only when they will be pushed to queues listened to by Sidekiq.



34
35
36
# File 'lib/sidekiq/scheduler.rb', line 34

# Reader for the +listened_queues_only+ setting.
#
# When truthy, load_schedule! skips every job whose queue fails
# enabled_queue? against sidekiq_queues.
#
# @return [Object, nil] whatever was stored in @listened_queues_only
#   (nil when unset)
def listened_queues_only
  instance_variable_get(:@listened_queues_only)
end

Class Method Details

.active_job_enqueue?(klass) ⇒ Boolean

Returns true if the enqueuing needs to be done for an ActiveJob class, false otherwise.

Parameters:

  • klass (Class)

    the class to check for being a descendant of ActiveJob

Returns:

  • (Boolean)


274
275
276
277
# File 'lib/sidekiq/scheduler.rb', line 274

# Tells whether a job must be enqueued through ActiveJob.
#
# Always answers with a strict Boolean: when +klass+ is not a Class, or when
# ActiveJob::Enqueuing is not loaded, the answer is +false+ (never +nil+).
#
# @param klass [Class, Object] the candidate job class
# @return [Boolean] true only if +klass+ is a Class that includes
#   ActiveJob::Enqueuing
def active_job_enqueue?(klass)
  # defined? guards the constant lookup so the check is safe without ActiveJob.
  return false unless klass.is_a?(Class) && defined?(ActiveJob::Enqueuing)

  klass.include?(ActiveJob::Enqueuing)
end

.clear_schedule!Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler



204
205
206
207
208
209
# File 'lib/sidekiq/scheduler.rb', line 204

# Stops the current rufus scheduler and throws away both it and the
# registry of scheduled jobs.
#
# @return [Object] the replacement scheduler, as built by rufus_scheduler
#   once the memoized instance has been cleared
def clear_schedule!
  rufus_scheduler.stop

  # Drop the memoized scheduler and start again from an empty job registry.
  @rufus_scheduler, @@scheduled_jobs = nil, {}

  rufus_scheduler
end

.enabled_queue?(job_queue, queues) ⇒ Boolean

Returns true if a job’s queue is included in the array of queues

If queues are empty, returns true.

Parameters:

  • job_queue (String)

    Job’s queue name

  • queues (Array<String>)

Returns:

  • (Boolean)


313
314
315
# File 'lib/sidekiq/scheduler.rb', line 313

# Tells whether a job's queue is one of the given queues.
#
# @param job_queue [String] the job's queue name
# @param queues [Array<String>] the queues to check against
# @return [Boolean] true when +queues+ is empty or contains +job_queue+
def enabled_queue?(job_queue, queues)
  # An empty list places no restriction on the queue.
  return true if queues.empty?

  queues.include?(job_queue)
end

.enqueue_job(job_config, time = Time.now) ⇒ Object

Enqueue a job based on a config hash

Parameters:

  • job_config (Hash)

    the job configuration

  • time (Time) (defaults to: Time.now)

    time the job is enqueued



176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/sidekiq/scheduler.rb', line 176

# Enqueues a job based on a config hash.
#
# The hash is duplicated (shallowly) before being handed to
# prepare_arguments, so keys removed or replaced here do not affect the
# caller's top-level hash.
#
# @param job_config [Hash] the job configuration
# @param time [Time] time the job is enqueued (defaults to Time.now)
# @return [Object] whatever the selected enqueue method returns
def enqueue_job(job_config, time = Time.now)
  config = prepare_arguments(job_config.dup)

  # 'include_metadata' is consumed here: it is removed from the config and,
  # when truthy, the scheduling timestamp is added to the job arguments.
  if config.delete('include_metadata')
    # NOTE(review): the callee name was missing on this line, leaving an
    # unparsable `(args, key: value)` expression. arguments_with_metadata is
    # assumed to be a helper defined outside this excerpt - confirm.
    config['args'] = arguments_with_metadata(config['args'], scheduled_at: time.to_f)
  end

  if active_job_enqueue?(config['class'])
    enqueue_with_active_job(config)
  else
    enqueue_with_sidekiq(config)
  end
end

.enqueue_with_active_job(config) ⇒ Object



248
249
250
251
252
253
254
# File 'lib/sidekiq/scheduler.rb', line 248

# Builds the ActiveJob instance for the config and enqueues it.
#
# @param config [Hash] job config; reads 'class', 'args' and 'queue'
# @return [Object] the result of calling #enqueue on the built job
def enqueue_with_active_job(config)
  enqueue_options = {}
  queue = config['queue']
  # Only pass :queue along when the config actually names one.
  enqueue_options[:queue] = queue unless queue.nil?

  active_job = initialize_active_job(config['class'], config['args'])
  active_job.enqueue(enqueue_options)
end

.enqueue_with_sidekiq(config) ⇒ Object



256
257
258
# File 'lib/sidekiq/scheduler.rb', line 256

# Pushes the job through Sidekiq::Client.
#
# The config is first passed through sanitize_job_config (defined outside
# this excerpt) and the result is what gets pushed.
#
# @param config [Hash] the prepared job config
# @return [Object] the return value of Sidekiq::Client.push
def enqueue_with_sidekiq(config)
  payload = sanitize_job_config(config)
  Sidekiq::Client.push(payload)
end

.handle_errorsObject



164
165
166
167
168
169
170
# File 'lib/sidekiq/scheduler.rb', line 164

# Runs the given block, logging (instead of propagating) any StandardError.
#
# Exceptions outside the StandardError hierarchy are not rescued.
#
# @yield the work to run
# @return [Object] the block's value, or the logger's return value when an
#   error was rescued
def handle_errors
  yield
rescue StandardError => error
  logger.info "#{error.class.name}: #{error.message}"
end

.idempotent_job_enqueue(job_name, time, config) ⇒ Object

Pushes the job into Sidekiq if not already pushed for the given time

Parameters:

  • job_name (String)

    The job’s name

  • time (Time)

    The time when the job got cleared for triggering

  • config (Hash)

    Job’s config hash



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/sidekiq/scheduler.rb', line 124

# Pushes the job into Sidekiq unless it was already pushed for +time+.
#
# @param job_name [String] the job's name
# @param time [Time] the time when the job got cleared for triggering
# @param config [Hash] the job's config hash
# @return [Object] the result of remove_elder_job_instances when the job was
#   registered, otherwise the result of the debug log call
def idempotent_job_enqueue(job_name, time, config)
  # Registration fails when this job/time pair is already known; skip then.
  unless register_job_instance(job_name, time)
    return logger.debug { "Ignoring #{job_name} job as it has been already enqueued" }
  end

  logger.info "queueing #{config['class']} (#{job_name})"

  # Enqueue failures are logged by handle_errors; cleanup still runs.
  handle_errors { enqueue_job(config, time) }

  remove_elder_job_instances(job_name)
end

.initialize_active_job(klass, args) ⇒ Object



260
261
262
263
264
265
266
# File 'lib/sidekiq/scheduler.rb', line 260

# Instantiates +klass+ with the configured arguments.
#
# @param klass [Class] the job class to instantiate
# @param args [Array, Object] an Array is splatted into the constructor;
#   anything else is passed as the single argument
# @return [Object] the new +klass+ instance
def initialize_active_job(klass, args)
  return klass.new(args) unless args.is_a?(Array)

  klass.new(*args)
end

.job_enabled?(name) ⇒ Boolean

Returns:

  • (Boolean)


331
332
333
334
# File 'lib/sidekiq/scheduler.rb', line 331

# Tells whether the named job is enabled.
#
# The 'enabled' entry of the job's schedule state wins; when the state has
# none, the job config's 'enabled' entry is used, defaulting to true.
#
# @param name [String] the job's name in Sidekiq.schedule
# @return [Boolean, nil] nil when the job is not in Sidekiq.schedule
def job_enabled?(name)
  job = Sidekiq.schedule[name]
  return unless job

  state = schedule_state(name)
  state.fetch('enabled', job.fetch('enabled', true))
end

.load_schedule!Object

Pulls the schedule from Sidekiq.schedule and loads it into the rufus scheduler instance



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/sidekiq/scheduler.rb', line 53

# Pulls the schedule from Sidekiq.schedule and loads it into the rufus
# scheduler instance. Does nothing but log when the scheduler is disabled.
#
# @return [Object] the return value of the final log call
def load_schedule!
  return logger.info('SidekiqScheduler is disabled') unless enabled

  logger.info 'Loading Schedule'

  # Load schedule from redis for the first time if dynamic, and keep it
  # refreshed every +dynamic_every+.
  if dynamic
    Sidekiq.reload_schedule!
    @current_changed_score = Time.now.to_f
    rufus_scheduler.every(dynamic_every) { update_schedule }
  end

  logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty?

  @@scheduled_jobs = {}
  queues = sidekiq_queues

  Sidekiq.schedule.each do |name, config|
    # With listened_queues_only set, jobs for queues that fail
    # enabled_queue? are not scheduled.
    if listened_queues_only && !enabled_queue?(config['queue'].to_s, queues)
      logger.info { "Ignoring #{name}, job's queue is not enabled." }
    else
      load_schedule_job(name, config)
    end
  end

  logger.info 'Schedules Loaded'
end

.load_schedule_job(name, config) ⇒ Object

Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/sidekiq/scheduler.rb', line 87

# Loads a job schedule into the Rufus::Scheduler and stores it in
# @@scheduled_jobs.
#
# @param name [String] the job's name
# @param config [Hash] the job's config; the first of 'cron', 'every',
#   'at', 'in', 'interval' holding a non-empty value defines the schedule
# @return [Object, nil] nil when the job was scheduled or skipped because of
#   rails_env; the logger's return value when no interval was found
def load_schedule_job(name, config)
  # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
  # required for the jobs to be scheduled.  If rails_env is missing, the
  # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
  # to.
  return unless config['rails_env'].nil? || rails_env_matches?(config)

  logger.info "Scheduling #{name} #{config}"

  interval_types = %w{cron every at in interval}

  # Only the first interval type with a usable value is taken into account.
  interval_type = interval_types.find do |type|
    value = config[type]
    !value.nil? && value.length > 0
  end

  if interval_type.nil?
    return logger.info("no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping")
  end

  schedule, options = SidekiqScheduler::RufusUtils.normalize_schedule_options(config[interval_type])

  rufus_job = new_job(name, interval_type, config, schedule, options)
  @@scheduled_jobs[name] = rufus_job
  update_job_next_time(name, rufus_job.next_time)

  nil
end

.prepare_arguments(config) ⇒ Hash

Convert the given arguments in the format expected to be enqueued.

Parameters:

  • config (Hash)

    the options to be converted

Options Hash (config):

  • class (String)

    the job class

  • args (Hash/Array)

    the arguments to be passed to the job class

Returns:

  • (Hash)


287
288
289
290
291
292
293
294
295
296
297
# File 'lib/sidekiq/scheduler.rb', line 287

# Converts the given config into the format expected for enqueueing.
# The hash is modified in place and returned.
#
# @param config [Hash] the options to be converted
# @option config [String] 'class' the job class, run through
#   try_to_constantize
# @option config [Hash, Array] 'args' the arguments for the job class; a
#   Hash gets symbolize_keys! when it responds to it, anything else is
#   wrapped with Array()
# @return [Hash] the same +config+ object
def prepare_arguments(config)
  config['class'] = try_to_constantize(config['class'])

  args = config['args']
  case args
  when Hash
    args.symbolize_keys! if args.respond_to?(:symbolize_keys!)
  else
    config['args'] = Array(args)
  end

  config
end


41
42
43
44
45
46
47
48
49
# File 'lib/sidekiq/scheduler.rb', line 41

# Logs one header line followed by one line per job known to the rufus
# scheduler, showing each job's +t+ and +last+ values.
#
# @return [Object, nil] the collection returned by all_jobs, or nil when
#   there is no scheduler
def print_schedule
  return unless rufus_scheduler

  logger.info "Scheduling Info\tLast Run"

  rufus_scheduler.all_jobs.each do |_, job|
    logger.info "#{job.t}\t#{job.last}\t"
  end
end

.rails_env_matches?(config) ⇒ Boolean

Returns true if the given schedule config hash matches the current ENV

Returns:

  • (Boolean)


160
161
162
# File 'lib/sidekiq/scheduler.rb', line 160

# Tells whether the config's 'rails_env' list contains the current
# ENV['RAILS_ENV'].
#
# 'rails_env' is a comma-separated list; all whitespace is removed before
# it is split.
#
# @param config [Hash] the schedule config hash
# @return [Boolean, nil] the falsy 'rails_env' value itself when it is
#   nil/false, nil when ENV['RAILS_ENV'] is unset, otherwise true/false
def rails_env_matches?(config)
  allowed = config['rails_env']
  return allowed unless allowed

  current = ENV['RAILS_ENV']
  return current unless current

  allowed.gsub(/\s/, '').split(',').include?(current)
end

.register_job_instance(job_name, time) ⇒ Boolean

Registers a queued job instance

Parameters:

  • job_name (String)

    The job’s name

  • time (Time)

    Time at which the job was cleared by the scheduler

Returns:

  • (Boolean)

    true if the job was registered, false otherwise



323
324
325
# File 'lib/sidekiq/scheduler.rb', line 323

# Registers a queued job instance by delegating to
# SidekiqScheduler::RedisManager.register_job_instance.
#
# @param job_name [String] the job's name
# @param time [Time] time at which the job was cleared by the scheduler
# @return [Boolean] the manager's answer (documented as true if the job was
#   registered, false otherwise)
def register_job_instance(job_name, time)
  manager = SidekiqScheduler::RedisManager
  manager.register_job_instance(job_name, time)
end

.reload_schedule!Object



211
212
213
214
215
216
217
218
219
# File 'lib/sidekiq/scheduler.rb', line 211

# Clears the current schedule and loads it again. Does nothing but log when
# the scheduler is disabled.
#
# @return [Object] the result of load_schedule!, or of the log call when
#   disabled
def reload_schedule!
  return logger.info('SidekiqScheduler is disabled') unless enabled

  logger.info 'Reloading Schedule'
  clear_schedule!
  load_schedule!
end

.remove_elder_job_instances(job_name) ⇒ Object



327
328
329
# File 'lib/sidekiq/scheduler.rb', line 327

# Delegates to SidekiqScheduler::RedisManager.remove_elder_job_instances
# for the given job.
#
# @param job_name [String] the job's name
# @return [Object] whatever the manager returns
def remove_elder_job_instances(job_name)
  manager = SidekiqScheduler::RedisManager
  manager.remove_elder_job_instances(job_name)
end

.rufus_schedulerObject



198
199
200
# File 'lib/sidekiq/scheduler.rb', line 198

# Returns the memoized rufus scheduler, building it with
# new_rufus_scheduler on first use (or after it was reset to nil/false).
#
# @return [Object] the scheduler instance
def rufus_scheduler
  return @rufus_scheduler if @rufus_scheduler

  @rufus_scheduler = new_rufus_scheduler
end

.rufus_scheduler_optionsObject



190
191
192
# File 'lib/sidekiq/scheduler.rb', line 190

# Returns the stored rufus scheduler options, initialising them to an empty
# Hash when nothing (or a falsy value) has been stored.
#
# @return [Object] the stored options
def rufus_scheduler_options
  @rufus_scheduler_options = {} unless @rufus_scheduler_options
  @rufus_scheduler_options
end

.rufus_scheduler_options=(options) ⇒ Object



194
195
196
# File 'lib/sidekiq/scheduler.rb', line 194

# Stores the options returned by rufus_scheduler_options.
#
# @param options [Object] the options to store in @rufus_scheduler_options
# @return [Object] the stored +options+
def rufus_scheduler_options=(options)
  instance_variable_set(:@rufus_scheduler_options, options)
end

.scheduled_jobsObject

the Rufus::Scheduler jobs that are scheduled



37
38
39
# File 'lib/sidekiq/scheduler.rb', line 37

# Returns the registry of jobs that are scheduled.
#
# The registry is the @@scheduled_jobs class variable: load_schedule! and
# clear_schedule! reset it to an empty Hash, and load_schedule_job stores
# each created rufus job in it under the job's name.
#
# @return [Hash] job name => rufus job
def scheduled_jobs
  @@scheduled_jobs
end

.toggle_job_enabled(name) ⇒ Object



336
337
338
339
340
# File 'lib/sidekiq/scheduler.rb', line 336

# Flips the 'enabled' flag of the named job's schedule state and saves it.
#
# The new flag is the negation of job_enabled?(name), so a job for which
# job_enabled? answers nil ends up enabled.
#
# @param name [String] the job's name
# @return [Object] the result of set_schedule_state
def toggle_job_enabled(name)
  updated_state = schedule_state(name).tap do |state|
    state['enabled'] = !job_enabled?(name)
  end

  set_schedule_state(name, updated_state)
end

.try_to_constantize(klass) ⇒ Object



299
300
301
302
303
# File 'lib/sidekiq/scheduler.rb', line 299

# Resolves a class name to its constant when possible.
#
# Strings are resolved with String#constantize; any NameError raised along
# the way (NoMethodError is a NameError subclass) makes the method fall back
# to returning the input unchanged. Non-strings are returned as they are.
#
# @param klass [String, Object] a class name or an already resolved value
# @return [Object] the resolved constant, or +klass+ itself
def try_to_constantize(klass)
  return klass unless klass.is_a?(String)

  klass.constantize
rescue NameError
  klass
end

.unschedule_job(name) ⇒ Object



240
241
242
243
244
245
246
# File 'lib/sidekiq/scheduler.rb', line 240

# Unschedules the named job and removes it from scheduled_jobs.
# Does nothing when no job is stored under +name+.
#
# @param name [String] the job's name
# @return [Object, nil] the entry removed from scheduled_jobs, or nil when
#   there was none
def unschedule_job(name)
  job = scheduled_jobs[name]
  return unless job

  logger.debug "Removing schedule #{name}"
  job.unschedule
  scheduled_jobs.delete(name)
end

.update_job_last_time(name, last_time) ⇒ Object

Pushes job’s last execution time

Parameters:

  • name (String)

    The job’s name

  • last_time (Time)

    The job’s last execution time



154
155
156
# File 'lib/sidekiq/scheduler.rb', line 154

# Pushes the job's last execution time through
# SidekiqScheduler::RedisManager.set_job_last_time.
# Nothing is pushed when +last_time+ is nil/false.
#
# @param name [String] the job's name
# @param last_time [Time, nil] the job's last execution time
# @return [Object, nil] the manager's return value, or nil when skipped
def update_job_last_time(name, last_time)
  return unless last_time

  SidekiqScheduler::RedisManager.set_job_last_time(name, last_time)
end

.update_job_next_time(name, next_time) ⇒ Object

Pushes the job’s next execution time

Parameters:

  • name (String)

    The job’s name

  • next_time (Time)

    The job’s next execution time



142
143
144
145
146
147
148
# File 'lib/sidekiq/scheduler.rb', line 142

# Pushes the job's next execution time, or removes the stored one when
# there is no next time.
#
# @param name [String] the job's name
# @param next_time [Time, nil] the job's next execution time
# @return [Object] the return value of the RedisManager call that was made
def update_job_next_time(name, next_time)
  redis_manager = SidekiqScheduler::RedisManager
  return redis_manager.remove_job_next_time(name) unless next_time

  redis_manager.set_job_next_time(name, next_time)
end

.update_scheduleObject



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/sidekiq/scheduler.rb', line 221

# Applies schedule changes recorded since the previous check.
#
# Asks SidekiqScheduler::RedisManager for the names changed between the
# previously stored score and now (the stored score is advanced to now on
# every call). Each changed job is unscheduled; those still present in
# Sidekiq.schedule are then loaded again with their current config.
#
# @return [Object, nil] nil when nothing changed, otherwise the return value
#   of the final log call
def update_schedule
  previous_score = @current_changed_score
  @current_changed_score = Time.now.to_f

  changed_names = SidekiqScheduler::RedisManager.get_schedule_changes(previous_score, @current_changed_score)
  return unless changed_names.size > 0

  logger.info 'Updating schedule'
  Sidekiq.reload_schedule!

  changed_names.each do |schedule_name|
    still_scheduled = Sidekiq.schedule.key?(schedule_name)

    # The old rufus job goes away whether the entry was changed or removed.
    unschedule_job(schedule_name)
    load_schedule_job(schedule_name, Sidekiq.schedule[schedule_name]) if still_scheduled
  end

  logger.info 'Schedule updated'
end