Class: Bunny::Concurrent::ContinuationQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny/concurrent/continuation_queue.rb

Overview

Continuation queue implementation for MRI and Rubinius

Instance Method Summary collapse

Constructor Details

#initializeContinuationQueue

Returns a new instance of ContinuationQueue.



9
10
11
12
13
# File 'lib/bunny/concurrent/continuation_queue.rb', line 9

def initialize
  @q    = []
  @lock = ::Mutex.new
  @cond = ::ConditionVariable.new
end

Instance Method Details

#clearObject



47
48
49
50
51
# File 'lib/bunny/concurrent/continuation_queue.rb', line 47

def clear
  @lock.synchronize do
    @q.clear
  end
end

#empty?Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/bunny/concurrent/continuation_queue.rb', line 53

def empty?
  @q.empty?
end

#poll(timeout_in_ms = nil) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/bunny/concurrent/continuation_queue.rb', line 27

def poll(timeout_in_ms = nil)
  timeout_in_sec = timeout_in_ms ? timeout_in_ms / 1000.0 : nil

  @lock.synchronize do
    started_at = Bunny::Timestamp.monotonic
    while @q.empty?
      wait = !(timeout_in_sec.nil?)
      @cond.wait(@lock, timeout_in_sec)

      if wait
        ended_at = Bunny::Timestamp.monotonic
        elapsed = ended_at - started_at
        raise ::Timeout::Error if (elapsed > timeout_in_sec)
      end
    end
    item = @q.shift
    item
  end
end

#popObject



23
24
25
# File 'lib/bunny/concurrent/continuation_queue.rb', line 23

def pop
  poll
end

#push(item) ⇒ Object Also known as: <<



15
16
17
18
19
20
# File 'lib/bunny/concurrent/continuation_queue.rb', line 15

def push(item)
  @lock.synchronize do
    @q.push(item)
    @cond.signal
  end
end

#sizeObject Also known as: length



57
58
59
# File 'lib/bunny/concurrent/continuation_queue.rb', line 57

def size
  @q.size
end