Class: Sidekiq::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/batch.rb,
lib/sidekiq/batch/status.rb,
lib/sidekiq/batch/version.rb,
lib/sidekiq/batch/callback.rb,
lib/sidekiq/batch/middleware.rb

Defined Under Namespace

Modules: Callback, Extension, Middleware Classes: NoBlockGivenError, Status

Constant Summary collapse

BID_EXPIRE_TTL =
108_000
VERSION =
'0.1.3'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(existing_bid = nil) ⇒ Batch

Returns a new instance of Batch.



17
18
19
20
21
22
23
24
# File 'lib/sidekiq/batch.rb', line 17

def initialize(existing_bid = nil)
  @bid = existing_bid || SecureRandom.urlsafe_base64(10)
  @existing = !(!existing_bid || existing_bid.empty?)  # Basically existing_bid.present?
  @initialized = false
  @created_at = Time.now.utc.to_f
  @bidkey = "BID-" + @bid.to_s
  @ready_to_queue = []
end

Instance Attribute Details

#bidObject (readonly)

Returns the value of attribute bid.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

def bid
  @bid
end

#callback_queueObject

Returns the value of attribute callback_queue.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

def callback_queue
  @callback_queue
end

#created_atObject (readonly)

Returns the value of attribute created_at.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

def created_at
  @created_at
end

#descriptionObject

Returns the value of attribute description.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

def description
  @description
end

Class Method Details

.cleanup_redis(bid) ⇒ Object



212
213
214
215
216
217
218
219
# File 'lib/sidekiq/batch.rb', line 212

def cleanup_redis(bid)
  Sidekiq.redis do |r|
    r.del("BID-#{bid}",
          "BID-#{bid}-callbacks-complete",
          "BID-#{bid}-callbacks-success",
          "BID-#{bid}-failed")
  end
end

.enqueue_callbacks(event, bid) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/sidekiq/batch.rb', line 183

def enqueue_callbacks(event, bid)
  batch_key = "BID-#{bid}"
  callback_key = "#{batch_key}-callbacks-#{event}"
  needed, _, callbacks, queue, parent_bid = Sidekiq.redis do |r|
    r.multi do
      r.hget(batch_key, event)
      r.hset(batch_key, event, true)
      r.smembers(callback_key)
      r.hget(batch_key, "callback_queue")
      r.hget(batch_key, "parent_bid")
    end
  end
  return if needed == 'true'

  begin
    parent_bid = !parent_bid || parent_bid.empty? ? nil : parent_bid    # Basically parent_bid.blank?
    Sidekiq::Client.push_bulk(
      'class' => Sidekiq::Batch::Callback::Worker,
      'args' => callbacks.reduce([]) do |memo, jcb|
        cb = Sidekiq.load_json(jcb)
        memo << [cb['callback'], event, cb['opts'], bid, parent_bid]
      end,
      'queue' => queue ||= 'default'
    ) unless callbacks.empty?
  ensure
    cleanup_redis(bid) if event == :success
  end
end

.process_failed_job(bid, jid) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/sidekiq/batch.rb', line 143

def process_failed_job(bid, jid)
  _, pending, failed, children, complete = Sidekiq.redis do |r|
    r.multi do
      r.sadd("BID-#{bid}-failed", jid)

      r.hincrby("BID-#{bid}", "pending", 0)
      r.scard("BID-#{bid}-failed")
      r.hincrby("BID-#{bid}", "children", 0)
      r.scard("BID-#{bid}-complete")

      r.expire("BID-#{bid}-failed", BID_EXPIRE_TTL)
    end
  end

  enqueue_callbacks(:complete, bid) if pending.to_i == failed.to_i && children == complete
end

.process_successful_job(bid, jid) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/sidekiq/batch.rb', line 160

