Class: Sidekiq::Batch
- Inherits:
-
Object
show all
- Defined in:
- lib/sidekiq/batch.rb,
lib/sidekiq/batch/status.rb,
lib/sidekiq/batch/version.rb,
lib/sidekiq/batch/callback.rb,
lib/sidekiq/batch/middleware.rb
Defined Under Namespace
Modules: Callback, Middleware
Classes: NoBlockGivenError, Status
Constant Summary
collapse
- BID_EXPIRE_TTL =
108_000
- VERSION =
'0.1.1'.freeze
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(existing_bid = nil) ⇒ Batch
Returns a new instance of Batch.
17
18
19
20
21
22
23
24
25
|
# File 'lib/sidekiq/batch.rb', line 17
def initialize(existing_bid = nil)
@bid = existing_bid || SecureRandom.urlsafe_base64(10)
Sidekiq.redis do |r|
r.multi do
r.hset("BID-#{bid}", 'created_at', Time.now)
r.expire("BID-#{bid}", BID_EXPIRE_TTL)
end
end
end
|
Instance Attribute Details
#bid ⇒ Object
Returns the value of attribute bid.
15
16
17
|
# File 'lib/sidekiq/batch.rb', line 15
def bid
@bid
end
|
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
15
16
17
|
# File 'lib/sidekiq/batch.rb', line 15
def callback_queue
@callback_queue
end
|
#description ⇒ Object
Returns the value of attribute description.
15
16
17
|
# File 'lib/sidekiq/batch.rb', line 15
def description
@description
end
|
Class Method Details
.cleanup_redis(bid) ⇒ Object
99
100
101
102
103
104
|
# File 'lib/sidekiq/batch.rb', line 99
def cleanup_redis(bid)
Sidekiq.redis do |r|
r.del("BID-#{bid}",
"BID-#{bid}-failed")
end
end
|
.increment_job_queue(bid) ⇒ Object
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/sidekiq/batch.rb', line 106
def increment_job_queue(bid)
Sidekiq.redis do |r|
r.multi do
%w(to_process pending total).each do |c|
r.hincrby("BID-#{bid}", c, 1)
end
r.expire("BID-#{bid}", BID_EXPIRE_TTL)
end
end
end
|
.process_failed_job(bid, jid) ⇒ Object
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/sidekiq/batch.rb', line 70
def process_failed_job(bid, jid)
to_process = Sidekiq.redis do |r|
r.multi do
r.sadd("BID-#{bid}-failed", jid)
r.scard("BID-#{bid}-failed")
r.hget("BID-#{bid}", 'to_process')
r.expire("BID-#{bid}-failed", BID_EXPIRE_TTL)
end
end
if to_process[2].to_i == to_process[1].to_i
Callback.call_if_needed(:complete, bid)
end
end
|
.process_successful_job(bid) ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/sidekiq/batch.rb', line 84
def process_successful_job(bid)
out = Sidekiq.redis do |r|
r.multi do
r.hincrby("BID-#{bid}", 'to_process', -1)
r.scard("BID-#{bid}-failed")
r.hincrby("BID-#{bid}", 'pending', -1)
r.expire("BID-#{bid}", BID_EXPIRE_TTL)
end
end
puts "processed process_successful_job"
Callback.call_if_needed(:complete, bid) if out[1].to_i == out[0].to_i
Callback.call_if_needed(:success, bid) if out[0].to_i.zero?
end
|
Instance Method Details
#jobs ⇒ Object
48
49
50
51
52
53
54
55
56
|
# File 'lib/sidekiq/batch.rb', line 48
def jobs
raise NoBlockGivenError unless block_given?
Sidekiq.redis { |r| r.hincrby("BID-#{bid}", 'to_process', 1) }
Thread.current[:bid] = bid
yield
Thread.current[:bid] = nil
Sidekiq.redis { |r| r.hincrby("BID-#{bid}", 'to_process', -1) }
end
|
#on(event, callback, options = {}) ⇒ Object
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/sidekiq/batch.rb', line 37
def on(event, callback, options = {})
return unless %w(success complete).include?(event.to_s)
Sidekiq.redis do |r|
r.multi do
r.hset("BID-#{bid}", "callback_#{event}", callback)
r.hset("BID-#{bid}", "callback_#{event}_opts", options.to_json)
r.expire("BID-#{bid}", BID_EXPIRE_TTL)
end
end
end
|