Class: Sidekiq::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Util
Defined in:
lib/sidekiq/scheduler.rb

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.dynamicObject

Set to update the schedule at runtime by periodically polling for changes.



23
24
25
# File 'lib/sidekiq/scheduler.rb', line 23

# Reader for the class-level +dynamic+ setting.
#
# load_schedule! treats a truthy value as a request to reload the schedule
# and to poll for schedule changes every 5 seconds.
#
# @return [Object] the current value of @dynamic
def dynamic
  @dynamic
end

.enabledObject

Set to enable or disable the scheduler.



20
21
22
# File 'lib/sidekiq/scheduler.rb', line 20

# Reader for the class-level +enabled+ setting.
#
# When the value is falsy, load_schedule! and reload_schedule! only log that
# the scheduler is disabled and schedule nothing.
#
# @return [Object] the current value of @enabled
def enabled
  @enabled
end

.listened_queues_onlyObject

Set to schedule jobs only when they will be pushed to queues that Sidekiq is listening on.



26
27
28
# File 'lib/sidekiq/scheduler.rb', line 26

# Reader for the class-level +listened_queues_only+ setting.
#
# When the value is truthy, load_schedule! skips every job whose queue does
# not pass enabled_queue?.
#
# @return [Object] the current value of @listened_queues_only
def listened_queues_only
  @listened_queues_only
end

Class Method Details

.active_job_enqueue?(klass) ⇒ Boolean

Returns true if the enqueuing needs to be done for an ActiveJob class, false otherwise.

Parameters:

  • klass (Class)

    the class to check for being a descendant of ActiveJob

Returns:

  • (Boolean)


231
232
233
# File 'lib/sidekiq/scheduler.rb', line 231

# Tells whether a job class has to be enqueued through ActiveJob.
#
# @param klass [Class] the job class to inspect
# @return [Boolean] true when ActiveJob::Enqueuing is loaded and mixed into
#   +klass+; false otherwise (including when ActiveJob is not loaded at all)
def self.active_job_enqueue?(klass)
  # `defined?` evaluates to nil when the constant is missing, which would make
  # this predicate return nil; coerce the whole expression to a real boolean.
  !!(defined?(ActiveJob::Enqueuing) && klass.included_modules.include?(ActiveJob::Enqueuing))
end

.clear_schedule!Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler



168
169
170
171
172
173
# File 'lib/sidekiq/scheduler.rb', line 168

# Stops the current rufus scheduler, forgets every tracked job and builds a
# fresh scheduler.
#
# @return [Rufus::Scheduler] the newly created scheduler
def self.clear_schedule!
  rufus_scheduler.stop

  # Dropping the memoized instance makes the accessor below build a new one.
  @@scheduled_jobs = Hash.new
  @rufus_scheduler = nil

  rufus_scheduler
end

.enabled_queue?(job_queue) ⇒ Boolean

Returns true if a job’s queue is being listened on by sidekiq

Parameters:

  • job_queue (String)

    Job’s queue name

Returns:

  • (Boolean)


260
261
262
263
264
# File 'lib/sidekiq/scheduler.rb', line 260

# Checks whether Sidekiq is configured to listen on the given queue.
#
# An empty queue list in Sidekiq.options is treated as "every queue is enabled".
#
# @param job_queue [String] the job's queue name
# @return [Boolean]
def self.enabled_queue?(job_queue)
  listened = Sidekiq.options[:queues]
  return true if listened.empty?

  listened.include?(job_queue)
end

.enque_with_active_job(config) ⇒ Object



209
210
211
# File 'lib/sidekiq/scheduler.rb', line 209

# Instantiates the configured job class with its args and enqueues it through
# the instance's own +enqueue+ method, passing the whole config along.
#
# @param config [Hash] job config holding at least 'class' and 'args'
# @return [Object] whatever the job's +enqueue+ returns
def self.enque_with_active_job(config)
  job = initialize_active_job(config['class'], config['args'])
  job.enqueue(config)
end

.enque_with_sidekiq(config) ⇒ Object



213
214
215
# File 'lib/sidekiq/scheduler.rb', line 213

# Pushes the job config straight to Sidekiq.
#
# @param config [Hash] the job config, handed unchanged to Sidekiq::Client.push
# @return [Object] whatever Sidekiq::Client.push returns
def self.enque_with_sidekiq(config)
  Sidekiq::Client.push(config)
