Class: Fasten::TimeoutQueue
- Inherits:
-
Object
- Object
- Fasten::TimeoutQueue
- Defined in:
- lib/fasten/timeout_queue.rb
Instance Method Summary collapse
-
#initialize ⇒ TimeoutQueue
constructor
A new instance of TimeoutQueue.
- #push(object) ⇒ Object
-
#receive_with_timeout(timeout = nil) ⇒ Object
Waits for queued items (indefinitely when timeout is nil, otherwise for at most timeout seconds) and returns all of them as an array.
Constructor Details
#initialize ⇒ TimeoutQueue
Returns a new instance of TimeoutQueue.
3 4 5 6 7 |
# File 'lib/fasten/timeout_queue.rb', line 3
#
# Builds an empty queue together with the lock and condition variable
# that the other methods use to coordinate access to it.
def initialize
  @queue = []                          # pending items, oldest first
  @mutex = Mutex.new                   # guards every access to @queue
  @received = ConditionVariable.new    # signalled whenever an item is pushed
end
Instance Method Details
#push(object) ⇒ Object
9 10 11 12 13 14 |
# File 'lib/fasten/timeout_queue.rb', line 9
#
# Appends +object+ to the end of the queue while holding the lock, then
# signals the condition variable so that one waiting receiver wakes up.
#
# @param object [Object] the item to enqueue
def push(object)
  @mutex.synchronize do
    @queue.push(object)
    @received.signal
  end
end
#receive_with_timeout(timeout = nil) ⇒ Object
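Blocks until at least one item is queued when timeout is nil; otherwise waits at most timeout seconds (a timeout of 0 never waits). Returns every queued item as an array, which is empty if nothing arrived in time. (The method definition carries an inline rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity directive.)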
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/fasten/timeout_queue.rb', line 16
#
# Removes and returns every item currently in the queue, optionally waiting
# for at least one item to arrive first.
#
# @param timeout [Numeric, nil] +nil+ waits indefinitely for an item; +0+
#   (or a negative value) never waits; a positive value is the maximum number
#   of seconds to wait.
# @return [Array] all queued items in insertion order; empty when nothing
#   was queued before the timeout expired.
def receive_with_timeout(timeout = nil) # rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity
  @mutex.synchronize do
    if timeout.nil?
      # No deadline: keep waiting (re-checking after every wake-up) until an item exists.
      @received.wait(@mutex) while @queue.empty?
    elsif @queue.empty? && timeout != 0
      # Use the monotonic clock for the deadline so that wall-clock adjustments
      # (NTP steps, manual changes) can neither shorten nor extend the wait.
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      while @queue.empty? &&
            (remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)).positive?
        @received.wait(@mutex, remaining)
      end
    end

    # Drain the whole queue in one call; yields [] when the queue is empty.
    @queue.shift(@queue.size)
  end
end