Class: Sidekiq::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/batch.rb,
lib/sidekiq/batch/status.rb,
lib/sidekiq/batch/version.rb,
lib/sidekiq/batch/callback.rb,
lib/sidekiq/batch/middleware.rb

Defined Under Namespace

Modules: Callback, Extension, Middleware
Classes: NoBlockGivenError, Status

Constant Summary collapse

BID_EXPIRE_TTL =
2_592_000
VERSION =
'0.2.0'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(existing_bid = nil) ⇒ Batch

Returns a new instance of Batch.



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/sidekiq/batch.rb', line 17

# Builds a handle for a new batch or for one that already exists.
#
# @param existing_bid [#to_s, nil] id of a batch to attach to. When it is nil,
#   false, or renders as an empty string, a fresh random id is generated and the
#   batch is treated as new.
def initialize(existing_bid = nil)
  # A blank bid must not be adopted as the id: it would produce the key "BID-"
  # while still being flagged as a new batch. `to_s` also lets non-String ids
  # through without raising on `empty?`.
  @existing = !!(existing_bid && !existing_bid.to_s.empty?)
  @bid = @existing ? existing_bid : SecureRandom.urlsafe_base64(10)
  @initialized = false
  @created_at = Time.now.utc.to_f
  @bidkey = "BID-#{@bid}"
  @queued_jids = []
  @pending_jids = []

  # Incremental pushing is enabled only when an interval is configured.
  @batch_push_interval = Sidekiq.default_configuration[:batch_push_interval]
  @incremental_push = !@batch_push_interval.nil?
end

Instance Attribute Details

#bid ⇒ Object (readonly)

Returns the value of attribute bid.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

# Returns the batch id held in @bid, which the constructor sets either from the
# id it was given or from a randomly generated one.
def bid
  @bid
end

#callback_queue ⇒ Object

Returns the value of attribute callback_queue.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

# Returns the value of @callback_queue.
# NOTE(review): the writer is not part of this excerpt. enqueue_callbacks reads a
# "callback_queue" field from the batch hash and falls back to "default";
# presumably this attribute is what populates that field — confirm.
def callback_queue
  @callback_queue
end

#created_at ⇒ Object (readonly)

Returns the value of attribute created_at.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

# Returns the timestamp captured when this object was constructed
# (Time.now.utc.to_f, i.e. a Float number of seconds).
def created_at
  @created_at
end

#description ⇒ Object

Returns the value of attribute description.



15
16
17
# File 'lib/sidekiq/batch.rb', line 15

# Returns the value of @description.
# NOTE(review): where this is assigned is not visible in this excerpt.
def description
  @description
end

Class Method Details

.cleanup_redis(bid) ⇒ Object



304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/sidekiq/batch.rb', line 304

# Deletes the batch hash and its companion keys (callback sets, failed /
# success / complete sets and the jid set) in a single DEL call.
#
# @param bid [#to_s] id of the batch whose keys are removed
# @return [Object] whatever the Redis `del` call returns
def cleanup_redis(bid)
  Sidekiq.logger.debug {"Cleaning redis of batch #{bid}"}

  base_key = "BID-#{bid}"
  suffixes = %w[callbacks-complete callbacks-success failed success complete jids]
  doomed = [base_key] + suffixes.map { |suffix| "#{base_key}-#{suffix}" }

  Sidekiq.redis { |conn| conn.del(*doomed) }
end

.enqueue_callbacks(event, bid) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/sidekiq/batch.rb', line 244

