Class: LogStash::Outputs::Base
- Includes:
- Config::Mixin
- Defined in:
- lib/logstash/outputs/base.rb
Constant Summary
Constants included from Config::Mixin
Config::Mixin::CONFIGSORT, Config::Mixin::PLUGIN_VERSION_0_9_0, Config::Mixin::PLUGIN_VERSION_1_0_0
Constants inherited from Plugin
Instance Attribute Summary collapse
-
#worker_plugins ⇒ Object
readonly
Returns the value of attribute worker_plugins.
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary collapse
- #handle(event) ⇒ Object
-
#handle_worker(event) ⇒ Object
Pushes the event onto the worker queue.
-
#initialize(params = {}) ⇒ Base
constructor
A new instance of Base.
- #receive(event) ⇒ Object
- #register ⇒ Object
- #worker_setup ⇒ Object
- #workers_not_supported(message = nil) ⇒ Object
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s
Constructor Details
#initialize(params = {}) ⇒ Base
Returns a new instance of Base.
50 51 52 53 |
# File 'lib/logstash/outputs/base.rb', line 50
# Builds the output plugin from its configuration parameters.
#
# @param params [Hash] plugin settings; defaults to an empty hash
def initialize(params = {})
  # Hand the same params to the parent initializer before configuring.
  super(params)
  # config_init is not defined in this class; presumably supplied by
  # Config::Mixin -- TODO confirm.
  config_init(params)
end
Instance Attribute Details
#worker_plugins ⇒ Object (readonly)
Returns the value of attribute worker_plugins.
36 37 38 |
# File 'lib/logstash/outputs/base.rb', line 36
# Read-only accessor for the plugin instances assigned by #worker_setup.
#
# @return [Object] the current value of @worker_plugins
def worker_plugins
  @worker_plugins
end
Instance Method Details
#handle(event) ⇒ Object
87 88 89 |
# File 'lib/logstash/outputs/base.rb', line 87
# Entry point for delivering an event to this output; delegates to #receive.
#
# @param event [Object] the event to deliver
# @return [Object] whatever #receive returns
def handle(event)
  receive(event)
end
#handle_worker(event) ⇒ Object
Pushes the event onto the worker queue.
91 92 93 |
# File 'lib/logstash/outputs/base.rb', line 91
# Enqueues the event on @worker_queue instead of processing it directly.
#
# @param event [Object] the event to enqueue
# @return [Object] the queue, as returned by its push operation
def handle_worker(event)
  @worker_queue << event
end
#receive(event) ⇒ Object
61 62 63 |
# File 'lib/logstash/outputs/base.rb', line 61
# Placeholder for event delivery; this base implementation always raises.
#
# @param event [Object] the event to deliver (unused here)
# @raise [RuntimeError] always, naming the class that failed to override it
def receive(event)
  raise "#{self.class}#receive must be overridden"
end
#register ⇒ Object
56 57 58 |
# File 'lib/logstash/outputs/base.rb', line 56
# Placeholder for plugin setup; this base implementation always raises.
#
# @raise [RuntimeError] always, naming the class that failed to override it
def register
  raise "#{self.class}#register must be overridden"
end
#worker_setup ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/logstash/outputs/base.rb', line 66
# Prepares the plugin instance(s) that will process events.
#
# With a single worker, this instance handles events itself. Otherwise #handle
# is redefined on this instance to enqueue events, and @workers fresh instances
# of the same class (each built with "workers" => 1) are started on their own
# threads, all consuming from one shared bounded queue.
#
# @return [Array] [self] for a single worker, otherwise the started worker threads
def worker_setup
  if @workers == 1
    @worker_plugins = [self]
  else
    # From here on, #handle on this instance only enqueues; the worker
    # threads below do the actual delivery.
    define_singleton_method(:handle, method(:handle_worker))
    # Bounded queue: producers block once 20 events are waiting.
    @worker_queue = SizedQueue.new(20)
    @worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
    # Pass everything the thread needs as thread arguments rather than
    # relying on variables captured from the enclosing iteration.
    @worker_plugins.map.with_index do |plugin, i|
      Thread.new(plugin, i, @worker_queue) do |worker, index, queue|
        LogStash::Util::set_thread_name(">#{self.class.config_name}.#{index}")
        worker.register
        loop do
          worker.handle(queue.pop)
        end
      end
    end
  end
end
#workers_not_supported(message = nil) ⇒ Object
39 40 41 42 43 44 45 46 47 |
# File 'lib/logstash/outputs/base.rb', line 39
# Forces this output down to a single worker, logging a warning when more
# than one worker was configured.
#
# @param message [String, nil] optional explanation included in the warning
# @return [Integer, nil] nil when @workers is already 1, otherwise 1
def workers_not_supported(message=nil)
  # Nothing to warn about or change when only one worker is configured.
  return if @workers == 1
  if message
    @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => self.class.config_name, :worker_count => @workers, :message => message))
  else
    @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => self.class.config_name, :worker_count => @workers))
  end
  @workers = 1
end