Class: LogStash::Util::WrappedSynchronousQueue::ReadClient

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/util/wrapped_synchronous_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue, batch_size = 125, wait_for = 250) ⇒ ReadClient

We generally only want one thread at a time able to access pop/take/poll operations from this queue. We also depend on this to be able to block consumers while we snapshot in-flight buffers



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 43

def initialize(queue, batch_size = 125, wait_for = 250)
  @queue = queue
  @mutex = Mutex.new
  # Note that @inflight_batches as a central mechanism for tracking inflight
  # batches will fail if we have multiple read clients in the pipeline.
  @inflight_batches = {}

  # allow the worker thread to report the execution time of the filter + output
  @inflight_clocks = {}
  @batch_size = batch_size
  @wait_for = wait_for
end

Instance Method Details

#add_filtered_metrics(batch) ⇒ Object



166
167
168
169
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 166

def add_filtered_metrics(batch)
  @event_metric_filtered.increment(batch.filtered_size)
  @pipeline_metric_filtered.increment(batch.filtered_size)
end

#add_output_metrics(batch) ⇒ Object



171
172
173
174
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 171

def add_output_metrics(batch)
  @event_metric_out.increment(batch.filtered_size)
  @pipeline_metric_out.increment(batch.filtered_size)
end

#closeObject



56
57
58
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 56

def close
  # noop, compat with acked queue read client
end

#close_batch(batch) ⇒ Object



137
138
139
140
141
142
143
144
145
146
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 137

def close_batch(batch)
  @mutex.lock
  begin
    # there seems to be concurrency issues with metrics, keep it in the mutex
    @inflight_batches.delete(Thread.current)
    stop_clock(batch)
  ensure
    @mutex.unlock
  end
end

#current_inflight_batchObject



100
101
102
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 100

def current_inflight_batch
  @inflight_batches.fetch(Thread.current, [])
end

#define_initial_metrics_values(namespaced_metric) ⇒ Object



85
86
87
88
89
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 85

def define_initial_metrics_values(namespaced_metric)
  namespaced_metric.report_time(:duration_in_millis, 0)
  namespaced_metric.increment(:filtered, 0)
  namespaced_metric.increment(:out, 0)
end

#empty?Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 60

def empty?
  true # synchronous queue is alway empty
end

#inflight_batchesObject



91
92
93
94
95
96
97
98
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 91

def inflight_batches
  @mutex.lock
  begin
    yield(@inflight_batches)
  ensure
    @mutex.unlock
  end
end

#new_batchReadBatch

create a new empty batch

Returns:



106
107
108
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 106

def new_batch
  ReadBatch.new(@queue, @batch_size, @wait_for)
end

#read_batchObject



110
111
112
113
114
115
116
117
118
119
120
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 110

def read_batch
  batch = new_batch
  @mutex.lock
  begin
    batch.read_next
  ensure
    @mutex.unlock
  end
  start_metrics(batch)
  batch
end

#set_batch_dimensions(batch_size, wait_for) ⇒ Object



64
65
66
67
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 64

def set_batch_dimensions(batch_size, wait_for)
  @batch_size = batch_size
  @wait_for = wait_for
end

#set_current_thread_inflight_batch(batch) ⇒ Object



133
134
135
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 133

def set_current_thread_inflight_batch(batch)
  @inflight_batches[Thread.current] = batch
end

#set_events_metric(metric) ⇒ Object



69
70
71
72
73
74
75
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 69

def set_events_metric(metric)
  @event_metric = metric
  @event_metric_out = @event_metric.counter(:out)
  @event_metric_filtered = @event_metric.counter(:filtered)
  @event_metric_time = @event_metric.counter(:duration_in_millis)
  define_initial_metrics_values(@event_metric)
end

#set_pipeline_metric(metric) ⇒ Object



77
78
79
80
81
82
83
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 77

def set_pipeline_metric(metric)
  @pipeline_metric = metric
  @pipeline_metric_out = @pipeline_metric.counter(:out)
  @pipeline_metric_filtered = @pipeline_metric.counter(:filtered)
  @pipeline_metric_time = @pipeline_metric.counter(:duration_in_millis)
  define_initial_metrics_values(@pipeline_metric)
end

#start_clockObject



148
149
150
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 148

def start_clock
  @inflight_clocks[Thread.current] = java.lang.System.current_time_millis
end

#start_metrics(batch) ⇒ Object



122
123
124
125
126
127
128
129
130
131
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 122

def start_metrics(batch)
  @mutex.lock
  # there seems to be concurrency issues with metrics, keep it in the mutex
  begin
    set_current_thread_inflight_batch(batch)
    start_clock
  ensure
    @mutex.unlock
  end
end

#stop_clock(batch) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 152

def stop_clock(batch)
  unless @inflight_clocks[Thread.current].nil?
    if batch.size > 0
      # only stop (which also records) the metrics if the batch is non-empty.
      # start_clock is now called at empty batch creation and an empty batch could
      # stay empty all the way down to the close_batch call.
      time_taken = java.lang.System.current_time_millis - @inflight_clocks[Thread.current]
      @event_metric_time.increment(time_taken)
      @pipeline_metric_time.increment(time_taken)
    end
    @inflight_clocks.delete(Thread.current)
  end
end