# Marks +event+ as handled for the batch and runs or schedules its callbacks.
#
# The event flag is read and set in one MULTI, so a second caller for the same
# event sees 'true' and returns without doing anything.
#
# * If the batch is itself a callback batch, Finalize#dispatch runs inline.
# * If no callbacks are registered, Finalize#dispatch runs inline.
# * Otherwise the callbacks are pushed inside a new callback batch whose
#   :complete callback is Finalize#dispatch.
#
# @param event [#to_s] event name, e.g. :complete or :success
# @param bid [#to_s] id of the batch the event belongs to
def enqueue_callbacks(event, bid)
  event_name = event.to_s
  batch_key = "BID-#{bid}"
  callback_key = "#{batch_key}-callbacks-#{event_name}"

  replies = Sidekiq.redis do |conn|
    conn.multi do |tx|
      tx.hget(batch_key, event_name)
      tx.hset(batch_key, event_name, 'true')
      tx.smembers(callback_key)
      tx.hget(batch_key, "callback_queue")
      tx.hget(batch_key, "parent_bid")
      tx.hget(batch_key, "callback_batch")
    end
  end
  already_processed, _, callbacks, queue, parent_bid, callback_batch = replies

  return if already_processed == 'true'

  queue ||= "default"
  # A missing or empty parent bid both mean "no parent".
  parent_bid = nil unless parent_bid && !parent_bid.empty?

  callback_args = callbacks.map do |raw|
    parsed = Sidekiq.load_json(raw)
    [parsed['callback'], event_name, parsed['opts'], bid, parent_bid]
  end

  opts = {"bid" => bid, "event" => event_name}

  if callback_batch
    # A callback batch finalizes synchronously. Prefer the opts stored with the
    # first callback (they carry the original event), else fall back to ours.
    cb_opts = callback_args.first&.at(2) || opts

    Sidekiq.logger.debug {"Run callback batch bid: #{bid} event: #{event_name} args: #{callback_args.inspect}"}
    Sidekiq::Batch::Callback::Finalize.new.dispatch(Status.new(bid), cb_opts)

    return
  end

  Sidekiq.logger.debug {"Enqueue callback bid: #{bid} event: #{event_name} args: #{callback_args.inspect}"}

  # Nothing to run: finalize right away.
  if callback_args.empty?
    return Sidekiq::Batch::Callback::Finalize.new.dispatch(Status.new(bid), opts)
  end

  # Otherwise finalize from the callback batch's own :complete callback.
  cb_batch = new
  cb_batch.callback_batch = 'true'
  Sidekiq.logger.debug {"Adding callback batch: #{cb_batch.bid} for batch: #{bid}"}
  cb_batch.on(:complete, "Sidekiq::Batch::Callback::Finalize#dispatch", opts)
  cb_batch.jobs do
    push_callbacks callback_args, queue
  end
end

.process_failed_job(bid, jid) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/sidekiq/batch.rb', line 188

# Records +jid+ as failed for the batch and fires the :complete callbacks once
# the pending count equals the failed count and all children are complete.
#
# @param bid [#to_s] batch id
# @param jid [String] id of the job that failed
def process_failed_job(bid, jid)
  batch_key = "BID-#{bid}"
  failed_key = "#{batch_key}-failed"

  replies = Sidekiq.redis do |conn|
    conn.multi do |tx|
      tx.sadd(failed_key, [jid])

      # hincrby with 0 reads the counter as an integer.
      tx.hincrby(batch_key, "pending", 0)
      tx.scard(failed_key)
      tx.hincrby(batch_key, "children", 0)
      tx.scard("#{batch_key}-complete")
      tx.hget(batch_key, "parent_bid")

      tx.expire(failed_key, BID_EXPIRE_TTL)
    end
  end
  _, pending, failed, children, complete, parent_bid = replies

  # A failure in a child batch is mirrored on the parent as one more pending
  # and one more failed job.
  if parent_bid
    parent_key = "BID-#{parent_bid}"
    Sidekiq.redis do |conn|
      conn.multi do |tx|
        tx.hincrby(parent_key, "pending", 1)
        tx.sadd("#{parent_key}-failed", [jid])
        tx.expire("#{parent_key}-failed", BID_EXPIRE_TTL)
      end
    end
  end

  enqueue_callbacks(:complete, bid) if pending.to_i == failed.to_i && children == complete
end

.process_successful_job(bid, jid) ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/sidekiq/batch.rb', line 219

# Accounts for a job that finished successfully: decrements the pending
# counter, drops the jid from the failed and jid sets, and fires callbacks
# when the batch has become complete and/or fully successful.
#
# @param bid [#to_s] batch id
# @param jid [String] id of the job that succeeded
def process_successful_job(bid, jid)
  batch_key = "BID-#{bid}"

  replies = Sidekiq.redis do |conn|
    conn.multi do |tx|
      tx.scard("#{batch_key}-failed")
      tx.hincrby(batch_key, "pending", -1)
      tx.hincrby(batch_key, "children", 0)
      tx.scard("#{batch_key}-complete")
      tx.scard("#{batch_key}-success")
      tx.hget(batch_key, "total")
      tx.hget(batch_key, "parent_bid")

      tx.srem("#{batch_key}-failed", [jid])
      tx.srem("#{batch_key}-jids", [jid])
      tx.expire(batch_key, BID_EXPIRE_TTL)
    end
  end
  failed, pending, children, complete, success, _total, _parent_bid = replies

  remaining = pending.to_i
  all_success = remaining.zero? && children == success
  all_complete = remaining == failed.to_i && children == complete
  return unless all_complete || all_success

  # Complete always fires first; success follows only when nothing is left
  # pending and every child has succeeded.
  enqueue_callbacks(:complete, bid)
  enqueue_callbacks(:success, bid) if all_success
