Module: SidekiqBulkJob

Defined in:
lib/sidekiq_bulk_job.rb,
lib/sidekiq_bulk_job/utils.rb,
lib/sidekiq_bulk_job/monitor.rb,
lib/sidekiq_bulk_job/version.rb,
lib/sidekiq_bulk_job/bulk_job.rb,
lib/sidekiq_bulk_job/job_retry.rb,
lib/sidekiq_bulk_job/batch_runner.rb,
lib/sidekiq_bulk_job/scheduled_job.rb,
lib/sidekiq_bulk_job/bulk_error_handler.rb

Defined Under Namespace

Modules: BatchRunner, Utils Classes: BulkErrorHandler, BulkJob, Error, JobRetry, Monitor, ScheduledJob, Setter

Constant Summary collapse

VERSION =
"0.1.9"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.async_delayObject

Returns the value of attribute async_delay.



67
68
69
# File 'lib/sidekiq_bulk_job.rb', line 67

def async_delay
  @async_delay
end

.batch_sizeObject

Returns the value of attribute batch_size.



67
68
69
# File 'lib/sidekiq_bulk_job.rb', line 67

def batch_size
  @batch_size
end

.loggerObject

Returns the value of attribute logger.



67
68
69
# File 'lib/sidekiq_bulk_job.rb', line 67

def logger
  @logger
end

.prefixObject

Returns the value of attribute prefix.



67
68
69
# File 'lib/sidekiq_bulk_job.rb', line 67

def prefix
  @prefix
end

.process_failObject

Returns the value of attribute process_fail.



67
68
69
# File 'lib/sidekiq_bulk_job.rb', line 67

def process_fail
  @process_fail
end

.queueObject

Returns the value of attribute queue.



67
68
69
# File 'lib/sidekiq_bulk_job.rb', line 67

def queue
  @queue
end

.redisObject

Returns the value of attribute redis.



67
68
69
# File 'lib/sidekiq_bulk_job.rb', line 67

def redis
  @redis
end

.scheduled_delayObject

Returns the value of attribute scheduled_delay.



67
68
69
# File 'lib/sidekiq_bulk_job.rb', line 67

def scheduled_delay
  @scheduled_delay
end

Class Method Details

.bulk_run(job_class_name, key, queue: self.queue, async: true) ⇒ Object



202
203
204
205
206
# File 'lib/sidekiq_bulk_job.rb', line 202

def bulk_run(job_class_name, key, queue: self.queue, async: true)
  args_array = flush(key)
  return if args_array.nil? || args_array.empty?
  async ? SidekiqBulkJob::BulkJob.client_push("queue" => queue, "class" => SidekiqBulkJob::BulkJob, "args" => [job_class_name, args_array]) : SidekiqBulkJob::BulkJob.new.perform(job_class_name, args_array)
end

.clear(key) ⇒ Object



194
195
196
197
198
199
200
# File 'lib/sidekiq_bulk_job.rb', line 194

def clear(key)
  script = %Q{
    local size = redis.call('llen', KEYS[1])
    if size == 0 then redis.call('del', KEYS[1]) end
  }
  client.eval script, [key]
end

.clientObject



164
165
166
167
168
169
# File 'lib/sidekiq_bulk_job.rb', line 164

def client
  if redis.nil?
    raise ArgumentError.new("Please initialize redis first!")
  end
  redis
end

.config(redis:, logger:, process_fail:, async_delay: 60, scheduled_delay: 10, queue: :default, batch_size: 3000, prefix: "SidekiqBulkJob") ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/sidekiq_bulk_job.rb', line 69

def config(redis: , logger: , process_fail: , async_delay: 60, scheduled_delay: 10, queue: :default, batch_size: 3000, prefix: "SidekiqBulkJob")
  if redis.nil?
    raise ArgumentError.new("redis not allow nil")
  end
  if logger.nil?
    raise ArgumentError.new("logger not allow nil")
  end
  if process_fail.nil?
    raise ArgumentError.new("process_fail not allow nil")
  end
  if async_delay.to_f < 2
    raise ArgumentError.new("async_delay not allow less than 2 seconds.")
  elsif async_delay.to_f > 5 * 60
    raise ArgumentError.new("async_delay not allow greater than 5 minutes.")
  end
  if scheduled_delay.to_f < 2
    raise ArgumentError.new("scheduled_delay not allow less than 2 seconds.")
  elsif scheduled_delay.to_f > 30
    raise ArgumentError.new("scheduled_delay not allow greater than 2 seconds.")
  end

  self.redis = redis
  self.queue = queue
  self.batch_size = batch_size
  self.prefix = prefix
  self.logger = logger
  self.process_fail = process_fail
  self.async_delay = async_delay.to_f
  self.scheduled_delay = scheduled_delay.to_f
end

.fail_callback(job_class_name:, args:, exception:) ⇒ Object



