Class: SidekiqScheduler::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq-scheduler/scheduler.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = SidekiqScheduler::Config.new(without_defaults: true)) ⇒ Scheduler

Returns a new instance of Scheduler.



54
55
56
57
58
59
60
61
62
# File 'lib/sidekiq-scheduler/scheduler.rb', line 54

# Builds a scheduler whose settings are copied from the given config object.
#
# @param config [SidekiqScheduler::Config] source of the settings; when
#   omitted, a config created with +without_defaults: true+ is used
def initialize(config = SidekiqScheduler::Config.new(without_defaults: true))
  # Keep the config object itself around in addition to the copied settings.
  @scheduler_config = config

  # Copy each setting onto this instance through its writer method.
  self.enabled = config.enabled?
  self.dynamic = config.dynamic?
  self.dynamic_every = config.dynamic_every?
  self.listened_queues_only = config.listened_queues_only?
  # Fall back to an empty hash when the config supplies no rufus options.
  self.rufus_scheduler_options = config.rufus_scheduler_options || {}
end

Instance Attribute Details

#dynamic ⇒ Object

Set to update the schedule at runtime, once every given time period.



27
28
29
# File 'lib/sidekiq-scheduler/scheduler.rb', line 27

# Reader for the @dynamic instance variable.
#
# @return [Object] the current value of @dynamic
def dynamic
  @dynamic
end

#dynamic_every ⇒ Object

Set the period at which the schedule is dynamically updated at runtime.



30
31
32
# File 'lib/sidekiq-scheduler/scheduler.rb', line 30

# Reader for the @dynamic_every instance variable.
#
# @return [Object] the current value of @dynamic_every
def dynamic_every
  @dynamic_every
end

#enabled ⇒ Object

Set to enable or disable the scheduler.



24
25
26
# File 'lib/sidekiq-scheduler/scheduler.rb', line 24

# Reader for the @enabled instance variable.
#
# @return [Object] the current value of @enabled
def enabled
  @enabled
end

#listened_queues_only ⇒ Object

Set to schedule jobs only when they will be pushed to queues that Sidekiq listens to.



33
34
35
# File 'lib/sidekiq-scheduler/scheduler.rb', line 33

# Reader for the @listened_queues_only instance variable.
#
# @return [Object] the current value of @listened_queues_only
def listened_queues_only
  @listened_queues_only
end

#rufus_scheduler_options ⇒ Object

Set custom options for rufus scheduler, like max_work_threads.



36
37
38
# File 'lib/sidekiq-scheduler/scheduler.rb', line 36

# Reader for the @rufus_scheduler_options instance variable.
#
# @return [Object] the current value of @rufus_scheduler_options
def rufus_scheduler_options
  @rufus_scheduler_options
end

Class Method Details

.instance ⇒ Object



40
41
42
43
# File 'lib/sidekiq-scheduler/scheduler.rb', line 40

# Returns the shared instance, building it with +new+ the first time it is
# requested (or whenever the stored value is falsy).
#
# @return [Object] the memoized instance
def instance
  @instance ||= new
end

.instance=(value) ⇒ Object



45
46
47
# File 'lib/sidekiq-scheduler/scheduler.rb', line 45

# Replaces the stored shared instance.
#
# @param value [Object] the object to store in @instance
def instance=(value)
  @instance = value
end

.method_missing(method, *arguments, &block) ⇒ Object



49
50
51
# File 'lib/sidekiq-scheduler/scheduler.rb', line 49

# Delegates otherwise-unknown calls to the shared instance when the method
# name is one of this class's instance methods; anything else falls through
# to the default +method_missing+ behaviour.
#
# @param method [Symbol] name of the method that was called
# @param arguments [Array] positional arguments of the call
# @param block [Proc, nil] block given to the call; forwarded to the instance
# @return [Object] whatever the delegated instance method returns
def method_missing(method, *arguments, &block)
  if instance_methods.include?(method)
    # Forward the block too, so block-taking instance methods keep working
    # when invoked through the class.
    instance.public_send(method, *arguments, &block)
  else
    super
  end
