Class: LogStash::OutputDelegator
- Inherits:
-
Object
- Object
- LogStash::OutputDelegator
- Defined in:
- lib/logstash/output_delegator.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#threadsafe ⇒ Object
readonly
Returns the value of attribute threadsafe.
-
#worker_count ⇒ Object
readonly
Returns the value of attribute worker_count.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary collapse
-
#busy_workers ⇒ Object
There’s no concept of ‘busy’ workers for a threadsafe plugin!
- #calculate_worker_count(default_worker_count) ⇒ Object
- #config_name ⇒ Object
- #do_close ⇒ Object
- #events_received ⇒ Object
-
#initialize(logger, klass, default_worker_count, *args) ⇒ OutputDelegator
constructor
The *args this takes are the same format that an Outputs::Base takes.
- #register ⇒ Object
- #threadsafe? ⇒ Boolean
- #threadsafe_multi_receive(events) ⇒ Object
- #warn_on_worker_override! ⇒ Object
- #worker_limits_overriden? ⇒ Boolean
- #worker_multi_receive(events) ⇒ Object
Constructor Details
#initialize(logger, klass, default_worker_count, *args) ⇒ OutputDelegator
The *args this takes are the same format that an Outputs::Base takes: a list of hashes with parameters in them. Internally these just get merged together into a single hash.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/logstash/output_delegator.rb', line 15
#
# Builds the delegator: creates and registers the output worker instance(s),
# fills the worker queue, and installs a `multi_receive` singleton method that
# dispatches to the threadsafe or the queued implementation.
#
# The *args this takes are the same format that an Outputs::Base takes:
# a list of hashes with parameters in them. Internally these just get merged
# together into a single hash.
#
# @param logger [#debug] logger stored in @logger and used for diagnostics
# @param klass [Class] output plugin class; must respond to `threadsafe?`
#   and `new(*args)`
# @param default_worker_count [Integer] forwarded to calculate_worker_count
# @param args [Array<Hash>] plugin configuration hashes
def initialize(logger, klass, default_worker_count, *args)
  @logger = logger
  @threadsafe = klass.threadsafe?
  @config = args.reduce({}, :merge)
  @klass = klass

  # We define this as an array regardless of threadsafety
  # to make reporting simpler, even though a threadsafe plugin will just have
  # a single instance
  #
  # Older plugins invoke the instance method Outputs::Base#workers_not_supported
  # To detect these we need an instance to be created first :()
  # TODO: In the next major version after 2.x remove support for this
  @workers = [@klass.new(*args)]
  @workers.first.register # Needed in case register calls `workers_not_supported`

  # DO NOT move this statement before the instantiation of the first single instance
  # Read the note above to understand why
  @worker_count = calculate_worker_count(default_worker_count)
  @logger.debug("Will start workers for output", :worker_count => @worker_count, :class => klass)

  warn_on_worker_override!
  # This queue is used to manage sharing across threads
  @worker_queue = SizedQueue.new(@worker_count)

  # The first instance already exists, so only the remaining ones are created.
  @workers += (@worker_count - 1).times.map do
    inst = @klass.new(*args)
    inst.register
    inst
  end

  @workers.each { |w| @worker_queue << w }

  @events_received = Concurrent::AtomicFixnum.new(0)

  # One might wonder why we don't use something like
  # define_singleton_method(:multi_receive, method(:threadsafe_multi_receive)
  # and the answer is this is buggy on Jruby 1.7.x . It works 98% of the time!
  # The other 2% you get weird errors about rebinding to the same object
  # Until we switch to Jruby 9.x keep the define_singleton_method parts
  # the way they are, with a block
  # See https://github.com/jruby/jruby/issues/3582
  if threadsafe?
    @threadsafe_worker = @workers.first
    define_singleton_method(:multi_receive) do |events|
      threadsafe_multi_receive(events)
    end
  else
    define_singleton_method(:multi_receive) do |events|
      worker_multi_receive(events)
    end
  end
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
11 12 13 |
# File 'lib/logstash/output_delegator.rb', line 11
#
# Read-only accessor for the @config instance variable.
#
# @return [Object] the current value of @config
def config
  @config
end
#threadsafe ⇒ Object (readonly)
Returns the value of attribute threadsafe.
11 12 13 |
# File 'lib/logstash/output_delegator.rb', line 11
#
# Read-only accessor for the @threadsafe instance variable.
# Returns the stored value as-is, without boolean coercion.
#
# @return [Object] the current value of @threadsafe
def threadsafe
  @threadsafe
end
#worker_count ⇒ Object (readonly)
Returns the value of attribute worker_count.
11 12 13 |
# File 'lib/logstash/output_delegator.rb', line 11
#
# Read-only accessor for the @worker_count instance variable.
#
# @return [Object] the current value of @worker_count
def worker_count
  @worker_count
end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
11 12 13 |
# File 'lib/logstash/output_delegator.rb', line 11
#
# Read-only accessor for the @workers instance variable.
#
# @return [Object] the current value of @workers
def workers
  @workers
