Class: Sqreen::Kit::Signals::BatchCollector::QueueWithTimeout
- Inherits:
- Object (hierarchy: Object, Sqreen::Kit::Signals::BatchCollector::QueueWithTimeout)
- Includes:
- Loggable
- Defined in:
- lib/sqreen/kit/signals/batch_collector.rb
Overview
Adapted from spin.atomicobject.com/2014/07/07/ruby-queue-pop-timeout/
Constant Summary
- MAX_QUEUE_SIZE = 1000
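Instance Method Summary
- #<<(x) ⇒ Object
Appends x to the queue and signals a waiting consumer; when the queue already holds MAX_QUEUE_SIZE elements, x is dropped and a warning is logged.
- #initialize ⇒ QueueWithTimeout
constructor
A new instance of QueueWithTimeout.
- #pop(deadline = nil) ⇒ Object
Blocking pop. Waits indefinitely when deadline is nil; otherwise waits until the deadline passes and returns nil if the queue is still empty.
- #pop_nb ⇒ Object
Non-blocking pop.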
Constructor Details
#initialize ⇒ QueueWithTimeout
Returns a new instance of QueueWithTimeout.
# File 'lib/sqreen/kit/signals/batch_collector.rb', line 135
def initialize
  @mutex = Mutex.new
  @queue = []
  @received = ConditionVariable.new
end
Instance Method Details
#<<(x) ⇒ Object
# File 'lib/sqreen/kit/signals/batch_collector.rb', line 141
def <<(x)
  @mutex.synchronize do
    if @queue.size >= MAX_QUEUE_SIZE
      # processing loop is prob spending too much time on http requests
      logger.warn "Queue is full! Dropping #{x}"
      next
    end
    @queue << x
    @received.signal
  end
end
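The drop-when-full behaviour of `#<<` can be illustrated with a minimal standalone sketch. `BoundedQueueSketch`, its `dropped` counter, and its `size` helper are hypothetical names introduced here for illustration; they are not part of the Sqreen class, which logs a warning instead of counting. The limit is lowered to 3 so the effect is easy to see.

```ruby
# Hypothetical stand-in for QueueWithTimeout; not the Sqreen implementation.
class BoundedQueueSketch
  MAX_QUEUE_SIZE = 3 # assumption: tiny limit for the demo (the original uses 1000)

  attr_reader :dropped

  def initialize
    @mutex = Mutex.new
    @queue = []
    @received = ConditionVariable.new
    @dropped = 0 # assumption: a counter replaces the logger.warn call
  end

  def <<(x)
    @mutex.synchronize do
      if @queue.size >= MAX_QUEUE_SIZE
        @dropped += 1
        next # leaves the synchronize block without enqueuing x
      end
      @queue << x
      @received.signal # wake one consumer waiting in a blocking pop
    end
  end

  def size
    @mutex.synchronize { @queue.size }
  end
end

q = BoundedQueueSketch.new
5.times { |i| q << i }
puts "queued=#{q.size} dropped=#{q.dropped}"
```

Note that the producer is never blocked: once the limit is reached, new elements are discarded rather than making the caller wait.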
#pop(deadline = nil) ⇒ Object
# File 'lib/sqreen/kit/signals/batch_collector.rb', line 162
def pop(deadline = nil)
  @mutex.synchronize do
    if deadline.nil?
      # wait indefinitely until there is an element in the queue
      @received.wait(@mutex) while @queue.empty?
    elsif @queue.empty?
      # wait for element or timeout
      while @queue.empty? && (remaining_time = deadline - Time.now.to_f) > 0
        @received.wait(@mutex, remaining_time)
      end
    end
    return nil if @queue.empty?
    @queue.shift
  end
end
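Judging from the comparison with `Time.now.to_f`, `deadline` is an absolute point in time expressed as epoch seconds (a Float), not a relative timeout. The sketch below shows how a caller might use it; `TimedQueueSketch` is a hypothetical standalone class that mirrors the wait loop above without the size limit or logging.

```ruby
# Hypothetical stand-in for QueueWithTimeout; not the Sqreen implementation.
class TimedQueueSketch
  def initialize
    @mutex = Mutex.new
    @queue = []
    @received = ConditionVariable.new
  end

  def <<(x)
    @mutex.synchronize do
      @queue << x
      @received.signal
    end
  end

  # deadline: absolute time as epoch seconds (Float), or nil to wait forever
  def pop(deadline = nil)
    @mutex.synchronize do
      if deadline.nil?
        @received.wait(@mutex) while @queue.empty?
      else
        # Recompute the remaining time after every wakeup, since a wait
        # can return before an element is actually available.
        while @queue.empty? && (remaining = deadline - Time.now.to_f) > 0
          @received.wait(@mutex, remaining)
        end
      end
      @queue.shift # nil when the deadline passed with nothing queued
    end
  end
end

q = TimedQueueSketch.new
producer = Thread.new { sleep 0.05; q << :signal }

first = q.pop(Time.now.to_f + 2.0)  # woken early by the producer
producer.join
second = q.pop(Time.now.to_f + 0.1) # nothing arrives, so this times out

puts first.inspect
puts second.inspect
```

Because a timeout and a queued `nil` both yield `nil`, callers that need to tell the two apart should avoid enqueuing `nil`.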
#pop_nb ⇒ Object
Non-blocking pop. Returns nil immediately if the queue is empty.
# File 'lib/sqreen/kit/signals/batch_collector.rb', line 154
def pop_nb
  @mutex.synchronize do
    return nil if @queue.empty?
    @queue.shift
  end
end
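One plausible use of a non-blocking pop is to drain whatever is already queued into a batch without waiting for more. The sketch below assumes this usage; how BatchCollector actually calls `pop_nb` is not shown on this page. `DrainableQueueSketch` and the `drain` helper are hypothetical names introduced for illustration.

```ruby
# Hypothetical stand-in for QueueWithTimeout; not the Sqreen implementation.
class DrainableQueueSketch
  def initialize
    @mutex = Mutex.new
    @queue = []
  end

  def <<(x)
    @mutex.synchronize { @queue << x }
  end

  # Returns the oldest element, or nil immediately when the queue is empty.
  def pop_nb
    @mutex.synchronize { @queue.shift }
  end
end

# Hypothetical helper: collect up to max elements that are already queued.
# Stops early if a queued element is nil or false, because such a value
# cannot be told apart from "queue empty" here.
def drain(queue, max)
  batch = []
  while batch.size < max && (item = queue.pop_nb)
    batch << item
  end
  batch
end

q = DrainableQueueSketch.new
[1, 2, 3].each { |i| q << i }

p drain(q, 2)
p drain(q, 10)
```

The first call stops at the batch limit and the second returns only what is left, so neither call ever blocks the caller.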