223
224
225
# File 'lib/sidekiq_bulk_job.rb', line 223

def fail_callback(job_class_name: , args:, exception: )
  process_fail.call(job_class_name, args, exception)
end

.flush(key) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/sidekiq_bulk_job.rb', line 180

def flush(key)
  result = []
  begin
    _result, success = client.multi do |multi|
      multi.lrange(key, 0, batch_size)
      multi.ltrim(key, batch_size+1, -1)
    end
    result += _result
    count = client.llen key
  end while count > 0
  clear(key)
  result.reverse
end

.generate_key(job_class_name) ⇒ Object



160
161
162
# File 'lib/sidekiq_bulk_job.rb', line 160

def generate_key(job_class_name)
  "#{prefix}:#{job_class_name}"
end

.monitor(job_class_name, queue: self.queue) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/sidekiq_bulk_job.rb', line 208

def monitor(job_class_name, queue: self.queue)
  scheduled_set = Sidekiq::ScheduledSet.new
  _monitor = scheduled_set.find do |job|
    if job.klass == SidekiqBulkJob::Monitor.to_s
      timestamp, _job_class_name = job.args
      _job_class_name == job_class_name
    end
  end
  if !_monitor.nil?
    # TODO debug log
  else
    SidekiqBulkJob::Monitor.client_push("queue" => queue, "at" => (time_now + self.async_delay).to_f, "class" => SidekiqBulkJob::Monitor, "args" => [time_now.to_f, job_class_name])
  end
end

.need_flush?(key) ⇒ Boolean

Returns:

  • (Boolean)


175
176
177
178
# File 'lib/sidekiq_bulk_job.rb', line 175

def need_flush?(key)
  count = client.llen key
  return true if count >= batch_size
end

.perform_async(job_class, *perfrom_args) ⇒ Object

无法定义具体执行时间,相当于perform_async的批量执行 example:

SidekiqBulkJob.perform_async(SomeWorkerClass, *args)


108
109
110
111
# File 'lib/sidekiq_bulk_job.rb', line 108

def perform_async(job_class, *perfrom_args)
  process(job_class_name: job_class.to_s, perfrom_args: perfrom_args)
  nil
end

.perform_in(at, job_class, *perfrom_args) ⇒ Object Also known as: perform_at

延迟一段时间执行 example:

SidekiqBulkJob.perform_at(Date.parse("2020-12-01"), SomeWorkerClass, *args)


116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/sidekiq_bulk_job.rb', line 116

def perform_in(at, job_class, *perfrom_args)
  int = at.to_f
  now = time_now
  ts = (int < 1_000_000_000 ? now + int : int).to_f

  # Optimization to enqueue something now that is scheduled to go out now or in the past
  if ts <= now.to_f
    process(job_class_name: job_class.to_s, perfrom_args: perfrom_args)
  else
    process(at: ts, job_class_name: job_class.to_s, perfrom_args: perfrom_args)
  end
end

.process(job_class_name:, at: nil, perfrom_args: [], queue: self.queue) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/sidekiq_bulk_job.rb', line 132

def process(job_class_name: , at: nil, perfrom_args: [], queue: self.queue)
  if at.nil?
    key = generate_key(job_class_name)
    client.lpush key, SidekiqBulkJob::Utils.dump(perfrom_args)
    bulk_run(job_class_name, key, queue: queue) if need_flush?(key)
    monitor(job_class_name, queue: queue)
  else
    scheduled_set = Sidekiq::ScheduledSet.new
    args_redis_key = nil
    target = scheduled_set.find do |job|
      if job.klass == SidekiqBulkJob::ScheduledJob.to_s &&
            job.at.to_i.between?((at - self.scheduled_delay).to_i, (at + self.scheduled_delay).to_i) # 允许30秒延迟
          _job_class_name, args_redis_key = job.args
          _job_class_name == job_class_name
       end
    end
    if !target.nil? && !args_redis_key.nil? && !args_redis_key.empty?
      # 往现有的job参数set里增加参数
      client.lpush args_redis_key, SidekiqBulkJob::Utils.dump(perfrom_args)
    else
      # 新增加一个
      args_redis_key = SecureRandom.hex
      client.lpush args_redis_key, SidekiqBulkJob::Utils.dump(perfrom_args)
      SidekiqBulkJob::ScheduledJob.client_push("queue" => queue, "class" => SidekiqBulkJob::ScheduledJob, "at" => at, "args" => [job_class_name, args_redis_key])
    end
  end
end

.set(options) ⇒ Object



100
101
102
# File 'lib/sidekiq_bulk_job.rb', line 100

def set(options)
  SidekiqBulkJob::Setter.new(options)
end

.time_nowObject



171
172
173
# File 'lib/sidekiq_bulk_job.rb', line 171

def time_now
  Time.now
end