Class: Bizside::JobUtils

Inherits:
Object
  • Object
show all
Defined in:
lib/bizside/job_utils.rb

Class Method Summary collapse

Class Method Details

.add_cron(name, job_type, cron, *args) ⇒ Object



159
160
161
# File 'lib/bizside/job_utils.rb', line 159

def self.add_cron(name, job_type, cron, *args)
  add_cron_to(nil, name, job_type, cron, *args)
end

.add_cron_to(queue, name, job_type, cron, *args) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/bizside/job_utils.rb', line 163

def self.add_cron_to(queue, name, job_type, cron, *args)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時にはCronの設定を行いません。'
    return
  end

  ::Resque.remove_schedule(name)

  cronline = Array(cron).first

  if cronline.to_s.strip.empty?
    return
  elsif CronValidator.new(cronline).valid?
    new_cron = set_cron_options(cron)

    config = {
      :class => job_type,
      :cron => new_cron,
      :args => args,
      :persist => true
    }

    config[:queue] = queue if queue.present?

    ::Resque.set_schedule(name, config)
  else
    raise ArgumentError, "Cronの書式が正しくないのでスケジューリングしません。name=#{name}"
  end
end

.add_job(klass, *args, &block) ⇒ Object



6
7
8
# File 'lib/bizside/job_utils.rb', line 6

def self.add_job(klass, *args, &block)
  add_job_to(nil, klass, *args, &block)
end

.add_job_silently(klass, *args) ⇒ Object



33
34
35
36
37
# File 'lib/bizside/job_utils.rb', line 33

def self.add_job_silently(klass, *args)
  add_job(klass, *args) do
    # 何も出力しない
  end
end

.add_job_silently_to(queue, klass, *args) ⇒ Object



39
40
41
42
43
# File 'lib/bizside/job_utils.rb', line 39

def self.add_job_silently_to(queue, klass, *args)
  add_job_to(queue, klass, *args) do
    # 何も出力しない
  end
end

.add_job_to(queue, klass, *args) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/bizside/job_utils.rb', line 10

def self.add_job_to(queue, klass, *args)
  if Bizside.rails_env&.test?
    do_perform_and_hooks_instantly(klass, 'テスト時にはジョブの登録を行わず、即時実行します。', *args)
    return
  end

  if block_given?
    yield
  else
    if queue
      Bizside.logger.info "ジョブ #{klass}#{queue} に登録します。"
    else
      Bizside.logger.info "ジョブ #{klass} を登録します。"
    end
  end

  if queue.present?
    ::Resque.enqueue_to(queue, klass, *args)
  else
    ::Resque.enqueue(klass, *args)
  end
end

.add_scheduler(name, job_type, interval_args, *args) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/bizside/job_utils.rb', line 197

def self.add_scheduler(name, job_type, interval_args, *args)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時にはCronの設定を行いません。'
    return
  end

  ::Resque.remove_schedule(name)

  if interval_args[:cron].present?
    add_cron(name, job_type, interval_args[:cron], args)
  elsif interval_args[:every].present?
    config = {
      :class => job_type,
      :every => interval_args[:every],
      :args => args,
      :persist => true
    }
    ::Resque.set_schedule(name, config)
  else
    raise ArgumentError, "cronもしくはeveryが指定されていないのでスケジューリングしません。name=#{name}"
  end
end

.any_jobs_for?(queue) ⇒ Boolean

Returns:

  • (Boolean)


269
270
271
272
273
# File 'lib/bizside/job_utils.rb', line 269

def self.any_jobs_for?(queue)
  ret = ::Resque.size(queue)
  ret += ::Resque.working.reduce(0){|sum, worker| sum += worker.queues.include?(queue) ? 1 : 0 }
  ret > 0
end

.cancel_job_at(klass, *args) ⇒ Object



150
151
152
153
154
155
156
157
# File 'lib/bizside/job_utils.rb', line 150

def self.cancel_job_at(klass, *args)
  if Bizside.rails_env&.test?
    Bizside.logger.info "テスト時には遅延ジョブのキャンセルを行いません。"
    return
  end

  ::Resque.remove_delayed(klass, *args)
end

.delayed?(klass, args, except: []) ⇒ Boolean

Resque.delayed? は @queue の定義がない場合に機能しないため、独自に実装

Returns:

  • (Boolean)


112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/bizside/job_utils.rb', line 112