end

# Keeps +respond_to?+ consistent with the delegation done in +method_missing+.
#
# @param method [Symbol] name of the method being queried
# @param include_private [Boolean] passed through to the default implementation
# @return [Boolean]
def respond_to_missing?(method, include_private = false)
  instance_methods.include?(method) || super
end

Instance Method Details

#clear_schedule!(stop_option = :wait) ⇒ Object

Stops the old rufus scheduler and creates a new one. Returns the new rufus scheduler.

Parameters:

  • stop_option (Symbol) (defaults to: :wait)

    The option to be passed to Rufus::Scheduler#stop



193
194
195
196
197
198
199
200
201
202
# File 'lib/sidekiq-scheduler/scheduler.rb', line 193

# Stops the current rufus scheduler (if there is one), forgets every
# scheduled job and hands back the scheduler returned by #rufus_scheduler.
#
# @param stop_option [Symbol] passed straight to the scheduler's +stop+
# @return [Object] the value of #rufus_scheduler after the reset
def clear_schedule!(stop_option = :wait)
  running = @rufus_scheduler

  if running
    running.stop(stop_option)
    # Drop the reference so #rufus_scheduler no longer sees the stopped one.
    @rufus_scheduler = nil
  end

  @scheduled_jobs = {}
  rufus_scheduler
end

#enqueue_job(job_config, time = Time.now) ⇒ Object

Enqueue a job based on a config hash

Parameters:

  • job_config (Hash)

    the job configuration

  • time (Time) (defaults to: Time.now)

    time the job is enqueued



171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/sidekiq-scheduler/scheduler.rb', line 171

# Enqueues a job described by a config hash, through ActiveJob or plain
# Sidekiq depending on the job class.
#
# @param job_config [Hash] the job configuration; it is duplicated before use
# @param time [Time] time the job is enqueued (defaults to now)
# @return [Object] the result of the chosen enqueue helper
def enqueue_job(job_config, time = Time.now)
  # Work on a copy so the caller's hash is not modified by the deletes below.
  config = prepare_arguments(job_config.dup)

  if config.delete('include_metadata')
    # NOTE(review): the call target was missing on this line in the original,
    # which left it syntactically invalid. `arguments_with_metadata` is assumed
    # to be the helper (defined outside this excerpt) that appends the metadata
    # hash to the job arguments - confirm against the full file.
    config['args'] = arguments_with_metadata(config['args'], "scheduled_at" => time.to_f.round(3))
  end

  if SidekiqScheduler::Utils.active_job_enqueue?(config['class'])
    SidekiqScheduler::Utils.enqueue_with_active_job(config)
  else
    SidekiqScheduler::Utils.enqueue_with_sidekiq(config)
  end
end

#idempotent_job_enqueue(job_name, time, config) ⇒ Object

Pushes the job into Sidekiq if not already pushed for the given time

Parameters:

  • job_name (String)

    The job’s name

  • time (Time)

    The time when the job got cleared for triggering

  • config (Hash)

    Job’s config hash



153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/sidekiq-scheduler/scheduler.rb', line 153

# Pushes the job into Sidekiq unless an instance for the same name and time
# was already registered.
#
# @param job_name [String] the job's name
# @param time [Time] the time when the job got cleared for triggering
# @param config [Hash] the job's config hash
# @return [Object] result of the elder-instance cleanup, or of the debug log
#   call when the job is skipped
def idempotent_job_enqueue(job_name, time, config)
  # Registration fails when this (name, time) pair was seen before.
  unless SidekiqScheduler::RedisManager.register_job_instance(job_name, time)
    return Sidekiq.logger.debug { "Ignoring #{job_name} job as it has been already enqueued" }
  end

  Sidekiq.logger.info "queueing #{config['class']} (#{job_name})"
  handle_errors { enqueue_job(config, time) }
  SidekiqScheduler::RedisManager.remove_elder_job_instances(job_name)
end

#job_enabled?(name) ⇒ Boolean

Returns:

  • (Boolean)


234
235
236
237
# File 'lib/sidekiq-scheduler/scheduler.rb', line 234