end

Instance Method Details

#callback_batch=(callback_batch) ⇒ Object



40
41
42
43
# File 'lib/sidekiq/batch.rb', line 40

# Stores the callback-batch marker on this object and hands it to
# persist_bid_attr under the name 'callback_batch'.
# NOTE(review): persist_bid_attr is defined outside this excerpt; presumably it
# writes the field to the batch hash that enqueue_callbacks later reads — confirm.
#
# @param callback_batch [Object] marker value (enqueue_callbacks passes 'true')
def callback_batch=(callback_batch)
  @callback_batch = callback_batch
  persist_bid_attr('callback_batch', callback_batch)
end

#conditional_redis_increment!(force = false) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/sidekiq/batch.rb', line 123

# Adds the number of jids collected in @pending_jids to the batch's "pending"
# and "total" counters (and to the parent's "total" when
# Thread.current[:parent_bid] is set), then empties @pending_jids.
#
# Does nothing unless should_increment? says so or +force+ is true.
#
# @param force [Boolean] flush regardless of should_increment?
# @return [Array, nil] the new empty pending list, or nil when nothing was flushed
def conditional_redis_increment!(force=false)
  return unless should_increment? || force

  parent_bid = Thread.current[:parent_bid]
  Sidekiq.redis do |conn|
    conn.multi do |tx|
      delta = @pending_jids.length

      if parent_bid
        parent_key = "BID-#{parent_bid}"
        tx.hincrby(parent_key, "total", delta)
        tx.expire(parent_key, BID_EXPIRE_TTL)
      end

      tx.hincrby(@bidkey, "pending", delta)
      tx.hincrby(@bidkey, "total", delta)
      tx.expire(@bidkey, BID_EXPIRE_TTL)
    end
  end

  @pending_jids = []
end

#increment_job_queue(jid) ⇒ Object



117
118
119
120
121
# File 'lib/sidekiq/batch.rb', line 117

# Records a newly pushed job on both the queued and the pending lists, then
# gives conditional_redis_increment! the chance to flush the counters.
#
# @param jid [String] id of the job that was just pushed
def increment_job_queue(jid)
  [@queued_jids, @pending_jids].each { |list| list << jid }
  conditional_redis_increment!
end

#invalidate_all ⇒ Object



153
154
155
156
157
# File 'lib/sidekiq/batch.rb', line 153

# Flags this batch as invalidated by writing the key "invalidated-bid-<bid>"
# with value 1 and a TTL of BID_EXPIRE_TTL seconds.
def invalidate_all
  marker_key = "invalidated-bid-#{bid}"
  Sidekiq.redis { |conn| conn.setex(marker_key, BID_EXPIRE_TTL, 1) }
end

#jobs ⇒ Object

Raises:

  • (NoBlockGivenError)



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/sidekiq/batch.rb', line 59

# Runs the given block with this batch installed as the current batch and
# registers the jobs recorded during it.
#
# On the first call for a batch that was not built from an existing bid, the
# batch hash is created (created_at, plus parent_bid when a batch is already
# current on this thread) and the parent's "children" counter is bumped.
# After the block, the collected counts are flushed and the queued jids are
# added to the "<bidkey>-jids" set.
#
# @yield with Thread.current[:batch] set to this batch
# @return [Array] the jids recorded in @queued_jids during the block
#   (an empty array when none were recorded)
# @raise [NoBlockGivenError] when called without a block
def jobs
  raise NoBlockGivenError unless block_given?

  # Swap in a fresh :bid_data list for the duration of the call; the previous
  # value is restored in the outer ensure.
  # NOTE(review): nothing in this method reads :bid_data — its consumer is outside this excerpt.
  bid_data, Thread.current[:bid_data] = Thread.current[:bid_data], []

  begin
    if !@existing && !@initialized
      # A batch that is current on this thread becomes this batch's parent.
      parent_bid = Thread.current[:batch].bid if Thread.current[:batch]

      Sidekiq.redis do |r|
        r.multi do |pipeline|
          pipeline.hset(@bidkey, "created_at", @created_at)
          pipeline.expire(@bidkey, BID_EXPIRE_TTL)
          if parent_bid
            pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s)
            pipeline.hincrby("BID-#{parent_bid}", "children", 1)
          end
        end
      end

      @initialized = true
    end

    # Start each call with empty lists; increment_job_queue appends to them.
    @queued_jids = []
    @pending_jids = []

    begin
      parent = Thread.current[:batch]
      Thread.current[:batch] = self
      # parent_bid is only assigned in the first-time branch above; for an
      # existing or already-initialized batch it is nil here.
      Thread.current[:parent_bid] = parent_bid
      yield
    ensure
      Thread.current[:batch] = parent
      # NOTE(review): this resets :parent_bid to nil rather than to its previous
      # value, and it runs before the forced flush below, which reads
      # Thread.current[:parent_bid] — confirm that is intended.
      Thread.current[:parent_bid] = nil
    end

    return [] if @queued_jids.size == 0
    # Force-flush whatever counts have not been written yet.
    conditional_redis_increment!(true)

    Sidekiq.redis do |r|
      r.multi do |pipeline|
        if parent_bid
          pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
        end

        pipeline.expire(@bidkey, BID_EXPIRE_TTL)

        pipeline.sadd(@bidkey + "-jids", @queued_jids)
        pipeline.expire(@bidkey + "-jids", BID_EXPIRE_TTL)
      end
    end

    @queued_jids
  ensure
    Thread.current[:bid_data] = bid_data
  end
