Class: PlainApm::Queue
- Inherits:
-
Object
- Object
- PlainApm::Queue
- Defined in:
- lib/plain_apm/queue.rb
Overview
The queue is made to conform to the ruby/spec
Constant Summary collapse
- ClosedQueueError =
Class.new(StopIteration)
Instance Method Summary collapse
- #clear ⇒ Object
- #close ⇒ Object
- #closed? ⇒ Boolean
- #empty? ⇒ Boolean
-
#initialize(initial = nil) ⇒ Queue
constructor
A new instance of Queue.
- #length ⇒ Object (also: #size)
- #num_waiting ⇒ Object
- #pop(non_block = false, timeout: nil) ⇒ Object (also: #deq, #shift)
- #push(obj) ⇒ Object (also: #<<, #enq)
Constructor Details
#initialize(initial = nil) ⇒ Queue
Returns a new instance of Queue.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/plain_apm/queue.rb', line 9 def initialize(initial = nil) @data = [] @num_waiting = 0 @closed = false @mutex = Mutex.new @cv = ConditionVariable.new return if initial.nil? raise TypeError, "can't convert #{initial.class} into Array" unless initial.respond_to?(:to_a) elems = initial.to_a raise TypeError, "can't convert #{initial.class} into Array (#{initial.class}#to_a gives #{elems.class})" unless elems.is_a?(Array) @data.concat(elems) end |
Instance Method Details
#clear ⇒ Object
39 40 41 |
# File 'lib/plain_apm/queue.rb', line 39 def clear @data = [] end |
#close ⇒ Object
26 27 28 29 30 31 32 33 |
# File 'lib/plain_apm/queue.rb', line 26 def close @mutex.synchronize do return if @closed @closed = true # Wake up everyone waiting on this. @cv.broadcast end end |
#closed? ⇒ Boolean
35 36 37 |
# File 'lib/plain_apm/queue.rb', line 35 def closed? @closed end |
#empty? ⇒ Boolean
47 48 49 |
# File 'lib/plain_apm/queue.rb', line 47 def empty? @data.empty? end |
#length ⇒ Object Also known as: size
51 52 53 |
# File 'lib/plain_apm/queue.rb', line 51 def length @data.length end |
#num_waiting ⇒ Object
43 44 45 |
# File 'lib/plain_apm/queue.rb', line 43 def num_waiting @num_waiting end |
#pop(non_block = false, timeout: nil) ⇒ Object Also known as: deq, shift
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/plain_apm/queue.rb', line 68 def pop(non_block = false, timeout: nil) if non_block && timeout raise ArgumentError, "can't set a timeout if non_block is enabled" end if !timeout.nil? && !timeout.is_a?(Numeric) raise TypeError, "no implicit conversion to float from #{timeout.class.name.downcase}" end @mutex.synchronize do # The data is there. return @data.shift if !@data.empty? # Non block raises on empty queue raise ThreadError, "queue empty" if non_block # 0 means immediate timeout. Closed empty queue also immediately returns a nil return nil if timeout == 0 || @closed # Blocking and open. Let's wait. timeout_at = timeout.nil? ? nil : now + timeout.to_f begin # We could keep the threads in an array, but a counter should do as well. @num_waiting += 1 while @data.empty? && !@closed if timeout_at.nil? # Wait indefinitely. @cv.wait(@mutex) else # Wait for what's left of the deadline break if (left = timeout_at - now) <= 0.0 @cv.wait(@mutex, left) end end ensure @num_waiting -= 1 end # Return whatever is there now, or nil (if timed out) @data.shift end end |
#push(obj) ⇒ Object Also known as: <<, enq
57 58 59 60 61 62 63 |
# File 'lib/plain_apm/queue.rb', line 57 def push(obj) @mutex.synchronize do raise ClosedQueueError if closed? @data << obj @cv.signal end end |