Class: Bizside::JobUtils
- Inherits:
-
Object
- Object
- Bizside::JobUtils
- Defined in:
- lib/bizside/job_utils.rb
Class Method Summary collapse
- .add_cron(name, job_type, cron, *args) ⇒ Object
- .add_cron_to(queue, name, job_type, cron, *args) ⇒ Object
- .add_job(klass, *args, &block) ⇒ Object
- .add_job_silently(klass, *args) ⇒ Object
- .add_job_silently_to(queue, klass, *args) ⇒ Object
- .add_job_to(queue, klass, *args) ⇒ Object
- .add_scheduler(name, job_type, interval_args, *args) ⇒ Object
- .any_jobs_for?(queue) ⇒ Boolean
- .cancel_job_at(klass, *args) ⇒ Object
-
.delayed?(klass, args, except: []) ⇒ Boolean
Resque.delayed? は @queue の定義がない場合に機能しないため、独自に実装.
- .delayed_queue_peek(start, count) ⇒ Object
- .delayed_queue_schedule_size ⇒ Object
- .delayed_timestamp_peek(timestamp, start, count) ⇒ Object
- .delayed_timestamp_size(timestamp) ⇒ Object
- .dequeue(klass, *args) ⇒ Object
- .enqueue_at_with_queue(queue, time, klass, *args) ⇒ Object
- .enqueue_at_with_queue_silently(queue, time, klass, *args) ⇒ Object
- .enqueue_in(time_to_delay, klass, *args) ⇒ Object
- .enqueue_in_with_queue(queue, time_to_delay, klass, *args) ⇒ Object
- .failure_count(queue = nil, class_name = nil) ⇒ Object
- .failure_jobs(start = 0, count = 1, queue = nil) ⇒ Object
- .peek(queue, start = 0, count = 1) ⇒ Object
- .queue_from_class(klass) ⇒ Object
- .queue_size(queue) ⇒ Object
- .remove_cron(name) ⇒ Object
- .remove_delayed_in_queue(klass, queue, *args) ⇒ Object
- .remove_queue(queue) ⇒ Object
- .remove_scheduler(name) ⇒ Object
- .set_cron_options(cron) ⇒ Object
- .set_job_at(time, klass, *args) ⇒ Object
- .set_job_silently_at(time, klass, *args) ⇒ Object
- .unique_in_queue?(klass, args = {}, queue, count: 100, except: []) ⇒ Boolean
Class Method Details
.add_cron(name, job_type, cron, *args) ⇒ Object
159 160 161 |
# File 'lib/bizside/job_utils.rb', line 159
#
# Schedules a cron job without naming a queue: every argument is forwarded to
# +add_cron_to+ with +nil+ in the queue position.
#
# @return whatever +add_cron_to+ returns
def self.add_cron(name, job_type, cron, *args)
  queue = nil
  add_cron_to(queue, name, job_type, cron, *args)
end
.add_cron_to(queue, name, job_type, cron, *args) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/bizside/job_utils.rb', line 163
#
# Replaces the Resque schedule entry +name+ with a cron-driven entry for
# +job_type+.
#
# In the test environment nothing is scheduled: a log line is written and nil
# is returned. Otherwise the existing schedule called +name+ is removed before
# the cron line is examined, so a blank or invalid cron line still leaves the
# old entry deleted.
#
# @param queue [Object, nil] stored under :queue only when present
# @param name [Object] schedule name handed to Resque
# @param job_type [Object] stored under :class
# @param cron [String, Array] cron line, or an array whose first element is the cron line
# @param args [Array] stored under :args
# @return [nil] under test, or when the cron line is blank
# @raise [ArgumentError] when CronValidator rejects the cron line
def self.add_cron_to(queue, name, job_type, cron, *args)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時にはCronの設定を行いません。'
    return
  end

  ::Resque.remove_schedule(name)

  cronline = Array(cron).first
  return if cronline.to_s.strip.empty?

  unless CronValidator.new(cronline).valid?
    raise ArgumentError, "Cronの書式が正しくないのでスケジューリングしません。name=#{name}"
  end

  config = {
    class: job_type,
    # NOTE(review): the extracted listing showed only "(cron)" here; the call
    # name was restored as set_cron_options, the class method that takes a
    # cron value. Confirm against lib/bizside/job_utils.rb.
    cron: set_cron_options(cron),
    args: args,
    persist: true
  }
  config[:queue] = queue if queue.present?

  ::Resque.set_schedule(name, config)
