Class: LogStash::Util::WrappedSynchronousQueue::ReadBatch

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/util/wrapped_synchronous_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue, size, wait) ⇒ ReadBatch

Returns a new instance of ReadBatch.



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 155

# Builds an empty batch and then fills it via take_originals_from_queue.
#
# queue, size and wait are forwarded untouched to take_originals_from_queue.
def initialize(queue, size, wait)
  # events stored as hash keys
  @originals = {}

  # TODO: cancellation tracking is disabled for
  # https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
  # @cancelled = {}

  @generated = {}
  # receives events merged while an iteration is in progress
  @iterating_temp = {}
  # Atomic Boolean maybe? Although batches are not shared across threads
  @iterating = false

  take_originals_from_queue(queue, size, wait)
end

Instance Method Details

#cancel(event) ⇒ Object



179
180
181
182
183
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 179

# Cancelling through the batch is not available: always raises a RuntimeError.
#
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor.
# The disabled implementation was: @cancelled[event] = true
def cancel(event)
  raise RuntimeError, "cancel is unsupported"
end

#cancelled_size ⇒ Object



213
214
215
216
217
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 213

# Counting cancelled events is not available: always raises a RuntimeError.
#
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor.
# The disabled implementation was: @cancelled.size
def cancelled_size
  raise RuntimeError, "cancelled_size is unsupported "
end

#each(&blk) ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 185

# Calls blk with every event of the batch that is not cancelled: first the
# keys of @originals, then the keys of @generated.
#
# @iterating is set for the duration of the walk so that @originals and
# @generated are not changed while they are being iterated (merge is
# presumably what consults the flag, through iterating? - confirm).
#
# The flag is cleared and update_generated is called from an `ensure`, so a
# block that raises or `break`s no longer leaves the batch marked as iterating.
# NOTE(review): if update_generated itself raises while another exception is
# propagating, it replaces that exception - confirm the helper cannot fail.
#
# Returns whatever update_generated returns when the walk completes.
def each(&blk)
  @iterating = true
  begin
    # the checks for @cancelled.include?(e) have been replaced by e.cancelled?
    # TODO: for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
    @originals.each_key { |e| blk.call(e) unless e.cancelled? }
    @generated.each_key { |e| blk.call(e) unless e.cancelled? }
  ensure
    @iterating = false
    flushed = update_generated
  end
  flushed
end

#filtered_size ⇒ Object



209
210
211
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 209

# Total number of entries held in @originals and @generated together.
def filtered_size
  original_count = @originals.length
  generated_count = @generated.length
  original_count + generated_count
end

#merge(event) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 167

# Records event as belonging to this batch.
#
# Nothing happens (nil is returned) when event is nil or is already a key of
# @originals. Otherwise the event is stored as a key, with value true, in:
#   * @iterating_temp while iterating? is truthy, so that @generated does not
#     change during iteration (@iterating_temp is merged after the iteration);
#   * @generated otherwise - the periodic flush could generate events outside
#     of an each iteration.
def merge(event)
  return if event.nil?
  return if @originals.key?(event)

  destination = iterating? ? @iterating_temp : @generated
  destination[event] = true
end

#size ⇒ Object



201
202
203
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 201

# Size of the batch; delegates to filtered_size and returns its result unchanged.
def size
  filtered_size
end

#starting_size ⇒ Object



205
206
207
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 205

# Number of entries in @originals; events held in @generated are not counted.
def starting_size
  @originals.length
end