Class: Bunny::Concurrent::ContinuationQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny/concurrent/continuation_queue.rb

Overview

Continuation queue implementation for MRI and Rubinius

Instance Method Summary collapse

Constructor Details

#initializeContinuationQueue

Returns a new instance of ContinuationQueue.



9
10
11
12
13
# File 'lib/bunny/concurrent/continuation_queue.rb', line 9

def initialize
  @q    = []
  @lock = ::Mutex.new
  @cond = ::ConditionVariable.new
end

Instance Method Details

#clearObject



46
47
48
49
50
# File 'lib/bunny/concurrent/continuation_queue.rb', line 46

def clear
  @lock.synchronize do
    @q.clear
  end
end

#empty?Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/bunny/concurrent/continuation_queue.rb', line 52

def empty?
  @q.empty?
end

#poll(timeout_in_ms = nil) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/bunny/concurrent/continuation_queue.rb', line 27

def poll(timeout_in_ms = nil)
  timeout = timeout_in_ms ? timeout_in_ms / 1000.0 : nil

  @lock.synchronize do
    timeout_strikes_at = Time.now.utc + (timeout || 0)
    while @q.empty?
      wait = if timeout
               timeout_strikes_at - Time.now.utc
             else
               nil
             end
      @cond.wait(@lock, wait)
      raise ::Timeout::Error if wait && Time.now.utc >= timeout_strikes_at
    end
    item = @q.shift
    item
  end
end

#popObject



23
24
25
# File 'lib/bunny/concurrent/continuation_queue.rb', line 23

def pop
  poll
end

#push(item) ⇒ Object Also known as: <<



15
16
17
18
19
20
# File 'lib/bunny/concurrent/continuation_queue.rb', line 15

def push(item)
  @lock.synchronize do
    @q.push(item)
    @cond.signal
  end
end

#sizeObject Also known as: length



56
57
58
# File 'lib/bunny/concurrent/continuation_queue.rb', line 56

def size
  @q.size
end