Class: Backports::FilteredQueue
- Defined in:
- lib/backports/ractor/filtered_queue.rb
Overview
Like ::Queue, but with
-
filtering
-
timeout
-
raises on closed queues
Independent from other Ractor related backports.
Defined Under Namespace
Classes: ClosedQueueError, TimeoutError
Constant Summary collapse
- CONSUME_ON_ESCAPE =
true
Instance Attribute Summary collapse
-
#num_waiting ⇒ Object
readonly
Returns the value of attribute num_waiting.
Instance Method Summary collapse
- #<<(x) ⇒ Object (also: #push)
- #clear ⇒ Object
- #close ⇒ Object
- #closed? ⇒ Boolean
- #empty? ⇒ Boolean
-
#initialize ⇒ FilteredQueue
constructor
Timeout processing based on spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/.
- #pop(timeout: nil, &block) ⇒ Object
Constructor Details
#initialize ⇒ FilteredQueue
Timeout processing based on spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/
34 35 36 37 38 39 40 |
# File 'lib/backports/ractor/filtered_queue.rb', line 34 def initialize @mutex = ::Mutex.new @queue = [] @closed = false @received = ::ConditionVariable.new @num_waiting = 0 end |
Instance Attribute Details
#num_waiting ⇒ Object (readonly)
Returns the value of attribute num_waiting.
31 32 33 |
# File 'lib/backports/ractor/filtered_queue.rb', line 31 def num_waiting @num_waiting end |
Instance Method Details
#<<(x) ⇒ Object Also known as: push
54 55 56 57 58 59 60 61 |
# File 'lib/backports/ractor/filtered_queue.rb', line 54 def <<(x) @mutex.synchronize do ensure_open @queue << Message.new(x) @received.signal end self end |
#clear ⇒ Object
64 65 66 67 68 69 |
# File 'lib/backports/ractor/filtered_queue.rb', line 64 def clear @mutex.synchronize do @queue.clear end self end |
#close ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/backports/ractor/filtered_queue.rb', line 42 def close @mutex.synchronize do @closed = true @received.broadcast end self end |
#closed? ⇒ Boolean
50 51 52 |
# File 'lib/backports/ractor/filtered_queue.rb', line 50 def closed? @closed end |
#empty? ⇒ Boolean
85 86 87 88 89 90 91 |
# File 'lib/backports/ractor/filtered_queue.rb', line 85 def empty? avail = @mutex.synchronize do available! end !avail end |
#pop(timeout: nil, &block) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/backports/ractor/filtered_queue.rb', line 71 def pop(timeout: nil, &block) msg = nil exclude = [] if block # exclusion list of messages rejected by this call timeout_time = timeout + Time.now.to_f if timeout while true do # rubocop:disable Style/InfiniteLoop, Style/WhileUntilDo @mutex.synchronize do reenter if reentrant? msg = acquire!(timeout_time, exclude) return consume!(msg).value unless block end return msg.value if filter?(msg, &block) end end |