Module: SidekiqBulkJob
- Defined in:
- lib/sidekiq_bulk_job.rb,
lib/sidekiq_bulk_job/utils.rb,
lib/sidekiq_bulk_job/monitor.rb,
lib/sidekiq_bulk_job/version.rb,
lib/sidekiq_bulk_job/bulk_job.rb,
lib/sidekiq_bulk_job/job_retry.rb,
lib/sidekiq_bulk_job/batch_runner.rb,
lib/sidekiq_bulk_job/scheduled_job.rb,
lib/sidekiq_bulk_job/bulk_error_handler.rb
Defined Under Namespace
Modules: BatchRunner, Utils
Classes: BulkErrorHandler, BulkJob, Error, JobRetry, Monitor, ScheduledJob, Setter
Constant Summary
collapse
- VERSION =
"0.1.9"
Class Attribute Summary collapse
Class Method Summary
collapse
-
.bulk_run(job_class_name, key, queue: self.queue, async: true) ⇒ Object
-
.clear(key) ⇒ Object
-
.client ⇒ Object
-
.config(redis:, logger:, process_fail:, async_delay: 60, scheduled_delay: 10, queue: :default, batch_size: 3000, prefix: "SidekiqBulkJob") ⇒ Object
-
.fail_callback(job_class_name:, args:, exception:) ⇒ Object
-
.flush(key) ⇒ Object
-
.generate_key(job_class_name) ⇒ Object
-
.monitor(job_class_name, queue: self.queue) ⇒ Object
-
.need_flush?(key) ⇒ Boolean
-
.perform_async(job_class, *perfrom_args) ⇒ Object
无法定义具体执行时间,相当于perform_async的批量执行 example: SidekiqBulkJob.perform_async(SomeWorkerClass, *args).
-
.perform_in(at, job_class, *perfrom_args) ⇒ Object
(also: perform_at)
延迟一段时间执行 example: SidekiqBulkJob.perform_at(Date.parse("2020-12-01"), SomeWorkerClass, *args).
-
.process(job_class_name:, at: nil, perfrom_args: [], queue: self.queue) ⇒ Object
-
.set(options) ⇒ Object
-
.time_now ⇒ Object
Class Attribute Details
.async_delay ⇒ Object
Returns the value of attribute async_delay.
67
68
69
|
# File 'lib/sidekiq_bulk_job.rb', line 67
# Reader for the module-level async_delay setting.
# `config` assigns this attribute as a Float number of seconds, and
# `monitor` adds it to the current time when scheduling the Monitor job.
def async_delay
@async_delay
end
|
.batch_size ⇒ Object
Returns the value of attribute batch_size.
67
68
69
|
# File 'lib/sidekiq_bulk_job.rb', line 67
# Reader for the module-level batch_size setting (3000 by default in `config`).
# `need_flush?` compares the buffered list length against it and `flush`
# uses it as the chunk bound when draining a list.
def batch_size
@batch_size
end
|
.logger ⇒ Object
Returns the value of attribute logger.
67
68
69
|
# File 'lib/sidekiq_bulk_job.rb', line 67
# Reader for the module-level logger; `config` rejects nil for this setting.
def logger
@logger
end
|
.prefix ⇒ Object
Returns the value of attribute prefix.
67
68
69
|
# File 'lib/sidekiq_bulk_job.rb', line 67
# Reader for the key prefix ("SidekiqBulkJob" by default in `config`).
# `generate_key` prepends it to the job class name to build the list key.
def prefix
@prefix
end
|
.process_fail ⇒ Object
Returns the value of attribute process_fail.
67
68
69
|
# File 'lib/sidekiq_bulk_job.rb', line 67
# Reader for the failure handler set through `config` (must not be nil).
# `fail_callback` invokes it with (job_class_name, args, exception).
def process_fail
@process_fail
end
|
.queue ⇒ Object
Returns the value of attribute queue.
67
68
69
|
# File 'lib/sidekiq_bulk_job.rb', line 67
# Reader for the default queue (:default unless overridden in `config`).
# It is the default for the `queue:` keyword of bulk_run, monitor and process.
def queue
@queue
end
|
.redis ⇒ Object
Returns the value of attribute redis.
67
68
69
|
# File 'lib/sidekiq_bulk_job.rb', line 67
# Reader for the Redis connection passed to `config`.
# Prefer `client`, which raises when this is still nil.
def redis
@redis
end
|
.scheduled_delay ⇒ Object
Returns the value of attribute scheduled_delay.
67
68
69
|
# File 'lib/sidekiq_bulk_job.rb', line 67
# Reader for the module-level scheduled_delay setting.
# `config` assigns it as a Float number of seconds; `process` uses it as the
# +/- window when looking for an already scheduled job to join.
def scheduled_delay
@scheduled_delay
end
|
Class Method Details
.bulk_run(job_class_name, key, queue: self.queue, async: true) ⇒ Object
202
203
204
205
206
|
# File 'lib/sidekiq_bulk_job.rb', line 202
# Drains the arguments buffered under +key+ and runs them as a single BulkJob.
#
# @param job_class_name [String] name of the worker class the arguments belong to
# @param key [String] Redis list key holding the buffered arguments
# @param queue [Symbol, String] queue used for the asynchronous push
# @param async [Boolean] push a BulkJob to Sidekiq when truthy, run it inline otherwise
# @return [Object, nil] nil when nothing was buffered, otherwise the result of
#   the push or of the inline perform
def bulk_run(job_class_name, key, queue: self.queue, async: true)
  buffered_args = flush(key)
  return if buffered_args.nil? || buffered_args.empty?

  # Inline execution: run the bulk job in the calling process.
  unless async
    return SidekiqBulkJob::BulkJob.new.perform(job_class_name, buffered_args)
  end

  SidekiqBulkJob::BulkJob.client_push(
    "queue" => queue,
    "class" => SidekiqBulkJob::BulkJob,
    "args" => [job_class_name, buffered_args]
  )