end

#on(event, callback, options = {}) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/sidekiq/batch.rb', line 45

# Registers a callback for a batch event by adding a JSON description of it to
# the set "<bidkey>-callbacks-<event>" and refreshing that set's TTL.
#
# @param event [#to_s] "success" or "complete"; any other value is ignored
# @param callback [Object] callback identifier, stored under the "callback" key
# @param options [Hash] options stored under the "opts" key
# @return [Object, nil] the result of the Redis transaction, or nil when the
#   event is not supported
def on(event, callback, options = {})
  return unless %w(success complete).include?(event.to_s)

  callback_key = "#{@bidkey}-callbacks-#{event}"
  # JSON.generate is the supported spelling of the deprecated JSON.unparse
  # alias and produces the same output.
  payload = JSON.generate({ callback: callback, opts: options })

  Sidekiq.redis do |r|
    r.multi do |pipeline|
      pipeline.sadd(callback_key, [payload])
      pipeline.expire(callback_key, BID_EXPIRE_TTL)
    end
  end
end

#parent ⇒ Object



165
166
167
168
169
# File 'lib/sidekiq/batch.rb', line 165

# Returns a Batch for this batch's parent, or nil when no parent bid is stored.
#
# parent_bid performs a Redis read, so it is fetched once and reused; this also
# guarantees the value that was checked is the value handed to Batch.new.
#
# @return [Sidekiq::Batch, nil]
def parent
  pbid = parent_bid
  Sidekiq::Batch.new(pbid) if pbid
end

#parent_bid ⇒ Object



159
160
161
162
163
# File 'lib/sidekiq/batch.rb', line 159

# Reads the "parent_bid" field of this batch's hash from Redis.
#
# @return [Object] whatever HGET returns for the field
def parent_bid
  Sidekiq.redis { |conn| conn.hget(@bidkey, "parent_bid") }
end

#should_increment? ⇒ Boolean

Returns:

  • (Boolean)


142
143
144
145
146
147
148
149
150
151
# File 'lib/sidekiq/batch.rb', line 142

# Decides whether the pending counters should be flushed to Redis now.
#
# * false when incremental pushing is disabled;
# * true when the interval is 0 or exactly one jid has been queued;
# * otherwise true only once at least @batch_push_interval seconds have passed
#   since the last time this returned true through the interval check (the
#   clock starts at the first call that reaches that check).
#
# @return [Boolean]
def should_increment?
  return false unless @incremental_push
  return true if @batch_push_interval == 0 || @queued_jids.length == 1

  now = Time.now.to_f
  @last_increment ||= now
  # Not enough time has elapsed yet. The previous comparison was inverted and
  # answered true only while the interval had NOT elapsed.
  return false if now - @last_increment < @batch_push_interval

  @last_increment = now
  true
end

#valid?(batch = self) ⇒ Boolean

Returns:

  • (Boolean)


171
172
173
174
# File 'lib/sidekiq/batch.rb', line 171

# Tells whether a batch and all of its ancestors are free of an
# "invalidated-bid-<bid>" marker in Redis.
#
# The parent is looked up once per level, and only when the batch itself is
# still valid, which avoids the Redis reads behind repeated `parent` calls.
#
# @param batch [#bid, #parent] batch to check; defaults to this batch
# @return [Boolean]
def valid?(batch = self)
  marker_count = Sidekiq.redis { |r| r.exists("invalidated-bid-#{batch.bid}") }
  return false unless marker_count.zero?

  parent_batch = batch.parent
  parent_batch ? valid?(parent_batch) : true
end