Module: SidekiqScheduler::Utils
- Defined in:
- lib/sidekiq-scheduler/utils.rb
Constant Summary collapse
- RUFUS_METADATA_KEYS =
%w(description at cron every in interval enabled)
Class Method Summary collapse
-
.active_job_enqueue?(klass) ⇒ Boolean
Returns true if the enqueuing needs to be done for an ActiveJob class, false otherwise.
-
.calc_cron_run_time(cron, time) ⇒ Time
Try to figure out when the cron job was supposed to run.
-
.enqueue_with_active_job(config) ⇒ Object
Enqueues the job using ActiveJob.
-
.enqueue_with_sidekiq(config) ⇒ Object
Enqueues the job using the Sidekiq client.
-
.initialize_active_job(klass, args, keyword_argument = false) ⇒ Object
Initializes active_job using the passed parameters.
-
.new_rufus_scheduler(options = {}) ⇒ Rufus::Scheduler
Creates a new instance of rufus scheduler.
-
.sanitize_job_config(config) ⇒ Hash
Removes the hash values associated to the rufus metadata keys.
-
.stringify_keys(object) ⇒ Object
Stringify keys belonging to a hash.
-
.symbolize_keys(object) ⇒ Object
Symbolize keys belonging to a hash.
-
.try_to_constantize(klass) ⇒ Class
Constantize a given string.
-
.update_job_last_time(name, last_time) ⇒ Object
Pushes job’s last execution time.
-
.update_job_next_time(name, next_time) ⇒ Object
Pushes the job’s next execution time.
Class Method Details
.active_job_enqueue?(klass) ⇒ Boolean
Returns true if the enqueuing needs to be done for an ActiveJob
class, false otherwise.
79 80 81 82 |
# File 'lib/sidekiq-scheduler/utils.rb', line 79

# Tells whether a job class has to be enqueued through ActiveJob.
#
# @param [Object] klass the job class (or any other object) to inspect
#
# @return [Boolean] true when ActiveJob::Enqueuing is loaded and +klass+ is a Class
#   that includes it, false otherwise
def self.active_job_enqueue?(klass)
  # `defined?` evaluates to a String or nil, so bail out explicitly with `false`
  # instead of leaking that value through the `&&` chain.
  return false unless klass.is_a?(Class) && defined?(ActiveJob::Enqueuing)

  klass.include?(ActiveJob::Enqueuing)
end
.calc_cron_run_time(cron, time) ⇒ Time
Try to figure out when the cron job was supposed to run.
Rufus calls the scheduler block with the current time and not the time the block was scheduled to run. This means that under certain conditions a job could get scheduled multiple times, because `time.to_i` is used to key the job in redis. If one server is under load and Rufus tries to run the job 1 second after the other server, then the job will be queued twice. This method essentially makes a best guess at when the job was supposed to run and returns that.
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/sidekiq-scheduler/utils.rb', line 157

# Makes a best guess at the instant a cron job was supposed to run.
#
# The given time is truncated to whole seconds and then snapped to whichever
# neighbouring cron occurrence is closer; an exact match is returned as is.
#
# @param [#match?, #next_time, #previous_time, #rough_frequency] cron the cron
#   definition (presumably a Fugit::Cron - confirm against the caller)
# @param [Time] time the time the scheduler block was called with
#
# @return [Time] the estimated scheduled run time
def self.calc_cron_run_time(cron, time)
  time = time.floor # remove sub seconds to prevent rounding errors.
  return time if cron.match?(time) # If the time is a perfect match then return it.

  next_t = cron.next_time(time).to_t
  previous_t = cron.previous_time(time).to_t
  # The `time` var is some point between `previous_t` and `next_t`.
  # Figure out how far off we are from each side in seconds.
  next_diff = next_t - time
  previous_diff = time - previous_t

  if next_diff == previous_diff
    # `time` is exactly halfway between the two occurrences. Keep `time` only when
    # that distance equals `cron.rough_frequency`; otherwise round down.
    cron.rough_frequency == next_diff ? time : previous_t
  elsif next_diff > previous_diff
    # We are closer to the previous run time so return that.
    previous_t
  else
    # We are closer to the next run time so return that.
    next_t
  end
end
.enqueue_with_active_job(config) ⇒ Object
Enqueues the job using ActiveJob.
94 95 96 97 98 99 100 |
# File 'lib/sidekiq-scheduler/utils.rb', line 94

# Enqueues the job using ActiveJob.
#
# @param [Hash] config the job configuration; its 'class', 'args',
#   'keyword_argument' and 'queue' entries are read
#
# @return [Object] whatever the job instance's #enqueue returns
def self.enqueue_with_active_job(config)
  # Forward a queue override only when one is configured (nil is dropped).
  options = { queue: config['queue'] }.compact

  initialize_active_job(config['class'], config['args'], config['keyword_argument']).enqueue(options)
end
.enqueue_with_sidekiq(config) ⇒ Object
Enqueues the job using the Sidekiq client.
87 88 89 |
# File 'lib/sidekiq-scheduler/utils.rb', line 87

# Enqueues the job using the Sidekiq client.
#
# The configuration is passed through sanitize_job_config before being handed
# to Sidekiq::Client.push.
#
# @param [Hash] config the job configuration
#
# @return [Object] the result of Sidekiq::Client.push
def self.enqueue_with_sidekiq(config)
  Sidekiq::Client.push(sanitize_job_config(config))
end
.initialize_active_job(klass, args, keyword_argument = false) ⇒ Object
Initializes active_job using the passed parameters.
63 64 65 66 67 68 69 70 71 |
# File 'lib/sidekiq-scheduler/utils.rb', line 63