def self.delayed?(klass, args, except: [])
  args_to_check = args.with_indifferent_access.except(*Array(except))

  JobUtils.delayed_queue_peek(0, JobUtils.delayed_queue_schedule_size).each do |timestamp|
    JobUtils.delayed_timestamp_peek(timestamp, 0, JobUtils.delayed_timestamp_size(timestamp)).each do |job_info|
      job_args = job_info['args'].first.presence || {}
      job_args = job_args.with_indifferent_access.except(*Array(except))
      if job_info['class'] == klass.to_s && job_args == args_to_check
        Rails.logger.info "遅延ジョブに #{job_info} がすでに登録されています。"
        return true
      end
    end
  end

  false
end

.delayed_queue_peek(start, count) ⇒ Object



66
67
68
69
70
71
72
73
# File 'lib/bizside/job_utils.rb', line 66

def self.delayed_queue_peek(start, count)
  if Bizside.rails_env&.test?
    Rails.logger.info "テスト時には遅延ジョブの一覧を返しません。"
    return []
  end

  ::Resque.delayed_queue_peek(start, count)
end

.delayed_queue_schedule_sizeObject



75
76
77
78
79
80
81
82
# File 'lib/bizside/job_utils.rb', line 75

def self.delayed_queue_schedule_size
  if Bizside.rails_env&.test?
    Rails.logger.info "テスト時には遅延ジョブの数を返しません。"
    return 0
  end

  ::Resque.delayed_queue_schedule_size
end

.delayed_timestamp_peek(timestamp, start, count) ⇒ Object



84
85
86
87
88
89
90
91
# File 'lib/bizside/job_utils.rb', line 84

def self.delayed_timestamp_peek(timestamp, start, count)
  if Bizside.rails_env&.test?
    Rails.logger.info "テスト時には指定された時間の遅延ジョブの一覧を返しません。"
    return []
  end

  ::Resque.delayed_timestamp_peek(timestamp, start, count)
end

.delayed_timestamp_size(timestamp) ⇒ Object



93
94
95
96
97
98
99
100
# File 'lib/bizside/job_utils.rb', line 93

def self.delayed_timestamp_size(timestamp)
  if Bizside.rails_env&.test?
    Rails.logger.info "テスト時には指定された時間の遅延ジョブの数を返しません。"
    return 0
  end

  ::Resque.delayed_timestamp_size(timestamp)
end

.dequeue(klass, *args) ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/bizside/job_utils.rb', line 256

def self.dequeue(klass, *args)
  if Bizside.rails_env&.test?
    if klass.respond_to?(:before_dequeue)
      return if klass.before_dequeue(*args) == false
    end
    Bizside.logger.info 'テスト時にジョブの削除は行いません。'
    klass.after_dequeue(*args) if klass.respond_to?(:after_dequeue)
    return
  end

  ::Resque.dequeue(klass, *args)
end

.enqueue_at_with_queue(queue, time, klass, *args) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/bizside/job_utils.rb', line 45

def self.enqueue_at_with_queue(queue, time, klass, *args)
  if Bizside.rails_env&.test?
    do_perform_and_hooks_instantly(klass, 'テスト時には遅延ジョブの登録を行わず、即時実行します。', *args)
    return
  end

  if block_given?
    yield
  else
    Bizside.logger.info "遅延ジョブ #{klass}#{queue} に登録します。"
  end

  ::Resque.enqueue_at_with_queue(queue, time, klass, *args)
end

.enqueue_at_with_queue_silently(queue, time, klass, *args) ⇒ Object



60
61
62
63
64
# File 'lib/bizside/job_utils.rb', line 60

def self.enqueue_at_with_queue_silently(queue, time, klass, *args)
  enqueue_at_with_queue(queue, time, klass, *args) do
    # 何も出力しない
  end
end

.enqueue_in(time_to_delay, klass, *args) ⇒ Object



238
239
240
241
242
243
244
245
# File 'lib/bizside/job_utils.rb', line 238

def self.enqueue_in(time_to_delay, klass, *args)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時にジョブの遅延実行は行いません。'
    return
  end

  ::Resque.enqueue_in(time_to_delay, klass, *args)
end

.enqueue_in_with_queue(queue, time_to_delay, klass, *args) ⇒ Object



247
248
249
250
251
252
253
254
# File 'lib/bizside/job_utils.rb', line 247

def self.enqueue_in_with_queue(queue, time_to_delay, klass, *args)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時にジョブの遅延実行は行いません。'
    return
  end

  ::Resque.enqueue_in_with_queue(queue, time_to_delay, klass, *args)
end

.failure_count(queue = nil, class_name = nil) ⇒ Object



358
359
360
361
362
# File 'lib/bizside/job_utils.rb', line 358

def self.failure_count(queue = nil, class_name = nil)
  return 0 if Bizside.rails_env&.test?

  ::Resque::Failure.count(queue, class_name)
end

