Class: Fasten::TimeoutQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fasten/timeout_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TimeoutQueue

Returns a new instance of TimeoutQueue.



3
4
5
6
7
# File 'lib/fasten/timeout_queue.rb', line 3

# Sets up an empty queue along with the lock and condition variable used to
# coordinate access to it.
def initialize
  @queue = []                        # pending items, oldest first
  @mutex = Mutex.new                 # guards every access to @queue
  @received = ConditionVariable.new  # signalled each time an item is pushed
end

Instance Method Details

#push(object) ⇒ Object



9
10
11
12
13
14
# File 'lib/fasten/timeout_queue.rb', line 9

# Appends +object+ to the end of the queue and signals the condition
# variable so that a thread waiting for an item can wake up.
#
# @param object [Object] the item to enqueue
# @return [Object] whatever the condition variable's +signal+ call returns
def push(object)
  @mutex.synchronize do
    # Append and signal under the same lock so a waiter never sees the
    # signal before the item is actually in the queue.
    @queue.push(object)
    @received.signal
  end
end

#receive_with_timeout(timeout = nil) ⇒ Object

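Removes and returns every item currently in the queue. When the queue is empty it first waits for an item to arrive: indefinitely if timeout is nil, otherwise for at most timeout seconds (a timeout of 0 returns immediately). Source annotation: rubocop:disable Metrics/CyclomaticComplexity,Metrics/PerceivedComplexity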



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/fasten/timeout_queue.rb', line 16

# Removes and returns every item currently queued, waiting for at least one
# item to arrive when the queue is empty.
#
# The deadline is tracked with the monotonic clock rather than +Time.now+, so
# adjustments to the system wall clock while waiting cannot stretch or
# shorten the timeout.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait for an
#   item; +nil+ waits indefinitely, zero or a negative value does not wait
# @return [Array] the drained items in insertion order; empty when the
#   timeout elapsed without anything being pushed
def receive_with_timeout(timeout = nil)
  @mutex.synchronize do
    if timeout.nil?
      # Loop because a condition variable may wake up spuriously.
      @received.wait(@mutex) while @queue.empty?
    elsif timeout.positive?
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      while @queue.empty? &&
            (remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)).positive?
        @received.wait(@mutex, remaining)
      end
    end

    # Drain everything in one call; yields [] when the queue is still empty.
    @queue.shift(@queue.size)
  end
end