Class: Proco::Queue::Base
- Inherits:
-
Object
- Object
- Proco::Queue::Base
- Includes:
- MT::Base
- Defined in:
- lib/proco/queue/base.rb
Direct Known Subclasses
Defined Under Namespace
Classes: Invalidated
Instance Method Summary
-
#initialize(size, delay) ⇒ Base
constructor
A new instance of Base.
- #invalidate ⇒ Object
- #push(item) ⇒ Object
- #take ⇒ Object
Methods included from MT::Base
#broadcast, #do_when, #signal, #synchronize, #try_when, #wait_until
Constructor Details
#initialize(size, delay) ⇒ Base
Returns a new instance of Base.
15 16 17 18 19 20 21 |
# File 'lib/proco/queue/base.rb', line 15
#
# Builds an empty queue that starts out valid.
#
# @param size  stored as @size; #push compares @items.length against it
#              before adding an item
# @param delay stored as @delay; a nil (or false) value is replaced by 0.
#              #take reads it when deciding whether to sleep before
#              handing items over
def initialize(size, delay)
  # Let the ancestor chain (MT::Base is mixed in) run its own setup first.
  super()

  @size  = size
  @delay = delay || 0
  @items = []
  @valid = true
end
Instance Method Details
#invalidate ⇒ Object
23 24 25 26 27 |
# File 'lib/proco/queue/base.rb', line 23
#
# Flags the queue as invalid by clearing @valid from within a `broadcast`
# block. Once @valid is false, #push raises Invalidated and #take returns
# nil when there is nothing left to hand out.
#
# NOTE(review): `broadcast` is provided by MT::Base; presumably it runs the
# block while holding the lock and then wakes every waiting thread --
# confirm against MT::Base.
def invalidate
  broadcast { @valid = false }
end
#push(item) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/proco/queue/base.rb', line 29
#
# Adds +item+ to the queue, waiting on @cv while @items already holds
# @size (or more) entries.
#
# @param item the object handed to `push_impl` once there is room
# @return whatever `push_impl` returns
# @raise [Invalidated] if @valid is false when the method starts or when it
#   wakes up from waiting
def push(item)
  # Take the lock BEFORE entering the protected region. If `lock` itself
  # raises (for instance ThreadError when the calling thread already owns
  # @mtx), the ensure clause below must not run: it would release a lock
  # this call never acquired.
  @mtx.lock
  begin
    loop do
      raise Invalidated unless @valid
      break if @items.length < @size

      # Releases @mtx while waiting and holds it again on return.
      @cv.wait @mtx
    end

    # `push_impl` is not defined in this listing; presumably subclasses
    # supply it -- confirm.
    push_impl item
  ensure
    # Runs on success and on Invalidated alike: wake every waiter, then
    # release the lock.
    @cv.broadcast
    @mtx.unlock
  end
end
#take ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/proco/queue/base.rb', line 42
#
# Waits until @items is non-empty and then returns the result of
# `take_impl`. If the queue is empty and @valid is false, returns nil
# instead of waiting.
#
# When the call had to wait for items and @delay is positive, it sleeps
# (with the lock released) until the next multiple of @delay counted from
# the moment it last started waiting, and only then calls `take_impl`.
#
# @return the value of `take_impl`, or nil when the queue is empty and has
#   been invalidated
def take
  # Take the lock BEFORE entering the protected region so that a failing
  # `lock` (e.g. ThreadError on recursive locking) never reaches the ensure
  # clause, which would release a lock this call does not own.
  @mtx.lock
  begin
    wait_at = nil

    loop do
      unless @items.empty?
        if wait_at && @delay > 0
          n = Time.now
          t = wait_at + @delay
          # If the first boundary is already in the past, skip ahead by
          # whole multiples of @delay, then one more step if still behind.
          t += @delay * ((n - t) / @delay).to_i if t < n
          t += @delay if t < n

          # Haven't taken anything yet, so there is no need to broadcast
          # to blocked pushers before releasing the lock.
          @mtx.unlock
          begin
            sleep t - n
          ensure
            # Re-acquire even if the sleep is interrupted by an exception;
            # otherwise the outer ensure would unlock a mutex that is not
            # held and mask the original error.
            @mtx.lock
          end
          # NOTE(review): @items is not re-checked after the sleep; with
          # several consumers it may have been drained meanwhile -- confirm
          # that `take_impl` copes with that.
        end
        break
      end

      return nil unless @valid

      # Remember when waiting began; the delay boundaries are measured
      # from this instant.
      wait_at = Time.now
      @cv.wait @mtx
    end

    # `take_impl` is not defined in this listing; presumably subclasses
    # supply it -- confirm.
    take_impl
  ensure
    # Runs on every exit path: wake every waiter, then release the lock.
    @cv.broadcast
    @mtx.unlock
  end
end