Class: Broker::Queue

Inherits:
Object
  • Object
show all
Includes:
Utility
Defined in:
lib/broker/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utility

#safe_thread, #timestamp, #watcher

Constructor Details

#initialize ⇒ Queue

Returns a new instance of Queue.



10
11
12
13
14
# File 'lib/broker/queue.rb', line 10

# Sets up an empty queue: no pending payloads, no failed payloads, and a
# processed counter of zero.
#
# @return [Queue] a new instance (via Queue.new)
def initialize
  @pending = []
  @processed = 0
  @failed = []
end

Instance Attribute Details

#failed ⇒ Object (readonly)

Returns the value of attribute failed.



8
9
10
# File 'lib/broker/queue.rb', line 8

# Reader for the payloads recorded as failed.
#
# The collection starts out as an empty Array in #initialize and gains one
# entry each time #failure is called.
#
# @return [Array] the @failed array itself, not a copy
def failed
  @failed
end

#pending ⇒ Object (readonly)

Returns the value of attribute pending.



8
9
10
# File 'lib/broker/queue.rb', line 8

# Reader for the payloads still waiting to be handed out.
#
# The collection starts out as an empty Array in #initialize; #push appends
# parsed payloads to it and #next removes them from its end.
#
# @return [Array] the @pending array itself, not a copy
def pending
  @pending
end

#processed ⇒ Object (readonly)

Returns the value of attribute processed.



8
9
10
# File 'lib/broker/queue.rb', line 8

# Reader for the success counter.
#
# The counter starts at 0 in #initialize and is incremented by one on every
# call to #success.
#
# @return [Integer] number of #success calls so far
def processed
  @processed
end

Instance Method Details

#empty? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
# File 'lib/broker/queue.rb', line 24

# Tells whether any payloads are still pending.
#
# Only the pending list is consulted; the failed list and the processed
# counter have no influence on the result.
#
# @return [Boolean] true when @pending holds no elements
def empty?
  @pending.empty?
end

#failure(payload) ⇒ Object



28
29
30
31
# File 'lib/broker/queue.rb', line 28

# Records +payload+ as failed and hands it to #move together with the
# location configured under Broker.options[:failed_path].
#
# The payload is added to the failed list before #move is invoked, so it
# stays recorded even if #move raises.
#
# @param payload [Object] the item that failed (presumably a Broker::Payload
#   taken from this queue; confirm against callers)
# @return [Object] whatever #move returns
def failure(payload)
  @failed.push(payload)
  destination = Broker.options[:failed_path]
  move(payload, destination)
end

#next ⇒ Object



20
21
22
# File 'lib/broker/queue.rb', line 20

# Removes and returns the payload at the end of the pending list.
#
# NOTE(review): #push appends to the end of @pending and this method also
# takes from the end, so the most recently pushed payload is returned first
# (last-in, first-out). Confirm that this ordering is intended for a class
# named Queue before relying on it.
#
# @return [Object, nil] the removed payload, or nil when nothing is pending
def next
  @pending.pop
end

#push(paths) ⇒ Object



16
17
18
# File 'lib/broker/queue.rb', line 16

# Parses every entry of +paths+ with Broker::Payload.parse and appends the
# results, in order, to the pending list.
#
# Entries are appended one at a time, so if parsing raises part-way through,
# the payloads parsed before the error remain pending.
#
# @param paths [#each] collection of items accepted by Broker::Payload.parse
# @return [Object] +paths+ itself, as returned by its #each
def push(paths)
  paths.each do |path|
    @pending.push(Broker::Payload.parse(path))
  end
end

#success(payload) ⇒ Object



33
34
35
36
# File 'lib/broker/queue.rb', line 33

# Counts +payload+ as processed and hands it to #move together with the
# location configured under Broker.options[:processed_path].
#
# The counter is incremented before #move is invoked, so a payload is
# counted even if #move raises.
#
# @param payload [Object] the item that was handled successfully (presumably
#   a Broker::Payload taken from this queue; confirm against callers)
# @return [Object] whatever #move returns
def success(payload)
  @processed += 1
  destination = Broker.options[:processed_path]
  move(payload, destination)
end