Class: PlainApm::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/plain_apm/queue.rb

Overview

The queue is made to conform to the ruby/spec

Constant Summary collapse

ClosedQueueError =
Class.new(StopIteration)

Instance Method Summary collapse

Constructor Details

#initialize(initial = nil) ⇒ Queue

Returns a new instance of Queue.

Raises:

  • (TypeError)


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/plain_apm/queue.rb', line 9

def initialize(initial = nil)
  @data = []
  @num_waiting = 0
  @closed = false
  @mutex = Mutex.new
  @cv = ConditionVariable.new

  return if initial.nil?

  raise TypeError, "can't convert #{initial.class} into Array" unless initial.respond_to?(:to_a)

  elems = initial.to_a
  raise TypeError, "can't convert #{initial.class} into Array (#{initial.class}#to_a gives #{elems.class})" unless elems.is_a?(Array)

  @data.concat(elems)
end

Instance Method Details

#clearObject



39
40
41
# File 'lib/plain_apm/queue.rb', line 39

def clear
  @data = []
end

#closeObject



26
27
28
29
30
31
32
33
# File 'lib/plain_apm/queue.rb', line 26

def close
  @mutex.synchronize do
    return if @closed
    @closed = true
    # Wake up everyone waiting on this.
    @cv.broadcast
  end
end

#closed?Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/plain_apm/queue.rb', line 35

def closed?
  @closed
end

#empty?Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/plain_apm/queue.rb', line 47

def empty?
  @data.empty?
end

#lengthObject Also known as: size



51
52
53
# File 'lib/plain_apm/queue.rb', line 51

def length
  @data.length
end

#num_waitingObject



43
44
45
# File 'lib/plain_apm/queue.rb', line 43

def num_waiting
  @num_waiting
end

#pop(non_block = false, timeout: nil) ⇒ Object Also known as: deq, shift



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/plain_apm/queue.rb', line 68

def pop(non_block = false, timeout: nil)
  if non_block && timeout
    raise ArgumentError, "can't set a timeout if non_block is enabled"
  end

  if !timeout.nil? && !timeout.is_a?(Numeric)
    raise TypeError, "no implicit conversion to float from #{timeout.class.name.downcase}"
  end

  @mutex.synchronize do
    # The data is there.
    return @data.shift if !@data.empty?

    # Non block raises on empty queue
    raise ThreadError, "queue empty" if non_block

    # 0 means immediate timeout. Closed empty queue also immediately returns a nil
    return nil if timeout == 0 || @closed

    # Blocking and open. Let's wait.
    timeout_at = timeout.nil? ? nil : now + timeout.to_f

    begin
      # We could keep the threads in an array, but a counter should do as well.
      @num_waiting += 1

      while @data.empty? && !@closed
        if timeout_at.nil?
          # Wait indefinitely.
          @cv.wait(@mutex)
        else
          # Wait for what's left of the deadline
          break if (left = timeout_at - now) <= 0.0
          @cv.wait(@mutex, left)
        end
      end
    ensure
      @num_waiting -= 1
    end

    # Return whatever is there now, or nil (if timed out)
    @data.shift
  end
end

#push(obj) ⇒ Object Also known as: <<, enq



57
58
59
60
61
62
63
# File 'lib/plain_apm/queue.rb', line 57

def push(obj)
  @mutex.synchronize do
    raise ClosedQueueError if closed?
    @data << obj
    @cv.signal
  end
end