Class: Sidekiq::Queue

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/sidekiq/api.rb

Overview

Represents a queue within Sidekiq. Allows enumeration of all jobs within the queue and deletion of jobs. NB: this queue data is real-time and is changing within Redis moment by moment.

queue = Sidekiq::Queue.new("mailer")
queue.each do |job|
  job.klass # => 'MyWorker'
  job.args # => [1, 2, 3]
  job.delete if job.jid == 'abcdef1234567890'
end

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name = "default") ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • name (String) (defaults to: "default")

    the name of the queue



241
242
243
244
# File 'lib/sidekiq/api.rb', line 241

def initialize(name = "default")
  @name = name.to_s
  @rname = "queue:#{name}"
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



238
239
240
# File 'lib/sidekiq/api.rb', line 238

def name
  @name
end

Class Method Details

.allArray<Sidekiq::Queue>

Fetch all known queues within Redis.

Returns:



234
235
236
# File 'lib/sidekiq/api.rb', line 234

def self.all
  Sidekiq.redis { |c| c.sscan("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) }
end

Instance Method Details

#as_json(options = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

:nodoc:



324
325
326
# File 'lib/sidekiq/api.rb', line 324

def as_json(options = nil)
  {name: name} # 5336
end

#clearBoolean Also known as: 💣

delete all jobs within this queue

Returns:

  • (Boolean)

    true



311
312
313
314
315
316
317
318
319
# File 'lib/sidekiq/api.rb', line 311

def clear
  Sidekiq.redis do |conn|
    conn.multi do |transaction|
      transaction.unlink(@rname)
      transaction.srem("queues", [name])
    end
  end
  true
end

#eachObject



275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/sidekiq/api.rb', line 275

def each
  initial_size = size
  deleted_size = 0
  page = 0
  page_size = 50

  loop do
    range_start = page * page_size - deleted_size
    range_end = range_start + page_size - 1
    entries = Sidekiq.redis { |conn|
      conn.lrange @rname, range_start, range_end
    }
    break if entries.empty?
    page += 1
    entries.each do |entry|
      yield JobRecord.new(entry, @name)
    end
    deleted_size = initial_size - size
  end
end

#find_job(jid) ⇒ Sidekiq::JobRecord?

Find the job with the given JID within this queue.

This is a *slow, inefficient* operation. Do not use under normal conditions.

Parameters:

  • jid (String)

    the job_id to look for

Returns:



305
306
307
# File 'lib/sidekiq/api.rb', line 305

def find_job(jid)
  detect { |j| j.jid == jid }
end

#latencyFloat

Calculates this queue’s latency, the difference in seconds since the oldest job in the queue was enqueued.

Returns:

  • (Float)

    in seconds



264
265
266
267
268
269
270
271
272
273
# File 'lib/sidekiq/api.rb', line 264

def latency
  entry = Sidekiq.redis { |conn|
    conn.lindex(@rname, -1)
  }
  return 0 unless entry
  job = Sidekiq.load_json(entry)
  now = Time.now.to_f
  thence = job["enqueued_at"] || now
  now - thence
end

#paused?Boolean

Returns if the queue is currently paused.

Returns:

  • (Boolean)

    if the queue is currently paused



255
256
257
# File 'lib/sidekiq/api.rb', line 255

def paused?
  false
end

#sizeInteger

The current size of the queue within Redis. This value is real-time and can change between calls.

Returns:

  • (Integer)

    the size



250
251
252
# File 'lib/sidekiq/api.rb', line 250

def size
  Sidekiq.redis { |con| con.llen(@rname) }
end