Class: LogStash::Util::WrappedSynchronousQueue::ReadClient

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/util/wrapped_synchronous_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue, batch_size = 125, wait_for = 50) ⇒ ReadClient

We generally want only one thread at a time to be able to access pop/take/poll operations from this queue. We also depend on this to be able to block consumers while we snapshot in-flight buffers.



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 32

# Builds a read client around +queue+.
#
# @param queue [Object] the queue that read_batch drains and empty? inspects
# @param batch_size [Integer] batch size handed to the drain call (default 125)
# @param wait_for [Integer] wait in milliseconds (default 50); kept internally
#   in nanoseconds
def initialize(queue, batch_size = 125, wait_for = 50)
  @queue = queue

  # Caveat: using @inflight_batches as the single place that tracks in-flight
  # batches breaks down if the pipeline ever has more than one read client.
  @inflight_batches = Concurrent::Map.new

  # Per-thread start timestamps, so the worker thread can report how long
  # the filter + output stages took.
  @inflight_clocks = Concurrent::Map.new

  # Same assignments as a later reconfiguration: stores the batch size and
  # converts the millisecond wait to nanoseconds.
  set_batch_dimensions(batch_size, wait_for)
end

Instance Method Details

#add_filtered_metrics(filtered_size) ⇒ Object



117
118
119
120
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 117

# Adds +filtered_size+ to the "filtered" counters, events counter first and
# pipeline counter second.
#
# @param filtered_size [Integer] amount to add to each counter
# @return [Object] whatever the pipeline counter's increment returns
def add_filtered_metrics(filtered_size)
  counters = [@event_metric_filtered, @pipeline_metric_filtered]
  counters.map { |counter| counter.increment(filtered_size) }.last
end

#add_output_metrics(filtered_size) ⇒ Object



122
123
124
125
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 122

# Adds +filtered_size+ to the "out" counters, events counter first and
# pipeline counter second.
#
# @param filtered_size [Integer] amount to add to each counter
# @return [Object] whatever the pipeline counter's increment returns
def add_output_metrics(filtered_size)
  counters = [@event_metric_out, @pipeline_metric_out]
  counters.map { |counter| counter.increment(filtered_size) }.last
end

#close ⇒ Object



44
45
46
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 44

# Intentionally does nothing; present only so this client exposes the same
# interface as the acked queue read client.
#
# @return [nil]
def close
  nil
end

#close_batch(batch) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 101

# Finishes the calling thread's in-flight batch: drops the batch from the
# in-flight map, clears the thread's start timestamp and, for a non-empty
# batch, adds the elapsed time to both duration counters.
#
# @param batch [#size] the batch being closed
# @return [Object, nil] the pipeline duration counter's increment result, or
#   nil when nothing was recorded
def close_batch(batch)
  worker = Thread.current
  @inflight_batches.delete(worker)
  started_at = @inflight_clocks.get_and_set(worker, nil)

  # No clock was started for this thread, so there is nothing to report.
  return if started_at.nil?

  # Only record timing for non-empty batches: the clock is started when an
  # empty batch is created, and that batch may still be empty when it gets
  # here.
  return unless batch.size > 0

  # Integer division turns the nanosecond delta into whole milliseconds.
  elapsed_millis = (java.lang.System.nano_time - started_at) / 1_000_000
  @event_metric_time.increment(elapsed_millis)
  @pipeline_metric_time.increment(elapsed_millis)
end

#define_initial_metrics_values(namespaced_metric) ⇒ Object



73
74
75
76
77
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 73

# Seeds the given metric namespace with zero values so the duration,
# filtered and out entries exist before any batch is processed.
#
# @param namespaced_metric [#report_time, #increment] metric namespace to seed
# @return [Object] result of the final increment call (the :out entry)
def define_initial_metrics_values(namespaced_metric)
  initial = 0
  namespaced_metric.report_time(:duration_in_millis, initial)
  %i[filtered out].map { |counter_key| namespaced_metric.increment(counter_key, initial) }.last
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 48

