Class: SMQ::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/smq/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, batch_size = 5) ⇒ Queue

Returns a new instance of Queue.



8
9
10
11
# File 'lib/smq/queue.rb', line 8

# Builds a queue handle, storing the queue name and the batch size.
#
# @param name [Object] stored in @name; used as the queue identifier by the other methods
# @param batch_size [Object] stored in @batch_size; defaults to 5
def initialize(name, batch_size = 5)
  @name, @batch_size = name, batch_size
end

Instance Attribute Details

#batch_size ⇒ Object

Returns the value of attribute batch_size.



6
7
8
# File 'lib/smq/queue.rb', line 6

# Reader for the batch size assigned in the constructor.
#
# @return [Object] the current value of @batch_size
def batch_size
  @batch_size
end

#name ⇒ Object

Returns the value of attribute name.



5
6
7
# File 'lib/smq/queue.rb', line 5

# Reader for the queue name assigned in the constructor.
#
# @return [Object] the current value of @name
def name
  @name
end

Instance Method Details

#clear_completed!(limit = nil) ⇒ Object



55
56
57
# File 'lib/smq/queue.rb', line 55

# Deletes queue items whose completed_at is set, regardless of outcome.
#
# @param limit [Object, nil] forwarded unchanged to delete_queue_items
# @return [Object] whatever delete_queue_items returns
def clear_completed!(limit = nil)
  condition = "completed_at IS NOT NULL"
  delete_queue_items(condition, limit)
end

#clear_failed!(limit = nil) ⇒ Object



61
62
63
# File 'lib/smq/queue.rb', line 61

# Deletes queue items that have both completed_at and failed_at set.
#
# @param limit [Object, nil] forwarded unchanged to delete_queue_items
# @return [Object] whatever delete_queue_items returns
def clear_failed!(limit = nil)
  condition = "completed_at IS NOT NULL AND failed_at IS NOT NULL"
  delete_queue_items(condition, limit)
end

#clear_successful!(limit = nil) ⇒ Object



58
59
60
# File 'lib/smq/queue.rb', line 58

# Deletes queue items that have completed_at set and failed_at unset.
#
# @param limit [Object, nil] forwarded unchanged to delete_queue_items
# @return [Object] whatever delete_queue_items returns
def clear_successful!(limit = nil)
  condition = "completed_at IS NOT NULL AND failed_at IS NULL"
  delete_queue_items(condition, limit)
end

#enqueue(payload) ⇒ Object



23
24
25
# File 'lib/smq/queue.rb', line 23

# Wraps a payload in a message via SMQ::Message.build and pushes it
# onto this queue.
#
# @param payload [Object] passed unchanged to SMQ::Message.build
# @return [Object] the result of #push for the built message
def enqueue(payload)
  message = SMQ::Message.build(payload)
  push(message)
end

#find_available(batches = 1, batch = 1) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
# File 'lib/smq/queue.rb', line 35

# Fetches up to @batch_size candidate messages for this queue that are
# neither completed nor locked, and returns them in random order.
#
# Candidates are partitioned by id: only rows with (id % batches) equal to
# batch - 1 are selected, so +batch+ is 1-based. Only the id and updated_at
# columns are loaded. The query itself is ordered by ascending id before
# the limit is applied; the randomisation happens afterwards, in Ruby.
#
# @param batches [Integer] modulus used to partition ids
# @param batch [Integer] 1-based partition number to select
# @return [Array] the matching records, shuffled
def find_available(batches = 1, batch = 1)
  candidates = SMQ::Message.find(
    :all,
    :select => 'id, updated_at',
    :conditions => [
      "queue = ? AND (id % ?) = ? AND completed_at IS NULL AND locked_by IS NULL",
      @name, batches, batch - 1
    ],
    :order => "id ASC",
    :limit => @batch_size
  )
  # Array#shuffle replaces the hand-rolled sort_by { rand() } shuffle.
  candidates.shuffle
end

#flush! ⇒ Object



51
52
53
# File 'lib/smq/queue.rb', line 51

# Calls delete_queue_items with no arguments, i.e. without any condition
# string or limit.
# NOTE(review): presumably this removes every message in the queue —
# confirm against the defaults of delete_queue_items.
#
# @return [Object] whatever delete_queue_items returns
def flush!
  delete_queue_items
end

#length ⇒ Object



47
48
49
# File 'lib/smq/queue.rb', line 47

# Counts the messages in this queue that have not been completed
# (completed_at IS NULL). Locked messages are included in the count.
#
# @return [Object] the result of SMQ::Message.count
def length
  pending = ["queue = ? AND completed_at IS NULL", @name]
  SMQ::Message.count(:conditions => pending)
end

#push(msg) ⇒ Object



17
18
19
20
21
# File 'lib/smq/queue.rb', line 17

# Assigns the message to this queue and saves it.
#
# @param msg [Object] object responding to queue= and save!
# @return [Object] the same msg that was passed in
def push(msg)
  msg.tap do |message|
    message.queue = @name
    message.save!
  end
end

#reserve(worker) ⇒ Object



27
28
29
30
31
32
33
# File 'lib/smq/queue.rb', line 27

# Tries to lock one available message for the given worker.
#
# Walks the candidates from #find_available for the worker's partition and
# returns the first non-nil result of lock!; a nil result from lock! means
# the next candidate is tried.
#
# @param worker [Object] object responding to batches and batch; also passed to lock!
# @return [Object, nil] the first non-nil lock! result, or nil when no candidate could be locked
def reserve(worker)
  candidates = find_available(worker.batches, worker.batch)
  candidates.each do |candidate|
    locked = candidate.lock!(worker)
    return locked unless locked.nil?
  end
  nil
end

#to_s ⇒ String



13
14
15
# File 'lib/smq/queue.rb', line 13

# String form of the queue: its name.
#
# The name is coerced with to_s so this always returns a String, even when
# the queue was built with a Symbol or nil name. Returning a non-String
# from to_s makes string interpolation fall back to the default object
# representation. A String name is returned unchanged.
#
# @return [String] the queue name as a string
def to_s
  @name.to_s
end