end
|
.clear(key) ⇒ Object
194
195
196
197
198
199
200
|
# File 'lib/sidekiq_bulk_job.rb', line 194
# Deletes the Redis list stored at +key+ when it has no elements left.
# The length check and the delete are sent as one Lua script through EVAL.
# NOTE(review): Redis normally removes a list key once its last element is
# gone, so the explicit `del` may be redundant — confirm before relying on it.
#
# @param key [String] Redis list key to clean up
# @return [Object] whatever the client's `eval` call returns
def clear(key)
script = %Q{
local size = redis.call('llen', KEYS[1])
if size == 0 then redis.call('del', KEYS[1]) end
}
client.eval script, [key]
end
|
.client ⇒ Object
164
165
166
167
168
169
|
# File 'lib/sidekiq_bulk_job.rb', line 164
# Returns the configured Redis connection.
#
# @return [Object] the value of `redis`
# @raise [ArgumentError] when `redis` is still nil (no connection configured)
def client
  return redis unless redis.nil?

  raise ArgumentError, "Please initialize redis first!"
end
|
.config(redis:, logger:, process_fail:, async_delay: 60, scheduled_delay: 10, queue: :default, batch_size: 3000, prefix: "SidekiqBulkJob") ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/sidekiq_bulk_job.rb', line 69
# Validates and stores the module-level settings.
#
# @param redis [Object] Redis connection; must not be nil
# @param logger [Object] logger; must not be nil
# @param process_fail [#call] failure handler; must not be nil
# @param async_delay [Numeric, #to_f] seconds, allowed range 2..300
# @param scheduled_delay [Numeric, #to_f] seconds, allowed range 2..30
# @param queue [Symbol, String] default queue
# @param batch_size [Integer] buffered-argument count that triggers a flush
# @param prefix [String] prefix for generated Redis keys
# @return [Float] the stored scheduled_delay (value of the last assignment)
# @raise [ArgumentError] when a required setting is nil or a delay is out of range
def config(redis:, logger:, process_fail:, async_delay: 60, scheduled_delay: 10, queue: :default, batch_size: 3000, prefix: "SidekiqBulkJob")
  raise ArgumentError, "redis not allow nil" if redis.nil?
  raise ArgumentError, "logger not allow nil" if logger.nil?
  raise ArgumentError, "process_fail not allow nil" if process_fail.nil?

  async_seconds = async_delay.to_f
  if async_seconds < 2
    raise ArgumentError, "async_delay not allow less than 2 seconds."
  elsif async_seconds > 5 * 60
    raise ArgumentError, "async_delay not allow greater than 5 minutes."
  end

  scheduled_seconds = scheduled_delay.to_f
  if scheduled_seconds < 2
    raise ArgumentError, "scheduled_delay not allow less than 2 seconds."
  elsif scheduled_seconds > 30
    # The limit checked here is 30 seconds; the message previously said "2 seconds".
    raise ArgumentError, "scheduled_delay not allow greater than 30 seconds."
  end

  # Nothing is assigned until every check has passed.
  self.redis = redis
  self.queue = queue
  self.batch_size = batch_size
  self.prefix = prefix
  self.logger = logger
  self.process_fail = process_fail
  self.async_delay = async_seconds
  self.scheduled_delay = scheduled_seconds
