Class: LogStash::Outputs::Base
Direct Known Subclasses
Boundary, Circonus, CloudWatch, Datadog, DatadogMetrics, ElasticSearch, ElasticSearchHTTP, ElasticSearchRiver, Email, Exec, File, Ganglia, Gelf, Gemfire, GoogleBigQuery, GoogleCloudStorage, GraphTastic, Graphite, HipChat, Http, Irc, Jira, Juggernaut, Librato, Loggly, Lumberjack, MetricCatcher, Mongodb, Nagios, NagiosNsca, Null, Opentsdb, PagerDuty, Pipe, RabbitMQ, Redis, Riak, Riemann, S3, SQS, Sns, SolrHTTP, Statsd, Stdout, Stomp, Syslog, Tcp, UDP, WebSocket, Xmpp, Zabbix, ZeroMQ
Constant Summary
Config::Mixin::CONFIGSORT
Instance Attribute Summary
#config, #original_params
Attributes inherited from Plugin
#logger, #params
Instance Method Summary
collapse
#config_init, included
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s
Constructor Details
#initialize(params = {}) ⇒ Base
Returns a new instance of Base.
48
49
50
51
|
# File 'lib/logstash/outputs/base.rb', line 48
def initialize(params={})
super
config_init(params)
end
|
Instance Method Details
#handle(event) ⇒ Object
85
86
87
|
# File 'lib/logstash/outputs/base.rb', line 85
def handle(event)
receive(event)
end
|
#handle_worker(event) ⇒ Object
89
90
91
|
# File 'lib/logstash/outputs/base.rb', line 89
def handle_worker(event)
@worker_queue.push(event)
end
|
#receive(event) ⇒ Object
59
60
61
|
# File 'lib/logstash/outputs/base.rb', line 59
def receive(event)
raise "#{self.class}#receive must be overidden"
end
|
#register ⇒ Object
54
55
56
|
# File 'lib/logstash/outputs/base.rb', line 54
def register
raise "#{self.class}#register must be overidden"
end
|
#worker_setup ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/logstash/outputs/base.rb', line 64
def worker_setup
return unless @workers > 1
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@worker_threads = @workers.times do |i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util::set_thread_name(">#{self.class.config_name}.#{i}")
worker_params = params.merge("workers" => 1, "codec" => @codec.clone)
worker_plugin = self.class.new(worker_params)
worker_plugin.register
while true
event = queue.pop
worker_plugin.handle(event)
end
end
end
end
|
#workers_not_supported(message = nil) ⇒ Object
37
38
39
40
41
42
43
44
45
|
# File 'lib/logstash/outputs/base.rb', line 37
def workers_not_supported(message=nil)
return if @workers == 1
if message
@logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => self.class.config_name, :worker_count => @workers, :message => message))
else
@logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => self.class.config_name, :worker_count => @workers))
end
@workers = 1
end
|