end
.add_job(klass, *args, &block) ⇒ Object
6 7 8 |
# File 'lib/bizside/job_utils.rb', line 6
#
# Enqueues +klass+ without naming a queue: delegates to +add_job_to+ with
# +nil+ as the queue and passes the optional block through.
#
# @return whatever +add_job_to+ returns
def self.add_job(klass, *args, &block)
  queue = nil
  add_job_to(queue, klass, *args, &block)
end
.add_job_silently(klass, *args) ⇒ Object
33 34 35 36 37 |
# File 'lib/bizside/job_utils.rb', line 33
#
# Same as +add_job+, but passes an empty block so that the callee yields to it
# instead of writing its own log line.
def self.add_job_silently(klass, *args)
  # Intentionally empty: emit nothing.
  add_job(klass, *args) {}
end
.add_job_silently_to(queue, klass, *args) ⇒ Object
39 40 41 42 43 |
# File 'lib/bizside/job_utils.rb', line 39
#
# Same as +add_job_to+, but passes an empty block so that the callee yields to
# it instead of writing its own log line.
def self.add_job_silently_to(queue, klass, *args)
  # Intentionally empty: emit nothing.
  add_job_to(queue, klass, *args) {}
end
.add_job_to(queue, klass, *args) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/bizside/job_utils.rb', line 10
#
# Enqueues +klass+ with +args+, on +queue+ when one is present.
#
# Under the test environment nothing is enqueued; the job is handed to
# +do_perform_and_hooks_instantly+ and nil is returned. Otherwise a caller
# block, if given, is yielded to in place of the default log line.
#
# @param queue [Object, nil] target queue; when not present Resque.enqueue is used
# @param klass [Object] job class
# @param args [Array] job arguments
# @return [nil] under test; otherwise the result of the Resque enqueue call
def self.add_job_to(queue, klass, *args)
  running_tests = Bizside.rails_env&.test?
  if running_tests
    do_perform_and_hooks_instantly(klass, 'テスト時にはジョブの登録を行わず、即時実行します。', *args)
    return
  end

  if block_given?
    yield
  elsif queue
    Bizside.logger.info "ジョブ #{klass} を #{queue} に登録します。"
  else
    Bizside.logger.info "ジョブ #{klass} を登録します。"
  end

  return ::Resque.enqueue_to(queue, klass, *args) if queue.present?

  ::Resque.enqueue(klass, *args)
end
.add_scheduler(name, job_type, interval_args, *args) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/bizside/job_utils.rb', line 197
#
# Replaces the Resque schedule entry +name+ using either a cron line or an
# +every+ interval taken from +interval_args+.
#
# Under the test environment only a log line is written. Otherwise the
# existing entry is removed first; +:cron+ takes precedence over +:every+.
#
# @param name [Object] schedule name
# @param job_type [Object] stored under :class
# @param interval_args [Hash] read via +[:cron]+ and +[:every]+
# @param args [Array] job arguments stored under :args
# @return [nil] under test
# @raise [ArgumentError] when neither :cron nor :every is present
def self.add_scheduler(name, job_type, interval_args, *args)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時にはCronの設定を行いません。'
    return
  end

  ::Resque.remove_schedule(name)

  if interval_args[:cron].present?
    # Splat the arguments: add_cron collects them with *args again, so passing
    # the array itself would store a nested [[...]] under :args, unlike the
    # flat list stored by the :every branch below.
    add_cron(name, job_type, interval_args[:cron], *args)
  elsif interval_args[:every].present?
    config = {
      :class => job_type,
      :every => interval_args[:every],
      :args => args,
      :persist => true
    }
    ::Resque.set_schedule(name, config)
  else
    raise ArgumentError, "cronもしくはeveryが指定されていないのでスケジューリングしません。name=#{name}"
  end