# Initializes active_job using the passed parameters.
#
# @param [Class] klass the job class to instantiate
# @param [Array, Hash, Object] args the arguments for the constructor
# @param [Boolean] keyword_argument whether a Hash +args+ should be expanded
#   into keyword arguments
#
# @return [Object] a new instance of +klass+
def self.initialize_active_job(klass, args, keyword_argument = false)
  if args.is_a?(Array)
    # An Array is spread into positional arguments.
    klass.new(*args)
  elsif args.is_a?(Hash) && keyword_argument
    # A Hash is expanded into keyword arguments only when explicitly requested.
    klass.new(**symbolize_keys(args))
  else
    # Anything else (including a Hash without the flag) is one positional argument.
    klass.new(args)
  end
end
.new_rufus_scheduler(options = {}) ⇒ Rufus::Scheduler
Creates a new instance of rufus scheduler.
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/sidekiq-scheduler/utils.rb', line 114

# Creates a new instance of rufus scheduler.
#
# The returned scheduler gets an +on_post_trigger+ singleton method that, for
# jobs carrying a tag, records the job's last and next execution times.
#
# @param [Hash] options the options handed to Rufus::Scheduler.new
#
# @return [Rufus::Scheduler] the scheduler instance
def self.new_rufus_scheduler(options = {})
  Rufus::Scheduler.new(options).tap do |scheduler|
    scheduler.define_singleton_method(:on_post_trigger) do |job, triggered_time|
      # The first tag is used as the job name; untagged jobs are ignored.
      if (job_name = job.tags[0])
        SidekiqScheduler::Utils.update_job_last_time(job_name, triggered_time)
        SidekiqScheduler::Utils.update_job_next_time(job_name, job.next_time)
      end
    end
  end
end
.sanitize_job_config(config) ⇒ Hash
Removes the hash values associated to the rufus metadata keys.
107 108 109 |
# File 'lib/sidekiq-scheduler/utils.rb', line 107

# Removes the hash values associated to the rufus metadata keys.
#
# Entries whose key is listed in RUFUS_METADATA_KEYS are left out of the
# result; +config+ itself is not modified.
#
# @param [Hash] config the job configuration
#
# @return [Hash] a copy of +config+ without the rufus metadata entries
def self.sanitize_job_config(config)
  config.reject { |key, _| RUFUS_METADATA_KEYS.include?(key) }
end
.stringify_keys(object) ⇒ Object
Stringify keys belonging to a hash.
Also stringifies nested keys and the keys of hashes inside arrays and sets.
15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/sidekiq-scheduler/utils.rb', line 15

# Stringify keys belonging to a hash.
#
# Also stringifies nested keys and the keys of hashes inside arrays and sets.
#
# @param [Object] object the value to convert
#
# @return [Object] a Hash with String keys when +object+ is a Hash, an Array of
#   converted elements when it is an Array or a Set, +object+ itself otherwise
def self.stringify_keys(object)
  case object
  when Hash
    object.to_h { |key, value| [key.to_s, stringify_keys(value)] }
  when Array, Set
    # Note: `map` returns an Array, so a Set comes back as an Array.
    object.map { |value| stringify_keys(value) }
  else
    object
  end
end
.symbolize_keys(object) ⇒ Object
Symbolize keys belonging to a hash.
Also symbolizes nested keys and the keys of hashes inside arrays and sets.
34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/sidekiq-scheduler/utils.rb', line 34

# Symbolize keys belonging to a hash.
#
# Also symbolizes nested keys and the keys of hashes inside arrays and sets.
#
# @param [Object] object the value to convert
#
# @return [Object] a Hash with Symbol keys when +object+ is a Hash, an Array of
#   converted elements when it is an Array or a Set, +object+ itself otherwise
def self.symbolize_keys(object)
  case object
  when Hash
    object.to_h { |key, value| [key.to_sym, symbolize_keys(value)] }
  when Array, Set
    # Note: `map` returns an Array, so a Set comes back as an Array.
    object.map { |value| symbolize_keys(value) }
  else
    object
  end
end
.try_to_constantize(klass) ⇒ Class
Constantize a given string.
51 52 53 54 55 |
# File 'lib/sidekiq-scheduler/utils.rb', line 51

# Constantize a given string.
#
# @param [String, Object] klass the name of the constant to look up
#
# @return [Class, Object] the constant named by +klass+; +klass+ itself when it
#   is not a String or when the lookup raises NameError
def self.try_to_constantize(klass)
  return klass unless klass.is_a?(String)

  Object.const_get(klass)
rescue NameError
  # Unknown or malformed constant name: hand the original value back.
  klass
end
.update_job_last_time(name, last_time) ⇒ Object
Pushes job’s last execution time
141 142 143 |
# File 'lib/sidekiq-scheduler/utils.rb', line 141

# Pushes the job's last execution time.
#
# Nothing is stored when +last_time+ is nil or false.
#
# @param [String] name the job's name
# @param [Time] last_time the job's last execution time
def self.update_job_last_time(name, last_time)
  SidekiqScheduler::RedisManager.set_job_last_time(name, last_time) if last_time
end
.update_job_next_time(name, next_time) ⇒ Object
Pushes the job’s next execution time.
129 130 131 132 133 134 135 |
# File 'lib/sidekiq-scheduler/utils.rb', line 129

# Pushes the job's next execution time.
#
# When +next_time+ is nil or false the stored next time is removed instead.
#
# @param [String] name the job's name
# @param [Time] next_time the job's next execution time
def self.update_job_next_time(name, next_time)
  if next_time
    SidekiqScheduler::RedisManager.set_job_next_time(name, next_time)
  else
    SidekiqScheduler::RedisManager.remove_job_next_time(name)
  end
end