Class: LogStash::Inputs::DeadLetterQueue

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/dead_letter_queue.rb

Overview

Logstash input to read events from Logstash’s dead letter queue

source, sh

input {

dead_letter_queue {
  path => "/var/logstash/data/dead_letter_queue"
  timestamp => "2017-04-04T23:40:37"
}

}


Instance Method Summary collapse

Instance Method Details

#registerObject



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 48

def register
  if @sincedb_path.nil?
    datapath = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "dead_letter_queue", @pipeline_id)
    # Ensure that the filepath exists before writing, since it's deeply nested.
    FileUtils::mkdir_p datapath
    @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest(@path))
  elsif File.directory?(@sincedb_path)
    raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"")
  end

  dlq_path = java.nio.file.Paths.get(File.join(@path, @pipeline_id))
  sincedb_path = @sincedb_path ? java.nio.file.Paths.get(@sincedb_path) : nil
  start_timestamp = @start_timestamp ? org.logstash.Timestamp.new(@start_timestamp) : nil
  logstash_version = Gem::Version.new(LOGSTASH_CORE_VERSION)
  if clean_consumed && !Gem::Requirement.new('>= 8.4.0').satisfied_by?(logstash_version)
    raise LogStash::ConfigurationError.new("clean_consumed can be used only with Logstash version 8.4.0 and above")
  end
  if clean_consumed && !commit_offsets
    # clean_consumed requires the commit of offset
    raise LogStash::ConfigurationError.new("enabling clean_consumed requires commit_offsets to also be enabled")
  end
  @cleaned_metrics = metric.namespace(@pipeline_id)
  @inner_plugin = org.logstash.input.DeadLetterQueueInputPlugin.new(dlq_path, @commit_offsets, sincedb_path, start_timestamp, clean_consumed,
          lambda do |segments, events|
              # gauges is used instead of metric type because the updates that comes from the
              # DLQ reader are already absolute values and not deltas.
              @cleaned_metrics.gauge(:cleaned_segments, segments)
              @cleaned_metrics.gauge(:cleaned_events, events)
          end)
  @inner_plugin.register

  if Gem::Requirement.new('< 7.0').satisfied_by?(logstash_version)
    @event_creator = Proc.new do |entry|
      clone = entry.event.clone
      # LS 6 LogStash::Event.new accept Map not Event
      event = LogStash::Event.new(clone.getData())
      event.set("[@metadata]", clone.())
      event
    end
  else
    @event_creator = -> (entry) { LogStash::Event.new(entry.event.clone) }
  end
end

#run(logstash_queue) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 93

def run(logstash_queue)
  @inner_plugin.run(lambda do |entry|
    event = @event_creator.(entry)
    event.set("[@metadata][dead_letter_queue][plugin_type]", entry.plugin_type)
    event.set("[@metadata][dead_letter_queue][plugin_id]", entry.plugin_id)
    event.set("[@metadata][dead_letter_queue][reason]", entry.reason)
    event.set("[@metadata][dead_letter_queue][entry_time]", entry.entry_time)
    decorate(event)
    logstash_queue << event
  end)
end

#stopObject



106
107
108
# File 'lib/logstash/inputs/dead_letter_queue.rb', line 106

def stop
  @inner_plugin.close
end