end
.any_jobs_for?(queue) ⇒ Boolean
269 270 271 272 273 |
# File 'lib/bizside/job_utils.rb', line 269
#
# Tells whether +queue+ has any activity: the number of queued jobs plus the
# number of currently working workers whose queue list includes +queue+ must
# be greater than zero.
#
# @return [Boolean]
def self.any_jobs_for?(queue)
  queued = ::Resque.size(queue)
  busy_workers = ::Resque.working.count { |worker| worker.queues.include?(queue) }
  (queued + busy_workers) > 0
end
.cancel_job_at(klass, *args) ⇒ Object
150 151 152 153 154 155 156 157 |
# File 'lib/bizside/job_utils.rb', line 150
#
# Cancels a delayed job via Resque.remove_delayed. Under the test environment
# it only logs and returns nil.
def self.cancel_job_at(klass, *args)
  unless Bizside.rails_env&.test?
    return ::Resque.remove_delayed(klass, *args)
  end

  Bizside.logger.info "テスト時には遅延ジョブのキャンセルを行いません。"
  nil
end
.delayed?(klass, args, except: []) ⇒ Boolean
Resque.delayed? は @queue の定義がない場合に機能しないため、独自に実装
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/bizside/job_utils.rb', line 112
#
# Tells whether a delayed job of class +klass+ with the same arguments is
# already registered. Implemented here because Resque.delayed? does not work
# when @queue is not defined.
#
# Every delayed timestamp is scanned, and every job under it. Only the first
# element of a job's 'args' is compared, as a hash with indifferent access,
# after the +except+ keys are dropped from both sides.
#
# NOTE(review): the extracted listing had lost the block variable and both
# helper names; they were restored as +timestamp+, +delayed_timestamp_peek+
# and +delayed_timestamp_size+ to match the sibling methods of this class.
# Confirm against lib/bizside/job_utils.rb.
#
# @param klass [Object] job class; compared through +to_s+ with the job's 'class'
# @param args [Hash] arguments to look for
# @param except [Array, Object] key(s) ignored during comparison
# @return [Boolean]
def self.delayed?(klass, args, except: [])
  ignored_keys = Array(except)
  args_to_check = args.with_indifferent_access.except(*ignored_keys)

  JobUtils.delayed_queue_peek(0, JobUtils.delayed_queue_schedule_size).each do |timestamp|
    job_count = JobUtils.delayed_timestamp_size(timestamp)
    JobUtils.delayed_timestamp_peek(timestamp, 0, job_count).each do |job_info|
      job_args = job_info['args'].first.presence || {}
      job_args = job_args.with_indifferent_access.except(*ignored_keys)

      if job_info['class'] == klass.to_s && job_args == args_to_check
        Rails.logger.info "遅延ジョブに #{job_info} がすでに登録されています。"
        return true
      end
    end
  end

  false
end
.delayed_queue_peek(start, count) ⇒ Object
66 67 68 69 70 71 72 73 |
# File 'lib/bizside/job_utils.rb', line 66
#
# Delegates to Resque.delayed_queue_peek(start, count). Under the test
# environment it logs and returns an empty array instead.
def self.delayed_queue_peek(start, count)
  if Bizside.rails_env&.test?
    Rails.logger.info "テスト時には遅延ジョブの一覧を返しません。"
    []
  else
    ::Resque.delayed_queue_peek(start, count)
  end
end
.delayed_queue_schedule_size ⇒ Object
75 76 77 78 79 80 81 82 |
# File 'lib/bizside/job_utils.rb', line 75
#
# Delegates to Resque.delayed_queue_schedule_size. Under the test environment
# it logs and returns 0 instead.
def self.delayed_queue_schedule_size
  if Bizside.rails_env&.test?
    Rails.logger.info "テスト時には遅延ジョブの数を返しません。"
    0
  else
    ::Resque.delayed_queue_schedule_size
  end
