Class: LogStash::Util::WrappedSynchronousQueue::ReadClient
- Inherits:
-
Object
- Object
- LogStash::Util::WrappedSynchronousQueue::ReadClient
- Defined in:
- lib/logstash/util/wrapped_synchronous_queue.rb
Instance Method Summary collapse
- #add_filtered_metrics(batch) ⇒ Object
- #add_output_metrics(batch) ⇒ Object
- #close ⇒ Object
- #close_batch(batch) ⇒ Object
- #current_inflight_batch ⇒ Object
- #define_initial_metrics_values(namespaced_metric) ⇒ Object
- #empty? ⇒ Boolean
- #inflight_batches ⇒ Object
-
#initialize(queue, batch_size = 125, wait_for = 250) ⇒ ReadClient
constructor
We generally only want one thread at a time able to access pop/take/poll operations from this queue.
-
#new_batch ⇒ ReadBatch
create a new empty batch.
- #read_batch ⇒ Object
- #set_batch_dimensions(batch_size, wait_for) ⇒ Object
- #set_current_thread_inflight_batch(batch) ⇒ Object
- #set_events_metric(metric) ⇒ Object
- #set_pipeline_metric(metric) ⇒ Object
- #start_clock ⇒ Object
- #start_metrics(batch) ⇒ Object
- #stop_clock(batch) ⇒ Object
Constructor Details
#initialize(queue, batch_size = 125, wait_for = 250) ⇒ ReadClient
We generally only want one thread at a time able to access pop/take/poll operations from this queue. We also depend on this to be able to block consumers while we snapshot in-flight buffers
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 43 def initialize(queue, batch_size = 125, wait_for = 250) @queue = queue @mutex = Mutex.new # Note that @inflight_batches as a central mechanism for tracking inflight # batches will fail if we have multiple read clients in the pipeline. @inflight_batches = {} # allow the worker thread to report the execution time of the filter + output @inflight_clocks = {} @batch_size = batch_size @wait_for = wait_for end |
Instance Method Details
#add_filtered_metrics(batch) ⇒ Object
166 167 168 169 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 166 def add_filtered_metrics(batch) @event_metric_filtered.increment(batch.filtered_size) @pipeline_metric_filtered.increment(batch.filtered_size) end |
#add_output_metrics(batch) ⇒ Object
171 172 173 174 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 171 def add_output_metrics(batch) @event_metric_out.increment(batch.filtered_size) @pipeline_metric_out.increment(batch.filtered_size) end |
#close ⇒ Object
56 57 58 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 56 def close # noop, compat with acked queue read client end |
#close_batch(batch) ⇒ Object
137 138 139 140 141 142 143 144 145 146 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 137 def close_batch(batch) @mutex.lock begin # there seems to be concurrency issues with metrics, keep it in the mutex @inflight_batches.delete(Thread.current) stop_clock(batch) ensure @mutex.unlock end end |
#current_inflight_batch ⇒ Object
100 101 102 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 100 def current_inflight_batch @inflight_batches.fetch(Thread.current, []) end |
#define_initial_metrics_values(namespaced_metric) ⇒ Object
85 86 87 88 89 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 85 def define_initial_metrics_values(namespaced_metric) namespaced_metric.report_time(:duration_in_millis, 0) namespaced_metric.increment(:filtered, 0) namespaced_metric.increment(:out, 0) end |
#empty? ⇒ Boolean
60 61 62 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 60 def empty? true # synchronous queue is alway empty end |
#inflight_batches ⇒ Object
91 92 93 94 95 96 97 98 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 91 def inflight_batches @mutex.lock begin yield(@inflight_batches) ensure @mutex.unlock end end |
#new_batch ⇒ ReadBatch
create a new empty batch
106 107 108 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 106 def new_batch ReadBatch.new(@queue, @batch_size, @wait_for) end |
#read_batch ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 110 def read_batch batch = new_batch @mutex.lock begin batch.read_next ensure @mutex.unlock end start_metrics(batch) batch end |
#set_batch_dimensions(batch_size, wait_for) ⇒ Object
64 65 66 67 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 64 def set_batch_dimensions(batch_size, wait_for) @batch_size = batch_size @wait_for = wait_for end |
#set_current_thread_inflight_batch(batch) ⇒ Object
133 134 135 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 133 def set_current_thread_inflight_batch(batch) @inflight_batches[Thread.current] = batch end |
#set_events_metric(metric) ⇒ Object
69 70 71 72 73 74 75 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 69 def set_events_metric(metric) @event_metric = metric @event_metric_out = @event_metric.counter(:out) @event_metric_filtered = @event_metric.counter(:filtered) @event_metric_time = @event_metric.counter(:duration_in_millis) define_initial_metrics_values(@event_metric) end |
#set_pipeline_metric(metric) ⇒ Object
77 78 79 80 81 82 83 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 77 def set_pipeline_metric(metric) @pipeline_metric = metric @pipeline_metric_out = @pipeline_metric.counter(:out) @pipeline_metric_filtered = @pipeline_metric.counter(:filtered) @pipeline_metric_time = @pipeline_metric.counter(:duration_in_millis) define_initial_metrics_values(@pipeline_metric) end |
#start_clock ⇒ Object
148 149 150 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 148 def start_clock @inflight_clocks[Thread.current] = java.lang.System.current_time_millis end |
#start_metrics(batch) ⇒ Object
122 123 124 125 126 127 128 129 130 131 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 122 def start_metrics(batch) @mutex.lock # there seems to be concurrency issues with metrics, keep it in the mutex begin set_current_thread_inflight_batch(batch) start_clock ensure @mutex.unlock end end |
#stop_clock(batch) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/logstash/util/wrapped_synchronous_queue.rb', line 152 def stop_clock(batch) unless @inflight_clocks[Thread.current].nil? if batch.size > 0 # only stop (which also records) the metrics if the batch is non-empty. # start_clock is now called at empty batch creation and an empty batch could # stay empty all the way down to the close_batch call. time_taken = java.lang.System.current_time_millis - @inflight_clocks[Thread.current] @event_metric_time.increment(time_taken) @pipeline_metric_time.increment(time_taken) end @inflight_clocks.delete(Thread.current) end end |