end

.enqueue_job(job_config) ⇒ Object

Enqueue a job based on a config hash



144
145
146
147
148
149
150
151
152
# File 'lib/sidekiq/scheduler.rb', line 144

# Enqueues a job described by a config hash.
#
# The config is duplicated before being prepared, then dispatched to either
# the ActiveJob or the plain Sidekiq enqueuing path depending on the job class.
#
# @param job_config [Hash] the schedule entry for the job
# @return [Object] the result of the chosen enqueuing method
def self.enqueue_job(job_config)
  prepared = prepare_arguments(job_config.dup)

  return enque_with_active_job(prepared) if active_job_enqueue?(prepared['class'])

  enque_with_sidekiq(prepared)
end

.handle_errorsObject



135
136
137
138
139
140
141
# File 'lib/sidekiq/scheduler.rb', line 135

# Runs the given block, logging (at info level) any StandardError it raises
# instead of letting it propagate.
#
# @yield the work to protect
# @return [Object] the block's value, or the logger call's value on error
def self.handle_errors
  yield
rescue StandardError => error
  logger.info "#{error.class.name}: #{error.message}"
end

.initialize_active_job(klass, args) ⇒ Object



217
218
219
220
221
222
223
# File 'lib/sidekiq/scheduler.rb', line 217

# Builds a job instance from its class and arguments.
#
# An Array is splatted into positional constructor arguments; any other value
# is handed to the constructor as a single argument.
#
# @param klass [Class] the job class to instantiate
# @param args [Array, Object] the constructor arguments
# @return [Object] the new +klass+ instance
def self.initialize_active_job(klass, args)
  return klass.new(args) unless args.is_a?(Array)

  klass.new(*args)
end

.load_schedule!Object

Pulls the schedule from Sidekiq.schedule and loads it into the rufus scheduler instance



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/sidekiq/scheduler.rb', line 46

# Pulls the schedule from Sidekiq.schedule and loads it into the rufus
# scheduler instance.
#
# When the scheduler is disabled this only logs that fact. Otherwise it
# resets the tracked jobs, schedules every entry (subject to the
# listened_queues_only filter) and clears the :schedules_changed redis key.
#
# @return [Object] the value of the final logger call
def self.load_schedule!
  return logger.info('SidekiqScheduler is disabled') unless enabled

  logger.info 'Loading Schedule'

  # Load schedule from redis for the first time if dynamic, then keep polling
  # for changes.
  if dynamic
    Sidekiq.reload_schedule!
    rufus_scheduler.every('5s') { update_schedule }
  end

  logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty?

  @@scheduled_jobs = {}

  Sidekiq.schedule.each do |job_name, job_config|
    if listened_queues_only && !enabled_queue?(job_config['queue'])
      logger.info { "Ignoring #{job_name}, job's queue is not enabled." }
    else
      load_schedule_job(job_name, job_config)
    end
  end

  Sidekiq.redis { |conn| conn.del(:schedules_changed) }

  logger.info 'Schedules Loaded'
end

.load_schedule_job(name, config) ⇒ Object

Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/sidekiq/scheduler.rb', line 95

# Loads a job schedule into the Rufus::Scheduler and stores the resulting
# rufus job in @@scheduled_jobs under +name+.
#
# The first of 'cron', 'every', 'at' or 'in' that has a non-empty value in
# +config+ decides how the job is scheduled; when none is present the job is
# skipped with a log message.
#
# @param name [String] the schedule entry name
# @param config [Hash] the schedule entry (string keys such as 'class',
#   'rails_env' and one of the interval types)
# @return [Object, nil]
def self.load_schedule_job(name, config)
  # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
  # required for the jobs to be scheduled.  If rails_env is missing, the
  # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
  # to.
  if config['rails_env'].nil? || self.rails_env_matches?(config)
    logger.info "Scheduling #{name} #{config}"
    interval_defined = false
    interval_types = %w{cron every at in}
    interval_types.each do |interval_type|
      if !config[interval_type].nil? && config[interval_type].length > 0
        args = self.optionizate_interval_value(config[interval_type])

        # We want rufus_scheduler to return a job object, not a job id
        opts = { :job => true }

        @@scheduled_jobs[name] = self.rufus_scheduler.send(interval_type, *args, opts) do
          logger.info "queueing #{config['class']} (#{name})"

          # Enqueue a copy without the interval key rather than deleting the
          # key from +config+: +config+ is the shared schedule entry, and
          # stripping its interval would make a later load of the same
          # schedule skip this job as having no interval at all.
          job_config = config.reject { |key, _| key == interval_type }
          self.handle_errors { self.enqueue_job(job_config) }
        end

        interval_defined = true

        break
      end
    end

    unless interval_defined
      logger.info "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
    end
  end