end
.delayed_timestamp_peek(timestamp, start, count) ⇒ Object
84 85 86 87 88 89 90 91 |
# File 'lib/bizside/job_utils.rb', line 84
#
# Delegates to Resque.delayed_timestamp_peek(timestamp, start, count). Under
# the test environment it logs and returns an empty array instead.
#
# NOTE(review): the method name and the +timestamp+ argument were missing from
# the extracted listing and were restored from the documented signature
# ".delayed_timestamp_peek(timestamp, start, count)"; the delegated Resque call
# is assumed to carry the same name. Confirm against the original file.
def self.delayed_timestamp_peek(timestamp, start, count)
  if Bizside.rails_env&.test?
    Rails.logger.info "テスト時には指定された時間の遅延ジョブの一覧を返しません。"
    return []
  end

  ::Resque.delayed_timestamp_peek(timestamp, start, count)
end
.delayed_timestamp_size(timestamp) ⇒ Object
93 94 95 96 97 98 99 100 |
# File 'lib/bizside/job_utils.rb', line 93
#
# Delegates to Resque.delayed_timestamp_size(timestamp). Under the test
# environment it logs and returns 0 instead.
#
# NOTE(review): the method name and the +timestamp+ argument were missing from
# the extracted listing and were restored from the documented signature
# ".delayed_timestamp_size(timestamp)"; the delegated Resque call is assumed to
# carry the same name. Confirm against the original file.
def self.delayed_timestamp_size(timestamp)
  if Bizside.rails_env&.test?
    Rails.logger.info "テスト時には指定された時間の遅延ジョブの数を返しません。"
    return 0
  end

  ::Resque.delayed_timestamp_size(timestamp)
end
.dequeue(klass, *args) ⇒ Object
256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/bizside/job_utils.rb', line 256
#
# Delegates to Resque.dequeue outside the test environment.
#
# Under test nothing is removed, but the hooks are still driven by hand:
# +before_dequeue+ is called when +klass+ responds to it, and a literal
# +false+ result stops processing; then a log line is written and
# +after_dequeue+ is called when available. nil is returned in every test
# path.
def self.dequeue(klass, *args)
  return ::Resque.dequeue(klass, *args) unless Bizside.rails_env&.test?

  vetoed = klass.respond_to?(:before_dequeue) && klass.before_dequeue(*args) == false
  return if vetoed

  Bizside.logger.info 'テスト時にジョブの削除は行いません。'
  klass.after_dequeue(*args) if klass.respond_to?(:after_dequeue)
  nil
end
.enqueue_at_with_queue(queue, time, klass, *args) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/bizside/job_utils.rb', line 45
#
# Registers a delayed job on +queue+ for +time+ via
# Resque.enqueue_at_with_queue.
#
# Under the test environment nothing is registered; the job is handed to
# +do_perform_and_hooks_instantly+ and nil is returned. Otherwise a caller
# block, if given, is yielded to in place of the default log line.
def self.enqueue_at_with_queue(queue, time, klass, *args)
  running_tests = Bizside.rails_env&.test?
  if running_tests
    do_perform_and_hooks_instantly(klass, 'テスト時には遅延ジョブの登録を行わず、即時実行します。', *args)
    return
  end

  block_given? ? yield : Bizside.logger.info("遅延ジョブ #{klass} を #{queue} に登録します。")

  ::Resque.enqueue_at_with_queue(queue, time, klass, *args)
end
.enqueue_at_with_queue_silently(queue, time, klass, *args) ⇒ Object
60 61 62 63 64 |
# File 'lib/bizside/job_utils.rb', line 60
#
# Same as +enqueue_at_with_queue+, but passes an empty block so that the
# callee yields to it instead of writing its own log line.
def self.enqueue_at_with_queue_silently(queue, time, klass, *args)
  # Intentionally empty: emit nothing.
  enqueue_at_with_queue(queue, time, klass, *args) {}
end
.enqueue_in(time_to_delay, klass, *args) ⇒ Object
238 239 240 241 242 243 244 245 |
# File 'lib/bizside/job_utils.rb', line 238
#
# Delegates to Resque.enqueue_in(time_to_delay, klass, *args). Under the test
# environment it only logs and returns nil.
def self.enqueue_in(time_to_delay, klass, *args)
  unless Bizside.rails_env&.test?
    return ::Resque.enqueue_in(time_to_delay, klass, *args)
  end

  Bizside.logger.info 'テスト時にジョブの遅延実行は行いません。'
  nil
