Class: LogStash::Outputs::Base

Inherits:
Plugin
  • Object
show all
Includes:
Config::Mixin
Defined in:
lib/logstash/outputs/base.rb

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s

Constructor Details

#initialize(params = {}) ⇒ Base

Returns a new instance of Base.



48
49
50
51
# File 'lib/logstash/outputs/base.rb', line 48

def initialize(params={})
  super
  config_init(params)
end

Instance Method Details

#handle(event) ⇒ Object



85
86
87
# File 'lib/logstash/outputs/base.rb', line 85

def handle(event)
  receive(event)
end

#handle_worker(event) ⇒ Object

def handle



89
90
91
# File 'lib/logstash/outputs/base.rb', line 89

def handle_worker(event)
  @worker_queue.push(event)
end

#receive(event) ⇒ Object



59
60
61
# File 'lib/logstash/outputs/base.rb', line 59

def receive(event)
  raise "#{self.class}#receive must be overidden"
end

#registerObject



54
55
56
# File 'lib/logstash/outputs/base.rb', line 54

def register
  raise "#{self.class}#register must be overidden"
end

#worker_setupObject



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/logstash/outputs/base.rb', line 64

def worker_setup
  return unless @workers > 1

  define_singleton_method(:handle, method(:handle_worker))
  @worker_queue = SizedQueue.new(20)

  @worker_threads = @workers.times do |i|
    Thread.new(original_params, @worker_queue) do |params, queue|
      LogStash::Util::set_thread_name(">#{self.class.config_name}.#{i}")
      worker_params = params.merge("workers" => 1, "codec" => @codec.clone)
      worker_plugin = self.class.new(worker_params)
      worker_plugin.register
      while true
        event = queue.pop
        worker_plugin.handle(event)
      end
    end
  end
end

#workers_not_supported(message = nil) ⇒ Object



37
38
39
40
41
42
43
44
45
# File 'lib/logstash/outputs/base.rb', line 37

def workers_not_supported(message=nil)
  return if @workers == 1
  if message
    @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => self.class.config_name, :worker_count => @workers, :message => message))
  else
    @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => self.class.config_name, :worker_count => @workers))
  end
  @workers = 1
end