end

.optionizate_interval_value(value) ⇒ Object

Converts an interval type value into a value with options, if options are available.



80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/sidekiq/scheduler.rb', line 80

# Normalizes an interval value taken from a schedule entry.
#
# * A non-Array value is returned as is.
# * A two-element Array whose last element is a Hash is returned as a NEW
#   array +[schedule, options]+ with the option keys symbolized; the caller's
#   array and hash are left untouched.
# * Any other Array collapses to its first element.
#
# @param value [Object, Array] the raw interval value
# @return [Object, Array] the normalized value
def self.optionizate_interval_value(value)
  return value unless value.is_a?(::Array)

  schedule, options = value
  # Only a [schedule, options_hash] pair carries options; this also covers a
  # one-element [Hash] array, which has no options slot to symbolize.
  return schedule unless value.size == 2 && options.is_a?(::Hash)

  symbolized = options.each_with_object({}) do |(key, option), memo|
    # Keys that cannot be turned into a symbol are kept unchanged.
    memo[(key.respond_to?(:to_sym) && key.to_sym) || key] = option
  end

  [schedule, symbolized]
end

.prepare_arguments(config) ⇒ Hash

Convert the given arguments in the format expected to be enqueued.

Parameters:

  • config (Hash)

    the options to be converted

Options Hash (config):

  • class (String)

    the job class

  • args (Hash/Array)

    the arguments to be passed to the job class

Returns:

  • (Hash)


243
244
245
246
247
248
249
250
251
252
253
# File 'lib/sidekiq/scheduler.rb', line 243

# Converts the given config into the format expected for enqueuing.
#
# 'class' is constantized when given as a String. Hash 'args' get their keys
# symbolized when the hash supports +symbolize_keys!+; any other 'args' value
# is wrapped with Array().
#
# +config+ itself is updated in place and returned, but a Hash under 'args'
# is never mutated: callers typically pass a shallow copy of a schedule
# entry, so that nested hash is still shared with the original entry.
#
# @param config [Hash] the options to be converted
# @option config [String, Class] 'class' the job class
# @option config [Hash, Array, Object] 'args' the arguments for the job class
# @return [Hash] the same +config+ object, converted
def self.prepare_arguments(config)
  config['class'] = config['class'].constantize if config['class'].is_a?(String)

  job_args = config['args']

  if job_args.is_a?(Hash)
    if job_args.respond_to?(:symbolize_keys!)
      # Symbolize a copy so the hash shared with the schedule stays as it was.
      symbolized = job_args.dup
      symbolized.symbolize_keys!
      config['args'] = symbolized
    end
  else
    config['args'] = Array(job_args)
  end

  config
end


34
35
36
37
38
39
40
41
42
# File 'lib/sidekiq/scheduler.rb', line 34

# Logs one line per job known to the rufus scheduler: the job's +t+ value
# followed by its +last+ value, tab separated, under a header line.
#
# @return [Object, nil] the iterated job collection, or nil without a scheduler
def self.print_schedule
  return unless rufus_scheduler

  logger.info "Scheduling Info\tLast Run"

  rufus_scheduler.all_jobs.each do |_, job|
    logger.info "#{job.t}\t#{job.last}\t"
  end
end

.rails_env_matches?(config) ⇒ Boolean

Returns true if the given schedule config hash matches the current ENV

Returns:

  • (Boolean)


131
132
133
# File 'lib/sidekiq/scheduler.rb', line 131

# Tells whether the schedule entry is meant for the current RAILS_ENV.
#
# 'rails_env' may list several environments separated by commas; whitespace
# inside the list is ignored.
#
# @param config [Hash] the schedule entry
# @return [Boolean] true when ENV['RAILS_ENV'] is one of the listed
#   environments; false otherwise, including when either side is missing
def self.rails_env_matches?(config)
  allowed = config['rails_env']
  current = ENV['RAILS_ENV']

  # Return an explicit false (not nil) so the predicate is a real boolean.
  return false unless allowed && current

  allowed.gsub(/\s/, '').split(',').include?(current)