end
.enqueue_in_with_queue(queue, time_to_delay, klass, *args) ⇒ Object
247 248 249 250 251 252 253 254 |
# File 'lib/bizside/job_utils.rb', line 247
#
# Delegates to Resque.enqueue_in_with_queue(queue, time_to_delay, klass,
# *args). Under the test environment it only logs and returns nil.
def self.enqueue_in_with_queue(queue, time_to_delay, klass, *args)
  unless Bizside.rails_env&.test?
    return ::Resque.enqueue_in_with_queue(queue, time_to_delay, klass, *args)
  end

  Bizside.logger.info 'テスト時にジョブの遅延実行は行いません。'
  nil
end
.failure_count(queue = nil, class_name = nil) ⇒ Object
358 359 360 361 362 |
# File 'lib/bizside/job_utils.rb', line 358
#
# Delegates to Resque::Failure.count(queue, class_name). Under the test
# environment it returns 0 without calling Resque.
def self.failure_count(queue = nil, class_name = nil)
  Bizside.rails_env&.test? ? 0 : ::Resque::Failure.count(queue, class_name)
end
.failure_jobs(start = 0, count = 1, queue = nil) ⇒ Object
349 350 351 352 353 354 355 356 |
# File 'lib/bizside/job_utils.rb', line 349
#
# Delegates to Resque::Failure.all(start, count, queue). Under the test
# environment it logs and returns an empty array instead.
def self.failure_jobs(start = 0, count = 1, queue = nil)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時は、ジョブ 0 件とします。'
    []
  else
    ::Resque::Failure.all(start, count, queue)
  end
end
.peek(queue, start = 0, count = 1) ⇒ Object
229 230 231 232 233 234 235 236 |
# File 'lib/bizside/job_utils.rb', line 229
#
# Delegates to Resque.peek(queue, start, count). Under the test environment it
# logs and returns an empty array instead.
def self.peek(queue, start = 0, count = 1)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時は、ジョブ 0 件とします。'
    []
  else
    ::Resque.peek(queue, start, count)
  end
end
.queue_from_class(klass) ⇒ Object
284 285 286 |
# File 'lib/bizside/job_utils.rb', line 284
#
# Thin pass-through to Resque.queue_from_class; there is no test-environment
# special case.
def self.queue_from_class(klass)
  ::Resque.queue_from_class(klass)
end
.queue_size(queue) ⇒ Object
288 289 290 291 292 |
# File 'lib/bizside/job_utils.rb', line 288
#
# Delegates to Resque.size(queue). Under the test environment it returns 0
# without calling Resque.
def self.queue_size(queue)
  Bizside.rails_env&.test? ? 0 : ::Resque.size(queue)
end
.remove_cron(name) ⇒ Object
193 194 195 |
# File 'lib/bizside/job_utils.rb', line 193
#
# Alias-style wrapper: removing a cron entry is handled by +remove_scheduler+.
def self.remove_cron(name)
  remove_scheduler(name)
end
.remove_delayed_in_queue(klass, queue, *args) ⇒ Object
102 103 104 105 106 107 108 109 |
# File 'lib/bizside/job_utils.rb', line 102
#
# Delegates to Resque.remove_delayed_in_queue(klass, queue, *args). Under the
# test environment it only logs (through Rails.logger) and returns nil.
def self.remove_delayed_in_queue(klass, queue, *args)
  unless Bizside.rails_env&.test?
    return ::Resque.remove_delayed_in_queue(klass, queue, *args)
  end

  Rails.logger.info "テスト時には遅延ジョブのキャンセルを行いません。"
  nil
end
.remove_queue(queue) ⇒ Object
275 276 277 278 279 280 281 282 |
# File 'lib/bizside/job_utils.rb', line 275
#
# Delegates to Resque.remove_queue(queue). Under the test environment it only
# logs and returns nil.
def self.remove_queue(queue)
  unless Bizside.rails_env&.test?
    return ::Resque.remove_queue(queue)
  end

  Bizside.logger.info 'テスト時にキューの削除は行いません。'
  nil
