Class: Mimi::Messaging::SQS_SNS::TimeoutQueue
- Inherits:
-
Object
- Object
- Mimi::Messaging::SQS_SNS::TimeoutQueue
- Defined in:
- lib/mimi/messaging/sqs_sns/timeout_queue.rb
Overview
TimeoutQueue solves the problem the native Ruby Queue class has with waiting for elements.
See the excellent blog post discussing the issue: medium.com/workday-engineering/ruby-concurrency-building-a-timeout-queue-5d7c588ca80d
TLDR – using Ruby standard Timeout.timeout() around Queue#pop() is unsafe
Instance Method Summary collapse
-
#<<(elem) ⇒ Object
(also: #push)
Pushes an element into the queue.
-
#initialize ⇒ TimeoutQueue
constructor
A new instance of TimeoutQueue.
-
#pop(blocking = true, timeout = nil) ⇒ Object
Pops an element from the queue in either non-blocking or a blocking (with an optional timeout) way.
Constructor Details
#initialize ⇒ TimeoutQueue
Returns a new instance of TimeoutQueue.
17 18 19 20 21 |
# File 'lib/mimi/messaging/sqs_sns/timeout_queue.rb', line 17 def initialize @elems = [] @mutex = Mutex.new @cond_var = ConditionVariable.new end |
Instance Method Details
#<<(elem) ⇒ Object Also known as: push
Pushes an element into the queue
27 28 29 30 31 32 |
# File 'lib/mimi/messaging/sqs_sns/timeout_queue.rb', line 27 def <<(elem) @mutex.synchronize do @elems << elem @cond_var.signal end end |
#pop(blocking = true, timeout = nil) ⇒ Object
Pops an element from the queue in either non-blocking or a blocking (with an optional timeout) way.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/mimi/messaging/sqs_sns/timeout_queue.rb', line 43 def pop(blocking = true, timeout = nil) @mutex.synchronize do if blocking if timeout.nil? while @elems.empty? @cond_var.wait(@mutex) end else timeout_time = Time.now.to_f + timeout while @elems.empty? && (remaining_time = timeout_time - Time.now.to_f) > 0 @cond_var.wait(@mutex, remaining_time) end end end raise Timeout::Error, "queue timeout expired" if @elems.empty? @elems.shift end end |