def process_successful_job(bid, jid)
  failed, pending, children, complete, success, total, parent_bid = Sidekiq.redis do |r|
    r.multi do
      r.scard("BID-#{bid}-failed")
      r.hincrby("BID-#{bid}", "pending", -1)
      r.hincrby("BID-#{bid}", "children", 0)
      r.scard("BID-#{bid}-complete")
      r.scard("BID-#{bid}-success")
      r.hget("BID-#{bid}", "total")
      r.hget("BID-#{bid}", "parent_bid")

      r.srem("BID-#{bid}-failed", jid)
      r.srem("BID-#{bid}-jids", jid)
      r.expire("BID-#{bid}", BID_EXPIRE_TTL)
    end
  end

  puts "processed process_successful_job"

  enqueue_callbacks(:complete, bid) if pending.to_i == failed.to_i && children == complete
  enqueue_callbacks(:success, bid) if pending.to_i.zero? && children == success
end

Instance Method Details

#increment_job_queue(jid) ⇒ Object



104
105
106
# File 'lib/sidekiq/batch.rb', line 104

def increment_job_queue(jid)
  @ready_to_queue << jid
end

#invalidate_allObject



108
109
110
111
112
# File 'lib/sidekiq/batch.rb', line 108

def invalidate_all
  Sidekiq.redis do |r|
    r.setex("invalidated-bid-#{bid}", BID_EXPIRE_TTL, 1)
  end
end

#jobsObject

Raises:



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/sidekiq/batch.rb', line 50

def jobs
  raise NoBlockGivenError unless block_given?

  bid_data, Thread.current[:bid_data] = Thread.current[:bid_data], []

  begin
    if !@existing && !@initialized
      parent_bid = Thread.current[:bid].bid if Thread.current[:bid]

      Sidekiq.redis do |r|
        r.multi do
          r.hset(@bidkey, "created_at", @created_at)
          r.hset(@bidkey, "parent_bid", parent_bid.to_s) if parent_bid
          r.expire(@bidkey, BID_EXPIRE_TTL)
        end
      end

      @initialized = true
    end

    @ready_to_queue = []

    begin
      parent = Thread.current[:bid]
      Thread.current[:bid] = self
      yield
    ensure
      Thread.current[:bid] = parent
    end

    return [] if @ready_to_queue.size == 0

    Sidekiq.redis do |r|
      r.multi do
        if parent_bid
          r.hincrby("BID-#{parent_bid}", "children", 1)
          r.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
        end

        r.hincrby(@bidkey, "pending", @ready_to_queue.size)
        r.hincrby(@bidkey, "total", @ready_to_queue.size)
        r.expire(@bidkey, BID_EXPIRE_TTL)

        r.sadd(@bidkey + "-jids", @ready_to_queue)
        r.expire(@bidkey + "-jids", BID_EXPIRE_TTL)
      end
    end

    @ready_to_queue
  ensure
    Thread.current[:bid_data] = bid_data
  end
end

#on(event, callback, options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/sidekiq/batch.rb', line 36

def on(event, callback, options = {})
  return unless %w(success complete).include?(event.to_s)
  callback_key = "#{@bidkey}-callbacks-#{event}"
  Sidekiq.redis do |r|
    r.multi do
      r.sadd(callback_key, JSON.unparse({
        callback: callback,
        opts: options
      }))
      r.expire(callback_key, BID_EXPIRE_TTL)
    end
  end
end

#parentObject



120
121
122
123
124
# File 'lib/sidekiq/batch.rb', line 120

def parent
  if parent_bid
    Sidekiq::Batch.new(parent_bid)
  end
end

#parent_bidObject



114
115
116
117
118
# File 'lib/sidekiq/batch.rb', line 114

def parent_bid
  Sidekiq.redis do |r|
    r.hget(@bidkey, "parent_bid")
  end
end

#valid?(batch = self) ⇒ Boolean

Returns:

  • (Boolean)


126
127
128
129
# File 'lib/sidekiq/batch.rb', line 126

def valid?(batch = self)
  valid = !Sidekiq.redis { |r| r.exists("invalidated-bid-#{batch.bid}") }
  batch.parent ? valid && valid?(batch.parent) : valid
end