end

.reload_schedule!Object



175
176
177
178
179
180
181
182
183
# File 'lib/sidekiq/scheduler.rb', line 175

# Clears the current schedule and loads it again.
#
# When the scheduler is disabled this only logs that fact.
#
# @return [Object] the result of load_schedule!, or of the logger call when
#   disabled
def self.reload_schedule!
  return logger.info('SidekiqScheduler is disabled') unless enabled

  logger.info 'Reloading Schedule'
  clear_schedule!
  load_schedule!
end

.rufus_schedulerObject



162
163
164
# File 'lib/sidekiq/scheduler.rb', line 162

# Returns the memoized Rufus::Scheduler, building it with
# rufus_scheduler_options on first use (or after @rufus_scheduler has been
# reset to nil, as clear_schedule! does).
#
# @return [Rufus::Scheduler]
def self.rufus_scheduler
  @rufus_scheduler ||= Rufus::Scheduler.new(rufus_scheduler_options)
end

.rufus_scheduler_optionsObject



154
155
156
# File 'lib/sidekiq/scheduler.rb', line 154

# Options handed to Rufus::Scheduler.new by rufus_scheduler.
#
# @return [Hash] the configured options; an empty Hash when none were set
def self.rufus_scheduler_options
  @rufus_scheduler_options ||= {}
end

.rufus_scheduler_options=(options) ⇒ Object



158
159
160
# File 'lib/sidekiq/scheduler.rb', line 158

# Sets the options used the next time a Rufus::Scheduler is built by
# rufus_scheduler. An already memoized scheduler is not affected by this
# method itself.
#
# @param options [Hash] options for Rufus::Scheduler.new
def self.rufus_scheduler_options=(options)
  @rufus_scheduler_options = options
end

.scheduled_jobsObject

the Rufus::Scheduler jobs that are scheduled



30
31
32
# File 'lib/sidekiq/scheduler.rb', line 30

# The Rufus::Scheduler jobs that are currently scheduled, keyed by schedule
# name. The hash is (re)created by load_schedule! and clear_schedule! and
# filled by load_schedule_job.
#
# @return [Hash] schedule name => rufus job
def self.scheduled_jobs
  @@scheduled_jobs
end

.unschedule_job(name) ⇒ Object



201
202
203
204
205
206
207
# File 'lib/sidekiq/scheduler.rb', line 201

# Unschedules the rufus job registered under +name+ and forgets it.
# Does nothing when no job is registered under that name.
#
# @param name [String] the schedule entry name
# @return [Object, nil] the removed job, or nil when there was none
def self.unschedule_job(name)
  job = scheduled_jobs[name]
  return unless job

  logger.debug "Removing schedule #{name}"
  job.unschedule
  scheduled_jobs.delete(name)
end

.update_scheduleObject



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/sidekiq/scheduler.rb', line 185

# Applies pending schedule changes recorded in the :schedules_changed redis set.
#
# When the set is non-empty, the schedule is reloaded through
# Sidekiq.reload_schedule! and every changed name is popped from the set:
# its current job is unscheduled and, if the name is still present in
# Sidekiq.schedule, scheduled again from the fresh config.
#
# NOTE(review): unlike load_schedule!, this path does not apply the
# listened_queues_only / enabled_queue? filter before scheduling - confirm
# whether that is intended.
#
# @return [Object, nil] the value of the final logger call, or nil when
#   nothing changed
def self.update_schedule
  pending = Sidekiq.redis { |conn| conn.scard(:schedules_changed) }
  return unless pending > 0

  logger.info 'Updating schedule'
  Sidekiq.reload_schedule!

  while (changed_name = Sidekiq.redis { |conn| conn.spop(:schedules_changed) })
    still_scheduled = Sidekiq.schedule.keys.include?(changed_name)

    # The old job goes away in either case; it is re-created only when the
    # entry still exists.
    unschedule_job(changed_name)
    load_schedule_job(changed_name, Sidekiq.schedule[changed_name]) if still_scheduled
  end

  logger.info 'Schedules Loaded'
end