Class: LogStash::Inputs::DeadLetterQueue
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::DeadLetterQueue
- Defined in:
- lib/logstash/inputs/dead_letter_queue.rb
Overview
Logstash input to read events from Logstash’s dead letter queue
- source, sh
input {
dead_letter_queue { path => "/var/logstash/data/dead_letter_queue" => "2017-04-04T23:40:37" }
}
Instance Method Summary collapse
Instance Method Details
#register ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 48 def register if @sincedb_path.nil? datapath = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "dead_letter_queue", @pipeline_id) # Ensure that the filepath exists before writing, since it's deeply nested. FileUtils::mkdir_p datapath @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest(@path)) elsif File.directory?(@sincedb_path) raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"") end dlq_path = java.nio.file.Paths.get(File.join(@path, @pipeline_id)) sincedb_path = @sincedb_path ? java.nio.file.Paths.get(@sincedb_path) : nil = @start_timestamp ? org.logstash.Timestamp.new(@start_timestamp) : nil logstash_version = Gem::Version.new(LOGSTASH_CORE_VERSION) if clean_consumed && !Gem::Requirement.new('>= 8.4.0').satisfied_by?(logstash_version) raise LogStash::ConfigurationError.new("clean_consumed can be used only with Logstash version 8.4.0 and above") end if clean_consumed && !commit_offsets # clean_consumed requires the commit of offset raise LogStash::ConfigurationError.new("enabling clean_consumed requires commit_offsets to also be enabled") end @cleaned_metrics = metric.namespace(@pipeline_id) @inner_plugin = org.logstash.input.DeadLetterQueueInputPlugin.new(dlq_path, @commit_offsets, sincedb_path, , clean_consumed, lambda do |segments, events| # gauges is used instead of metric type because the updates that comes from the # DLQ reader are already absolute values and not deltas. @cleaned_metrics.gauge(:cleaned_segments, segments) @cleaned_metrics.gauge(:cleaned_events, events) end) @inner_plugin.register if Gem::Requirement.new('< 7.0').satisfied_by?(logstash_version) @event_creator = Proc.new do |entry| clone = entry.event.clone # LS 6 LogStash::Event.new accept Map not Event event = LogStash::Event.new(clone.getData()) event.set("[@metadata]", clone.getMetadata()) event end else @event_creator = -> (entry) { LogStash::Event.new(entry.event.clone) } end end |
#run(logstash_queue) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 93 def run(logstash_queue) @inner_plugin.run(lambda do |entry| event = @event_creator.(entry) event.set("[@metadata][dead_letter_queue][plugin_type]", entry.plugin_type) event.set("[@metadata][dead_letter_queue][plugin_id]", entry.plugin_id) event.set("[@metadata][dead_letter_queue][reason]", entry.reason) event.set("[@metadata][dead_letter_queue][entry_time]", entry.entry_time) decorate(event) logstash_queue << event end) end |
#stop ⇒ Object
106 107 108 |
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 106 def stop @inner_plugin.close end |