Module: SidekiqScheduler::Utils

Defined in:
lib/sidekiq-scheduler/utils.rb

Constant Summary collapse

RUFUS_METADATA_KEYS =
%w(description at cron every in interval enabled)

Class Method Summary collapse

Class Method Details

.active_job_enqueue?(klass) ⇒ Boolean

Returns true if the enqueuing needs to be done for an ActiveJob

class false otherwise.

Parameters:

  • klass (Class)

    the class to check is descendant from ActiveJob

Returns:

  • (Boolean)


79
80
81
82
# File 'lib/sidekiq-scheduler/utils.rb', line 79

def self.active_job_enqueue?(klass)
  klass.is_a?(Class) && defined?(ActiveJob::Enqueuing) &&
    klass.included_modules.include?(ActiveJob::Enqueuing)
end

.calc_cron_run_time(cron, time) ⇒ Time

Try to figure out when the cron job was supposed to run.

Rufus calls the scheduler block with the current time and not the time the block was scheduled to run. This means under certain conditions you could have a job get scheduled multiple times because ‘time.to_i` is used to key the job in redis. If one server is under load and Rufus tries to run the jobs 1 seconds after the other server then the job will be queued twice. This method essentially makes a best guess at when this job was supposed to run and return that.

Parameters:

  • cron (Fugit::Cron)
  • time (Time)

Returns:

  • (Time)


157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/sidekiq-scheduler/utils.rb', line 157

def self.calc_cron_run_time(cron, time)
  time = time.floor # remove sub seconds to prevent rounding errors.
  return time if cron.match?(time) # If the time is a perfect match then return it.

  next_t = cron.next_time(time).to_t
  previous_t = cron.previous_time(time).to_t
  # The `time` var is some point between `previous_t` and `next_t`.
  # Figure out how far off we are from each side in seconds.
  next_diff = next_t - time
  previous_diff = time - previous_t

  if next_diff == previous_diff
    # In the event `time` is exactly between `previous_t` and `next_t` the diff will not be equal to
    # `cron.rough_frequency`. In that case we round down.
    cron.rough_frequency == next_diff ? time : previous_t
  elsif next_diff > previous_diff
    # We are closer to the previous run time so return that.
    previous_t
  else
    # We are closer to the next run time so return that.
    next_t
  end
end

.enqueue_with_active_job(config) ⇒ Object

Enqueues the job using the ActiveJob.

Parameters:

  • config (Hash)

    The job configuration



94
95
96
97
98
99
100
# File 'lib/sidekiq-scheduler/utils.rb', line 94

def self.enqueue_with_active_job(config)
  options = {
    queue: config['queue']
  }.keep_if { |_, v| !v.nil? }

  initialize_active_job(config['class'], config['args'], config['keyword_argument']).enqueue(options)
end

.enqueue_with_sidekiq(config) ⇒ Object

Enqueues the job using the Sidekiq client.

Parameters:

  • config (Hash)

    The job configuration



87
88
89
# File 'lib/sidekiq-scheduler/utils.rb', line 87

def self.enqueue_with_sidekiq(config)
  Sidekiq::Client.push(sanitize_job_config(config))
end

.initialize_active_job(klass, args, keyword_argument = false) ⇒ Object

Initializes active_job using the passed parameters.

Parameters:

  • klass (Class)

    The class to initialize

  • args (Array, Hash)

    The parameters passed to the klass initializer

Returns:

  • (Object)

    instance of the class klass



63
64
65
66
67
68
69
70
71
# File 'lib/sidekiq-scheduler/utils.rb', line 63

def self.initialize_active_job(klass, args, keyword_argument = false)
  if args.is_a?(Array)
    klass.new(*args)
  elsif args.is_a?(Hash) && keyword_argument
    klass.new(**symbolize_keys(args))
  else
    klass.new(args)
  end
end

.new_rufus_scheduler(options = {}) ⇒ Rufus::Scheduler

Creates a new instance of rufus scheduler.

Returns:

  • (Rufus::Scheduler)

    the scheduler instance



114
115
116
117
118
119
120
121
122
123
# File 'lib/sidekiq-scheduler/utils.rb', line 114

def self.new_rufus_scheduler(options = {})
  Rufus::Scheduler.new(options).tap do |scheduler|
    scheduler.define_singleton_method(:on_post_trigger) do |job, triggered_time|
      if (job_name = job.tags[0])
        SidekiqScheduler::Utils.update_job_last_time(job_name, triggered_time)
        SidekiqScheduler::Utils.update_job_next_time(job_name, job.next_time)
      end
    end
  end
end

.sanitize_job_config(config) ⇒ Hash

Removes the hash values associated to the rufus metadata keys.

Parameters:

  • config (Hash)

    The job configuration

Returns:

  • (Hash)

    the sanitized job config



107
108
109
# File 'lib/sidekiq-scheduler/utils.rb', line 107

def self.sanitize_job_config(config)
  config.reject { |k, _| RUFUS_METADATA_KEYS.include?(k) }
end

.stringify_keys(object) ⇒ Object

Stringify keys belonging to a hash.

Also stringifies nested keys and keys of hashes inside arrays, and sets

Parameters:

  • object (Object)

Returns:

  • (Object)


15
16
17
18
19
20
21
22
23
24
25
# File 'lib/sidekiq-scheduler/utils.rb', line 15

def self.stringify_keys(object)
  if object.is_a?(Hash)
    Hash[[*object.map { |k, v| [k.to_s, stringify_keys(v) ]} ]]

  elsif object.is_a?(Array) || object.is_a?(Set)
    object.map { |v| stringify_keys(v) }

  else
    object
  end
end

.symbolize_keys(object) ⇒ Object

Symbolize keys belonging to a hash.

Also symbolizes nested keys and keys of hashes inside arrays, and sets

Parameters:

  • object (Object)

Returns:

  • (Object)


34
35
36
37
38
39
40
41
42
43
44
# File 'lib/sidekiq-scheduler/utils.rb', line 34

def self.symbolize_keys(object)
  if object.is_a?(Hash)
    Hash[[*object.map { |k, v| [k.to_sym, symbolize_keys(v) ]} ]]

  elsif object.is_a?(Array) || object.is_a?(Set)
    object.map { |v| symbolize_keys(v) }

  else
    object
  end
end

.try_to_constantize(klass) ⇒ Class

Constantize a given string.

Parameters:

  • klass (String)

    The string to constantize

Returns:

  • (Class)

    the class corresponding to the klass param



51
52
53
54
55
# File 'lib/sidekiq-scheduler/utils.rb', line 51

def self.try_to_constantize(klass)
  klass.is_a?(String) ? Object.const_get(klass) : klass
rescue NameError
  klass
end

.update_job_last_time(name, last_time) ⇒ Object

Pushes job’s last execution time

Parameters:

  • name (String)

    The job’s name

  • last_time (Time)

    The job’s last execution time



141
142
143
# File 'lib/sidekiq-scheduler/utils.rb', line 141

def self.update_job_last_time(name, last_time)
  SidekiqScheduler::RedisManager.set_job_last_time(name, last_time) if last_time
end

.update_job_next_time(name, next_time) ⇒ Object

Pushes job’s next time execution

Parameters:

  • name (String)

    The job’s name

  • next_time (Time)

    The job’s next time execution



129
130
131
132
133
134
135
# File 'lib/sidekiq-scheduler/utils.rb', line 129

def self.update_job_next_time(name, next_time)
  if next_time
    SidekiqScheduler::RedisManager.set_job_next_time(name, next_time)
  else
    SidekiqScheduler::RedisManager.remove_job_next_time(name)
  end
end