# Reports whether the wrapped queue is empty by delegating to the queue's own
# isEmpty check.
#
# @return [Boolean] the result of @queue.isEmpty
def empty?
  @queue.isEmpty
end

#inflight_batches {|@inflight_batches| ... } ⇒ Object

Yields:



79
80
81
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 79

# Hands the in-flight batch map to the caller's block.
#
# @yieldparam batches [Object] the @inflight_batches map
# @return [Object] the block's return value
def inflight_batches
  yield @inflight_batches
end

#new_batch ⇒ ReadBatch

create a new empty batch

Returns:

  • (ReadBatch)

    a new empty read batch



85
86
87
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 85

# Creates a new, empty read batch backed by a fresh LinkedHashSet constructed
# with an initial capacity of 0.
#
# @return [LogStash::MemoryReadBatch] a new empty read batch
def new_batch
  no_events = java.util.LinkedHashSet.new(0)
  LogStash::MemoryReadBatch.new(no_events)
end

#read_batch ⇒ Object



89
90
91
92
93
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 89

# Drains the queue into a new read batch using the configured batch size and
# wait, registers the batch as in flight via start_metrics, and returns it.
#
# @return [LogStash::MemoryReadBatch] the batch wrapping the drained events
def read_batch
  drained = LsQueueUtils.drain(@queue, @batch_size, @wait_for)
  LogStash::MemoryReadBatch.new(drained).tap { |batch| start_metrics(batch) }
end

#set_batch_dimensions(batch_size, wait_for) ⇒ Object



52
53
54
55
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 52

# Updates the batch size and wait used when draining the queue.
#
# @param batch_size [Integer] new batch size
# @param wait_for [Integer] new wait, in milliseconds
# @return [Object] the wait converted to nanoseconds, as stored in @wait_for
def set_batch_dimensions(batch_size, wait_for)
  @batch_size = batch_size

  # The wait is supplied in milliseconds but kept in nanoseconds.
  source_unit = TimeUnit::MILLISECONDS
  @wait_for = TimeUnit::NANOSECONDS.convert(wait_for, source_unit)
end

#set_events_metric(metric) ⇒ Object



57
58
59
60
61
62
63
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 57

# Stores the events metric namespace, looks up its out / filtered / duration
# counters, and seeds the namespace with initial values.
#
# @param metric [#counter] events metric namespace
# @return [Object] result of define_initial_metrics_values
def set_events_metric(metric)
  @event_metric = metric

  # Counters are fetched once here and reused on every batch.
  @event_metric_out = metric.counter(:out)
  @event_metric_filtered = metric.counter(:filtered)
  @event_metric_time = metric.counter(:duration_in_millis)

  define_initial_metrics_values(metric)
end

#set_pipeline_metric(metric) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 65

# Stores the pipeline metric namespace, looks up its out / filtered /
# duration counters, and seeds the namespace with initial values.
#
# @param metric [#counter] pipeline metric namespace
# @return [Object] result of define_initial_metrics_values
def set_pipeline_metric(metric)
  @pipeline_metric = metric

  # Counters are fetched once here and reused on every batch.
  @pipeline_metric_out = metric.counter(:out)
  @pipeline_metric_filtered = metric.counter(:filtered)
  @pipeline_metric_time = metric.counter(:duration_in_millis)

  define_initial_metrics_values(metric)
end

#start_metrics(batch) ⇒ Object



95
96
97
98
99
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 95

# Registers +batch+ as the calling thread's in-flight batch and records the
# current nano-time as that thread's start timestamp.
#
# @param batch [Object] the batch the current thread is about to process
# @return [Object] the recorded nano-time value
def start_metrics(batch)
  worker = Thread.current
  @inflight_batches[worker] = batch

  # Read back by close_batch to compute how long the batch took.
  started_at = java.lang.System.nano_time
  @inflight_clocks[worker] = started_at
end