end
.remove_scheduler(name) ⇒ Object
220 221 222 223 224 225 226 227 |
# File 'lib/bizside/job_utils.rb', line 220
#
# Delegates to Resque.remove_schedule(name). Under the test environment it
# only logs and returns nil.
def self.remove_scheduler(name)
  unless Bizside.rails_env&.test?
    return ::Resque.remove_schedule(name)
  end

  Bizside.logger.info 'テスト時にはCronの設定を行いません。'
  nil
end
.set_cron_options(cron) ⇒ Object
409 410 411 412 413 414 415 416 417 418 419 420 421 |
# File 'lib/bizside/job_utils.rb', line 409
#
# Normalises a cron specification into a two-element array
# +[cron_line, options]+ whose options always carry +blocking: true+.
#
# Existing options found in the second position are kept (converted with
# +with_indifferent_access+), but any +blocking+ value among them is
# overwritten with +true+.
#
# The input is copied first so that an Array passed by the caller is not
# modified in place.
#
# NOTE(review): the method name was missing from the extracted listing and was
# restored from the documented signature ".set_cron_options(cron)".
#
# @param cron [String, Array, nil] cron line, or +[cron_line, options]+
# @return [Array] +[cron_line, options]+
def self.set_cron_options(cron)
  ret = Array(cron).dup
  # ret[1] is nil when no options were supplied, which yields the bare
  # { blocking: true } case.
  opts = (ret[1] || {}).with_indifferent_access
  ret[1] = opts.merge(blocking: true)
  ret
end
.set_job_at(time, klass, *args) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/bizside/job_utils.rb', line 129
#
# Registers a delayed job for +time+ via Resque.enqueue_at.
#
# Under the test environment nothing is registered; the job is handed to
# +do_perform_and_hooks_instantly+ and nil is returned. Otherwise a caller
# block, if given, is yielded to in place of the default log line.
def self.set_job_at(time, klass, *args)
  running_tests = Bizside.rails_env&.test?
  if running_tests
    do_perform_and_hooks_instantly(klass, 'テスト時には遅延ジョブの登録を行わず、即時実行します。', *args)
    return
  end

  block_given? ? yield : Bizside.logger.info("遅延ジョブ #{klass} を登録します。")

  ::Resque.enqueue_at(time, klass, *args)
end
.set_job_silently_at(time, klass, *args) ⇒ Object
144 145 146 147 148 |
# File 'lib/bizside/job_utils.rb', line 144
#
# Same as +set_job_at+, but passes an empty block so that the callee yields to
# it instead of writing its own log line.
def self.set_job_silently_at(time, klass, *args)
  # Intentionally empty: emit nothing.
  set_job_at(time, klass, *args) {}
end
.unique_in_queue?(klass, args = {}, queue, count: 100, except: []) ⇒ Boolean
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/bizside/job_utils.rb', line 294
#
# Tells whether no equivalent job is found among the first +count+ and the
# trailing jobs of +queue+.
#
# Under the test environment it logs and always returns true. Otherwise the
# first +count+ jobs are checked with +already_in_jobs?+; if nothing matches,
# +rest_count+ decides how many jobs to read from the end of the queue (zero
# means the answer is already true).
#
# @param klass [Object] job class, passed to +already_in_jobs?+
# @param args [Hash] job arguments, passed to +already_in_jobs?+
# @param queue [Object] queue to inspect
# @param count [Integer] number of jobs read from the head of the queue
# @param except [Array] forwarded to +already_in_jobs?+
# @return [Boolean]
def self.unique_in_queue?(klass, args = {}, queue, count: 100, except: [])
  if Bizside.rails_env&.test?
    Bizside.logger.info "テスト時は常にキューに同一ジョブが存在しない前提とします。"
    return true
  end

  # First +count+ jobs from the head of the queue.
  head_jobs = self.peek(queue, 0, count)
  return false if already_in_jobs?(klass, args, head_jobs, except: except)

  remaining = rest_count(self.queue_size(queue), count)
  return true if remaining == 0

  # Resque.peek() returns a differently typed value for a single job, so ask
  # for at least two.
  remaining += 1 if remaining == 1

  # Last +remaining+ jobs from the tail of the queue.
  tail_jobs = self.peek(queue, remaining * -1, remaining)
  !already_in_jobs?(klass, args, tail_jobs, except: except)
end