end
|
.fail_callback(job_class_name:, args:, exception:) ⇒ Object
223
224
225
|
# File 'lib/sidekiq_bulk_job.rb', line 223
# Forwards a failed job to the configured failure handler.
#
# @param job_class_name [String] name of the worker class that failed
# @param args [Object] arguments the job was running with
# @param exception [Exception] error that was raised
# @return [Object] whatever the handler returns
def fail_callback(job_class_name:, args:, exception:)
  handler = process_fail
  handler.call(job_class_name, args, exception)
end
|
.flush(key) ⇒ Object
180
181
182
183
184
185
186
187
188
189
190
191
192
|
# File 'lib/sidekiq_bulk_job.rb', line 180
# Drains the Redis list at +key+ and returns everything that was read from it.
#
# Each round reads indexes 0..batch_size and trims them off inside one MULTI
# transaction, repeating until the list reports a length of zero. The collected
# items are reversed before being returned.
#
# @param key [String] Redis list key to drain
# @return [Array] the drained items, in reverse of the order they were read
def flush(key)
  drained = []
  loop do
    chunk, _trim_status = client.multi do |transaction|
      transaction.lrange(key, 0, batch_size)
      transaction.ltrim(key, batch_size + 1, -1)
    end
    drained += chunk
    remaining = client.llen(key)
    break unless remaining > 0
  end
  clear(key)
  drained.reverse
end
|
.generate_key(job_class_name) ⇒ Object
160
161
162
|
# File 'lib/sidekiq_bulk_job.rb', line 160
# Builds the Redis list key for a job class: "<prefix>:<job_class_name>".
#
# @param job_class_name [#to_s] name of the worker class
# @return [String] the prefixed key
def generate_key(job_class_name)
  format("%s:%s", prefix, job_class_name)
end
|
.monitor(job_class_name, queue: self.queue) ⇒ Object
208
209
210
211
212
213
214
215
216
217
218
219
220
221
|
# File 'lib/sidekiq_bulk_job.rb', line 208
# Ensures a Monitor job is scheduled for +job_class_name+.
#
# Scans the Sidekiq scheduled set for a Monitor job whose second argument is
# +job_class_name+. When none is found, pushes a new Monitor job due
# `async_delay` seconds from now with args [current timestamp, job_class_name].
#
# @param job_class_name [String] worker class name to watch
# @param queue [Symbol, String] queue for the Monitor job
# @return [Object, nil] nil when a Monitor is already scheduled, otherwise the
#   result of the push
def monitor(job_class_name, queue: self.queue)
  existing = Sidekiq::ScheduledSet.new.find do |job|
    next false unless job.klass == SidekiqBulkJob::Monitor.to_s

    _scheduled_at, watched_class_name = job.args
    watched_class_name == job_class_name
  end
  return unless existing.nil?

  SidekiqBulkJob::Monitor.client_push(
    "queue" => queue,
    "at" => (time_now + self.async_delay).to_f,
    "class" => SidekiqBulkJob::Monitor,
    "args" => [time_now.to_f, job_class_name]
  )
end
|
.need_flush?(key) ⇒ Boolean
175
176
177
178
|
# File 'lib/sidekiq_bulk_job.rb', line 175
# Tells whether the list at +key+ has reached the configured batch size.
#
# Always returns a strict boolean; the under-threshold case used to return
# nil because of a bare `return true if ...`.
#
# @param key [String] Redis list key holding the buffered arguments
# @return [Boolean] true when the list length is >= batch_size
def need_flush?(key)
  client.llen(key) >= batch_size
end
|
无法定义具体执行时间,相当于perform_async的批量执行 example:
SidekiqBulkJob.perform_async(SomeWorkerClass, *args)
108
109
110
111
|
# File 'lib/sidekiq_bulk_job.rb', line 108
# Buffers one set of arguments for +job_class+ to be run later as part of a
# bulk job; the exact run time cannot be chosen by the caller.
#
# Example: SidekiqBulkJob.perform_async(SomeWorkerClass, *args)
#
# @param job_class [Class, #to_s] worker class (or its name)
# @param perfrom_args [Array] arguments for a single job run
# @return [nil]
def perform_async(job_class, *perfrom_args)
  worker_name = job_class.to_s
  process(job_class_name: worker_name, perfrom_args: perfrom_args)
  nil