# Tells whether the named job is enabled. The stored schedule state wins;
# otherwise the job's own 'enabled' entry is used, defaulting to true.
#
# @param name [String] the job's name
# @return [Boolean, nil] nil when no job with that name is in Sidekiq.schedule
def job_enabled?(name)
  job = Sidekiq.schedule[name]
  return unless job

  state = schedule_state(name)
  state.fetch('enabled', job.fetch('enabled', true))
end

#load_schedule! ⇒ Object

Pulls the schedule from Sidekiq.schedule and loads it into the rufus scheduler instance



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/sidekiq-scheduler/scheduler.rb', line 81

# Pulls the schedule from Sidekiq.schedule and loads every eligible job via
# #load_schedule_job. Does nothing but log when the scheduler is disabled.
#
# @return [Object] the result of the final log call
def load_schedule!
  return Sidekiq.logger.info('SidekiqScheduler is disabled') unless enabled

  Sidekiq.logger.info 'Loading Schedule'

  # In dynamic mode, reload the schedule once up front and then poll for
  # changes every +dynamic_every+.
  if dynamic
    Sidekiq.reload_schedule!
    @current_changed_score = Time.now.to_f
    rufus_scheduler.every(dynamic_every) { update_schedule }
  end

  Sidekiq.logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty?

  @scheduled_jobs = {}
  listened = scheduler_config.sidekiq_queues

  Sidekiq.schedule.each do |job_name, job_config|
    # Skip a job only when queue filtering is on AND its queue is not enabled.
    if listened_queues_only && !enabled_queue?(job_config['queue'].to_s, listened)
      Sidekiq.logger.info { "Ignoring #{job_name}, job's queue is not enabled." }
    else
      load_schedule_job(job_name, job_config)
    end
  end

  Sidekiq.logger.info 'Schedules Loaded'
end

#load_schedule_job(name, config) ⇒ Object

Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/sidekiq-scheduler/scheduler.rb', line 114

# Loads one job into the rufus scheduler and records it in @scheduled_jobs.
#
# When the config names a 'rails_env', the job is only scheduled if
# rails_env_matches? accepts it; without 'rails_env' it is always scheduled.
# The first non-empty entry among cron / every / at / in / interval decides
# how the job is scheduled.
#
# @param name [String] the job's name
# @param config [Hash] the job's config hash
# @return [Object, nil] the log call's result when no interval is configured,
#   nil otherwise
def load_schedule_job(name, config)
  return unless config['rails_env'].nil? || rails_env_matches?(config)

  Sidekiq.logger.info "Scheduling #{name} #{config}"

  interval_types = %w(cron every at in interval)
  chosen_type = interval_types.find do |candidate|
    value = config[candidate]
    !value.nil? && value.length > 0
  end

  if chosen_type.nil?
    return Sidekiq.logger.info("no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping")
  end

  schedule, options = SidekiqScheduler::RufusUtils.normalize_schedule_options(config[chosen_type])

  rufus_job = new_job(name, chosen_type, config, schedule, options)
  # new_job may decline to create a job; nothing is recorded in that case.
  return unless rufus_job

  @scheduled_jobs[name] = rufus_job
  SidekiqScheduler::Utils.update_job_next_time(name, rufus_job.next_time)

  nil
end


69
70
71
72
73
74
75
76
77
# File 'lib/sidekiq-scheduler/scheduler.rb', line 69

# Logs one line per job known to the rufus scheduler: the job's original
# schedule and the last time it ran, tab separated.
#
# @return [Array, nil] the jobs that were listed, or nil without a scheduler
def print_schedule
  return unless rufus_scheduler

  Sidekiq.logger.info "Scheduling Info\tLast Run"

  # Rufus::Scheduler#jobs returns an Array, so iterate with #each (the
  # previous #each_value only exists on Hash), and read the schedule and last
  # run through Job#original / Job#last_time.
  rufus_scheduler.jobs.each do |job|
    Sidekiq.logger.info "#{job.original}\t#{job.last_time}\t"
  end
end

#reload_schedule! ⇒ Object