.failure_jobs(start = 0, count = 1, queue = nil) ⇒ Object



349
350
351
352
353
354
355
356
# File 'lib/bizside/job_utils.rb', line 349

def self.failure_jobs(start = 0, count = 1, queue = nil)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時は、ジョブ 0 件とします。'
    return []
  end

  ::Resque::Failure.all(start, count, queue)
end

.peek(queue, start = 0, count = 1) ⇒ Object



229
230
231
232
233
234
235
236
# File 'lib/bizside/job_utils.rb', line 229

def self.peek(queue, start = 0, count = 1)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時は、ジョブ 0 件とします。'
    return []
  end

  ::Resque.peek(queue, start, count)
end

.queue_from_class(klass) ⇒ Object



284
285
286
# File 'lib/bizside/job_utils.rb', line 284

def self.queue_from_class(klass)
  ::Resque.queue_from_class(klass)
end

.queue_size(queue) ⇒ Object



288
289
290
291
292
# File 'lib/bizside/job_utils.rb', line 288

def self.queue_size(queue)
  return 0 if Bizside.rails_env&.test?

  ::Resque.size(queue)
end

.remove_cron(name) ⇒ Object



193
194
195
# File 'lib/bizside/job_utils.rb', line 193

def self.remove_cron(name)
  remove_scheduler(name)
end

.remove_delayed_in_queue(klass, queue, *args) ⇒ Object



102
103
104
105
106
107
108
109
# File 'lib/bizside/job_utils.rb', line 102

def self.remove_delayed_in_queue(klass, queue, *args)
  if Bizside.rails_env&.test?
    Rails.logger.info "テスト時には遅延ジョブのキャンセルを行いません。"
    return
  end

  ::Resque.remove_delayed_in_queue(klass, queue, *args)
end

.remove_queue(queue) ⇒ Object



275
276
277
278
279
280
281
282
# File 'lib/bizside/job_utils.rb', line 275

def self.remove_queue(queue)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時にキューの削除は行いません。'
    return
  end

  ::Resque.remove_queue(queue)
end

.remove_scheduler(name) ⇒ Object



220
221
222
223
224
225
226
227
# File 'lib/bizside/job_utils.rb', line 220

def self.remove_scheduler(name)
  if Bizside.rails_env&.test?
    Bizside.logger.info 'テスト時にはCronの設定を行いません。'
    return
  end

  ::Resque.remove_schedule(name)
end

.set_cron_options(cron) ⇒ Object



409
410
411
412
413
414
415
416
417
418
419
420
421
# File 'lib/bizside/job_utils.rb', line 409

def self.set_cron_options(cron)
  ret = Array(cron)
  if ret.size > 1
    opts = ret.second
    opts ||= {}
    opts = opts.with_indifferent_access
    opts = opts.merge(blocking: true)
  else
    opts = { blocking: true }.with_indifferent_access
  end
  ret[1] = opts
  ret
end

.set_job_at(time, klass, *args) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/bizside/job_utils.rb', line 129

def self.set_job_at(time, klass, *args)
  if Bizside.rails_env&.test?
    do_perform_and_hooks_instantly(klass, 'テスト時には遅延ジョブの登録を行わず、即時実行します。', *args)
    return
  end

  if block_given?
    yield
  else
    Bizside.logger.info "遅延ジョブ #{klass} を登録します。"
  end

  ::Resque.enqueue_at(time, klass, *args)
end

.set_job_silently_at(time, klass, *args) ⇒ Object



144
145
146
147
148
# File 'lib/bizside/job_utils.rb', line 144

def self.set_job_silently_at(time, klass, *args)
  set_job_at(time, klass, *args) do
    # 何も出力しない
  end
end

.unique_in_queue?(klass, args = {}, queue, count: 100, except: []) ⇒ Boolean

Returns:

  • (Boolean)


294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/bizside/job_utils.rb', line 294

def self.unique_in_queue?(klass, args = {}, queue, count: 100, except: [])
  if Bizside.rails_env&.test?
    Bizside.logger.info "テスト時は常にキューに同一ジョブが存在しない前提とします。"
    return true
  end

  jobs = self.peek(queue, 0, count)  # 先頭からcount件のジョブ

  if already_in_jobs?(klass, args, jobs, except: except)
    false
  else
    count = rest_count(self.queue_size(queue), count)
    if count == 0
      true
    else
      count += 1 if count == 1  # Resque.peek()が1件だと戻り値の型が違うのを回避
      jobs = self.peek(queue, count * -1, count)  # 後ろからcount件のジョブ
      ! already_in_jobs?(klass, args, jobs, except: except)
    end
  end
end