end
|
延迟一段时间执行 example:
SidekiqBulkJob.perform_at(Date.parse("2020-12-01"), SomeWorkerClass, *args)
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/sidekiq_bulk_job.rb', line 116
# Buffers one set of arguments for +job_class+ to be run at or after a given time.
#
# Example: SidekiqBulkJob.perform_at(Date.parse("2020-12-01"), SomeWorkerClass, *args)
#
# @param at [Numeric, Time, Date, #to_f, #to_time] values below 1_000_000_000
#   are treated as an offset in seconds from now, larger values as an absolute
#   epoch timestamp. Values without #to_f (such as Date) are converted through
#   #to_time first.
# @param job_class [Class, #to_s] worker class (or its name)
# @param perfrom_args [Array] arguments for a single job run
# @return [Object] whatever `process` returns
def perform_in(at, job_class, *perfrom_args)
  # Date has no #to_f, so the documented Date example would raise without this.
  moment = !at.respond_to?(:to_f) && at.respond_to?(:to_time) ? at.to_time : at
  interval = moment.to_f
  now = time_now
  run_at = (interval < 1_000_000_000 ? now + interval : interval).to_f

  if run_at <= now.to_f
    # Already due: handle it like perform_async (no `at`).
    process(job_class_name: job_class.to_s, perfrom_args: perfrom_args)
  else
    process(at: run_at, job_class_name: job_class.to_s, perfrom_args: perfrom_args)
  end
end
|
.process(job_class_name:, at: nil, perfrom_args: [], queue: self.queue) ⇒ Object
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
# File 'lib/sidekiq_bulk_job.rb', line 132
# Buffers one set of job arguments in Redis and makes sure something will run them.
#
# Without +at+: pushes the serialized arguments onto the per-class list, runs
# the batch right away when `need_flush?` says so, then ensures a Monitor job
# is scheduled.
#
# With +at+: looks in the Sidekiq scheduled set for a ScheduledJob of the same
# job class whose run time is within +/- scheduled_delay seconds of +at+. When
# one exists, the arguments are appended to that job's list; otherwise a new
# random list key is created and a ScheduledJob is pushed for it.
#
# @param job_class_name [String] worker class name
# @param at [Float, nil] absolute epoch timestamp, or nil to buffer immediately
# @param perfrom_args [Array] arguments for a single job run (the spelling is
#   part of the public keyword interface)
# @param queue [Symbol, String] queue for any job that gets pushed
# @return [Object] result of `monitor`, of the list push, or of the job push
def process(job_class_name:, at: nil, perfrom_args: [], queue: self.queue)
  payload = SidekiqBulkJob::Utils.dump(perfrom_args)

  if at.nil?
    key = generate_key(job_class_name)
    client.lpush key, payload
    bulk_run(job_class_name, key, queue: queue) if need_flush?(key)
    return monitor(job_class_name, queue: queue)
  end

  earliest = (at - self.scheduled_delay).to_i
  latest = (at + self.scheduled_delay).to_i
  target = Sidekiq::ScheduledSet.new.find do |job|
    next false unless job.klass == SidekiqBulkJob::ScheduledJob.to_s
    next false unless job.at.to_i.between?(earliest, latest)

    # The condition and this destructuring were fused on one line, which did not parse.
    scheduled_class_name, _list_key = job.args
    scheduled_class_name == job_class_name
  end
  # A ScheduledJob's args are [job_class_name, args_redis_key].
  args_redis_key = target.nil? ? nil : target.args[1]

  if args_redis_key.nil? || args_redis_key.empty?
    args_redis_key = SecureRandom.hex
    client.lpush args_redis_key, payload
    SidekiqBulkJob::ScheduledJob.client_push(
      "queue" => queue,
      "class" => SidekiqBulkJob::ScheduledJob,
      "at" => at,
      "args" => [job_class_name, args_redis_key]
    )
  else
    client.lpush args_redis_key, payload
  end
end
|
.time_now ⇒ Object
171
172
173
|
# File 'lib/sidekiq_bulk_job.rb', line 171
# Returns the current time via Time.now.
# `monitor` and `perform_in` read the clock through this method.
def time_now
Time.now
end
|