end
Instance Method Details
#busy_workers ⇒ Object
There’s no concept of ‘busy’ workers for a threadsafe plugin!
139 140 141 142 143 144 145 |
# File 'lib/logstash/output_delegator.rb', line 139
#
# Reports how many workers are not currently sitting in the worker queue.
# There's no concept of 'busy' workers for a threadsafe plugin, so that case
# always reports 0.
#
# @return [Integer] 0 when @threadsafe is truthy, otherwise
#   @workers.size - @worker_queue.size
def busy_workers
  if @threadsafe
    0
  else
    @workers.size - @worker_queue.size
  end
end
#calculate_worker_count(default_worker_count) ⇒ Object
92 93 94 95 96 97 98 |
# File 'lib/logstash/output_delegator.rb', line 92
#
# Decides how many worker instances to use.
#
# @param default_worker_count [Integer] value used when the config has no
#   "workers" entry
# @return [Integer] 1 when @threadsafe is truthy or the plugin class reports
#   workers_not_supported?; otherwise @config["workers"], falling back to
#   default_worker_count
def calculate_worker_count(default_worker_count)
  if @threadsafe || @klass.workers_not_supported?
    1
  else
    @config["workers"] || default_worker_count
  end
end
#config_name ⇒ Object
100 101 102 |
# File 'lib/logstash/output_delegator.rb', line 100
#
# Delegates to the wrapped plugin class.
#
# @return [Object] whatever @klass.config_name returns
def config_name
  @klass.config_name
end
#do_close ⇒ Object
125 126 127 128 129 130 131 132 |
# File 'lib/logstash/output_delegator.rb', line 125
#
# Closes every worker by taking each one off the worker queue in turn and
# calling do_close on it. Workers are not pushed back, so the queue is left
# drained.
#
# NOTE(review): when @worker_queue is a blocking queue, `pop` presumably waits
# for workers still in use to be returned — confirm against the queue type.
def do_close
  @logger.debug("closing output delegator", :klass => self)

  @worker_count.times do
    worker = @worker_queue.pop
    worker.do_close
  end
end
#events_received ⇒ Object
134 135 136 |
# File 'lib/logstash/output_delegator.rb', line 134
#
# Current reading of the received-events counter.
#
# @return [Object] @events_received.value
def events_received
  @events_received.value
end
#register ⇒ Object
104 105 106 |
# File 'lib/logstash/output_delegator.rb', line 104
#
# Calls register on every worker instance held in @workers.
#
# @return [Object] the result of @workers.each (the @workers collection)
def register
  @workers.each { |w| w.register }
end
#threadsafe? ⇒ Boolean
70 71 72 |
# File 'lib/logstash/output_delegator.rb', line 70
#
# Strict-boolean view of @threadsafe.
#
# @return [Boolean] true when @threadsafe is truthy, false otherwise
def threadsafe?
  !!@threadsafe
end
#threadsafe_multi_receive(events) ⇒ Object
108 109 110 111 112 |
# File 'lib/logstash/output_delegator.rb', line 108
#
# Receive path that bypasses the worker queue: bumps the received-events
# counter by the batch size, then hands the batch straight to
# @threadsafe_worker.
#
# @param events [#length] batch of events to deliver
# @return [Object] whatever @threadsafe_worker.multi_receive returns
def threadsafe_multi_receive(events)
  @events_received.increment(events.length)

  @threadsafe_worker.multi_receive(events)
end
#warn_on_worker_override! ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/logstash/output_delegator.rb', line 74
#
# Logs a warning when the user asked for extra workers on a plugin that does
# not support them. Does nothing unless worker_limits_overriden? is truthy.
#
# NOTE(review): the extracted listing had lost several identifiers. The local
# names below are reconstructed, and the class-level accessor is assumed to be
# `workers_not_supported_message` — confirm against the original file.
def warn_on_worker_override!
  # The user has configured extra workers, but this plugin doesn't support it :(
  if worker_limits_overriden?
    message = @klass.workers_not_supported_message
    warning_meta = {:plugin => @klass.config_name, :worker_count => @config["workers"]}
    if message
      # A plugin-supplied explanation selects the more specific translation key.
      warning_meta[:message] = message
      @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", warning_meta))
    else
      @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", warning_meta))
    end
  end
end
#worker_limits_overriden? ⇒ Boolean
88 89 90 |
# File 'lib/logstash/output_delegator.rb', line 88
#
# Tells whether the config asks for more than one worker on a plugin class
# that reports workers_not_supported?.
#
# The result is the raw value of the && chain, so it is nil when no "workers"
# entry is configured; callers should treat it as truthy/falsy.
#
# @return [Boolean, nil] truthy only when @config["workers"] is set, greater
#   than 1, and @klass.workers_not_supported? is truthy
def worker_limits_overriden?
  @config["workers"] && @config["workers"] > 1 && @klass.workers_not_supported?
end
#worker_multi_receive(events) ⇒ Object
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/logstash/output_delegator.rb', line 114
#
# Receive path that goes through the worker queue: bumps the received-events
# counter by the batch size, takes a worker off @worker_queue, delivers the
# batch to it, and always puts the worker back.
#
# @param events [#length] batch of events to deliver
# @return [Object] whatever the worker's multi_receive returns
def worker_multi_receive(events)
  @events_received.increment(events.length)

  worker = @worker_queue.pop
  begin
    worker.multi_receive(events)
  ensure
    # Return the worker even if delivery raised, so it is not lost from the queue.
    @worker_queue.push(worker)
  end
end