Class: Sqreen::Kit::Signals::BatchCollector::QueueWithTimeout

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/sqreen/kit/signals/batch_collector.rb

Overview

Constant Summary collapse

MAX_QUEUE_SIZE =
1000

Instance Method Summary collapse

Constructor Details

#initialize ⇒ QueueWithTimeout

Returns a new instance of QueueWithTimeout.



135
136
137
138
139
# File 'lib/sqreen/kit/signals/batch_collector.rb', line 135

# Builds an empty queue together with the synchronization primitives
# used by the other methods: a mutex and a condition variable.
def initialize
  @queue = []
  @received = ConditionVariable.new
  @mutex = Mutex.new
end

Instance Method Details

#<<(x) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
# File 'lib/sqreen/kit/signals/batch_collector.rb', line 141

# Appends +x+ to the queue and signals the condition variable so that a
# thread waiting on it can wake up.
#
# When the queue already holds MAX_QUEUE_SIZE elements, +x+ is not stored:
# a warning is logged and +nil+ is returned.
#
# @param x [Object] element to enqueue
# @return [Object, nil] result of signalling the condition variable, or
#   +nil+ when the element was dropped
def <<(x)
  @mutex.synchronize do
    if @queue.size < MAX_QUEUE_SIZE
      @queue << x
      @received.signal
    else
      # processing loop is prob spending too much time on http requests
      logger.warn "Queue is full! Dropping #{x}"
      nil
    end
  end
end

#pop(deadline = nil) ⇒ Object

Parameters:

  • deadline (Float) (defaults to: nil)


162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/sqreen/kit/signals/batch_collector.rb', line 162

# Removes and returns the element at the head of the queue, blocking
# while the queue is empty.
#
# With no deadline the call waits until an element is available. With a
# deadline it waits at most until +Time.now.to_f+ reaches that value and
# then gives up.
#
# @param deadline [Float, nil] point in time, compared against
#   +Time.now.to_f+, after which waiting stops; +nil+ waits indefinitely
# @return [Object, nil] the head element, or +nil+ if the queue is still
#   empty once the deadline has passed
def pop(deadline = nil)
  @mutex.synchronize do
    while @queue.empty?
      if deadline.nil?
        # no deadline: sleep until a producer signals
        @received.wait(@mutex)
      else
        time_left = deadline - Time.now.to_f
        break unless time_left > 0

        # the surrounding loop re-checks the queue after every wake-up
        @received.wait(@mutex, time_left)
      end
    end

    # Array#shift yields nil on an empty array, which covers the timeout case
    @queue.shift
  end
end

#pop_nb ⇒ Object

Non-blocking pop: returns the head element, or nil immediately if the queue is empty.



154
155
156
157
158
159
# File 'lib/sqreen/kit/signals/batch_collector.rb', line 154

# Non-blocking pop: takes the head element while holding the mutex and
# never waits for one to arrive.
#
# @return [Object, nil] the head element, or +nil+ when the queue is empty
def pop_nb
  # Array#shift already returns nil for an empty array
  @mutex.synchronize { @queue.shift }
end