204
205
206
207
208
209
210
211
212
# File 'lib/sidekiq-scheduler/scheduler.rb', line 204

# Clears the current schedule and loads it again. Only logs a message when
# the scheduler is disabled.
#
# @return [Object] the result of #load_schedule!, or of the log call when
#   disabled
def reload_schedule!
  return Sidekiq.logger.info('SidekiqScheduler is disabled') unless enabled

  Sidekiq.logger.info 'Reloading Schedule'
  clear_schedule!
  load_schedule!
end

#rufus_scheduler ⇒ Object



185
186
187
# File 'lib/sidekiq-scheduler/scheduler.rb', line 185

# Returns the memoized rufus scheduler, creating one from
# #rufus_scheduler_options when none is stored yet.
#
# @return [Object] the scheduler held in @rufus_scheduler
def rufus_scheduler
  return @rufus_scheduler if @rufus_scheduler

  @rufus_scheduler = SidekiqScheduler::Utils.new_rufus_scheduler(rufus_scheduler_options)
end

#scheduled_jobs ⇒ Object

The Rufus::Scheduler jobs that are currently scheduled.



65
66
67
# File 'lib/sidekiq-scheduler/scheduler.rb', line 65

# Reader for the @scheduled_jobs instance variable.
#
# @return [Object] the current value of @scheduled_jobs
def scheduled_jobs
  @scheduled_jobs
end

#to_hash ⇒ Object



253
254
255
256
257
# File 'lib/sidekiq-scheduler/scheduler.rb', line 253

# Hash representation of the scheduler: the stored config converted with
# its own #to_hash, under the :scheduler_config key.
#
# @return [Hash]
def to_hash
  config_as_hash = @scheduler_config.to_hash
  { scheduler_config: config_as_hash }
end

#toggle_all_jobs(new_state) ⇒ Object



245
246
247
248
249
250
251
# File 'lib/sidekiq-scheduler/scheduler.rb', line 245

# Writes the given enabled state into the schedule state of every job
# returned by Sidekiq.schedule!.
#
# @param new_state [Object] value stored under each state's 'enabled' key
# @return [Array] the job names that were processed
def toggle_all_jobs(new_state)
  job_names = Sidekiq.schedule!.keys

  job_names.each do |job_name|
    current = schedule_state(job_name)
    current['enabled'] = new_state
    set_schedule_state(job_name, current)
  end
end

#toggle_job_enabled(name) ⇒ Object



239
240
241
242
243
# File 'lib/sidekiq-scheduler/scheduler.rb', line 239

# Flips the enabled flag of one job: stores the negation of #job_enabled?
# in the job's schedule state.
#
# @param name [String] the job's name
# @return [Object] whatever set_schedule_state returns
def toggle_job_enabled(name)
  current = schedule_state(name)
  flipped = !job_enabled?(name)
  current['enabled'] = flipped
  set_schedule_state(name, current)
end

#update_schedule ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/sidekiq-scheduler/scheduler.rb', line 214

# Applies schedule changes recorded since the previous check: every changed
# job is unscheduled, and loaded again if it is still present in
# Sidekiq.schedule after a reload.
#
# @return [Object, nil] the final log call's result, or nil when nothing
#   changed
def update_schedule
  # Advance the window: the previous upper bound becomes the new lower bound.
  last_changed_score, @current_changed_score = @current_changed_score, Time.now.to_f
  schedule_changes = SidekiqScheduler::RedisManager.get_schedule_changes(last_changed_score, @current_changed_score)

  return unless schedule_changes.size > 0

  Sidekiq.logger.info 'Updating schedule'
  Sidekiq.reload_schedule!

  schedule_changes.each do |changed_name|
    still_scheduled = Sidekiq.schedule.key?(changed_name)

    # Removed and modified jobs alike are dropped first; only jobs that still
    # exist are then scheduled again with their current config.
    unschedule_job(changed_name)
    load_schedule_job(changed_name, Sidekiq.schedule[changed_name]) if still_scheduled
  end

  Sidekiq.logger.info 'Schedule updated'
end