Class: Mimi::Messaging::SQS_SNS::TimeoutQueue

Inherits:
Object
Defined in:
lib/mimi/messaging/sqs_sns/timeout_queue.rb

Overview

TimeoutQueue is a thread-safe queue that works around the problem the native Ruby Queue class has with waiting for elements for a limited time.

See the excellent blog post discussing the issue: medium.com/workday-engineering/ruby-concurrency-building-a-timeout-queue-5d7c588ca80d

TL;DR: wrapping Queue#pop in Ruby's standard Timeout.timeout is unsafe.
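
To make the contrast concrete, here is a minimal sketch of the two waiting styles. The first half shows the pattern the overview warns about: Timeout.timeout stops the block by raising an exception inside the waiting thread, and that exception can arrive at any point in the block. The second half shows the building block the source listing on this page relies on, ConditionVariable#wait with a timeout, where the waiting thread bounds its own wait. The variable names and the use of the monotonic clock are assumptions made for illustration, not taken from the library.

```ruby
require "timeout"

# Pattern the overview warns about: the waiting thread is interrupted from
# the outside by an exception raised inside the block.
queue = Queue.new
flagged = begin
  Timeout.timeout(0.05) { queue.pop }
rescue Timeout::Error
  :timed_out
end

# Alternative building block: the waiting thread limits its own wait and
# re-checks its condition after every wake-up.
mutex = Mutex.new
cond_var = ConditionVariable.new
items = [] # stands in for the queue's internal storage (hypothetical)

# Monotonic clock is an illustrative choice; the listing uses Time.now.to_f.
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 0.05
mutex.synchronize do
  while items.empty? &&
        (remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)) > 0
    cond_var.wait(mutex, remaining)
  end
end
bounded = items.empty? ? :timed_out : items.shift
```

Both halves give up after roughly 50 ms, but only the first does so by raising an exception into code that did not ask for it.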

Constructor Details

#initialize ⇒ TimeoutQueue

Returns a new instance of TimeoutQueue.



# File 'lib/mimi/messaging/sqs_sns/timeout_queue.rb', line 17

def initialize
  @elems = []
  @mutex = Mutex.new
  @cond_var = ConditionVariable.new
end

Instance Method Details

#<<(elem) ⇒ Object Also known as: push

Pushes an element onto the queue and signals one waiting thread, if any.

Parameters:

  • elem (Object)


# File 'lib/mimi/messaging/sqs_sns/timeout_queue.rb', line 27

def <<(elem)
  @mutex.synchronize do
    @elems << elem
    @cond_var.signal
  end
end

#pop(blocking = true, timeout = nil) ⇒ Object

Pops an element from the queue, either without blocking or blocking with an optional timeout.

Parameters:

  • blocking (true, false) (defaults to: true)

    wait for a new element (true) or return immediately

  • timeout (nil, Integer) (defaults to: nil)

    in blocking mode, wait at most the given number of seconds, or forever if nil

Raises:

  • (Timeout::Error)

    if the timeout expires in blocking mode, or if the queue is empty in non-blocking mode



# File 'lib/mimi/messaging/sqs_sns/timeout_queue.rb', line 43

def pop(blocking = true, timeout = nil)
  @mutex.synchronize do
    if blocking
      if timeout.nil?
        while @elems.empty?
          @cond_var.wait(@mutex)
        end
      else
        timeout_time = Time.now.to_f + timeout
        while @elems.empty? && (remaining_time = timeout_time - Time.now.to_f) > 0
          @cond_var.wait(@mutex, remaining_time)
        end
      end
    end
    raise Timeout::Error, "queue timeout expired" if @elems.empty?

    @elems.shift
  end
end
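
The three ways of calling #pop can be exercised with a short sketch. So that it runs without the gem installed, the class body is condensed from the listings on this page under the bare name TimeoutQueue. The `alias push <<` line is an assumption based on the "Also known as: push" note, and the local variable names are hypothetical.

```ruby
require "timeout"

# Condensed copy of the listings on this page (namespace omitted).
class TimeoutQueue
  def initialize
    @elems = []
    @mutex = Mutex.new
    @cond_var = ConditionVariable.new
  end

  def <<(elem)
    @mutex.synchronize do
      @elems << elem
      @cond_var.signal
    end
  end
  alias push << # assumed from the "Also known as: push" note

  def pop(blocking = true, timeout = nil)
    @mutex.synchronize do
      if blocking
        if timeout.nil?
          @cond_var.wait(@mutex) while @elems.empty?
        else
          timeout_time = Time.now.to_f + timeout
          while @elems.empty? && (remaining_time = timeout_time - Time.now.to_f) > 0
            @cond_var.wait(@mutex, remaining_time)
          end
        end
      end
      raise Timeout::Error, "queue timeout expired" if @elems.empty?

      @elems.shift
    end
  end
end

queue = TimeoutQueue.new

# Non-blocking pop on an empty queue raises immediately.
non_blocking = begin
  queue.pop(false)
rescue Timeout::Error => e
  e.message
end

# Blocking pop with a timeout: a producer delivers before the deadline.
producer = Thread.new do
  sleep 0.05
  queue << :job
end
delivered = queue.pop(true, 2)
producer.join

# Blocking pop with a timeout and no producer: raises after the deadline.
expired = begin
  queue.pop(true, 0.05)
rescue Timeout::Error
  :expired
end

# Elements come back in insertion order.
queue.push(:a)
queue.push(:b)
fifo = [queue.pop, queue.pop]
```

Note that, as listed, #<< returns the result of the synchronized block rather than the queue itself, so calls such as `queue << a << b` should not be expected to chain.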