Class: LogStash::Agent

Inherits:
Clamp::Command
  • Object
show all
Defined in:
lib/logstash/agent.rb

Instance Method Summary collapse

Instance Method Details

#configure ⇒ Object

Do any start-time configuration.

Log file stuff, plugin path checking, etc.



205
206
207
# File 'lib/logstash/agent.rb', line 205

# Do any start-time configuration.
#
# Currently this only points logging at the configured log file by handing
# the value of +log_file+ to #configure_logging.
def configure
  target = log_file
  configure_logging(target)
end

#configure_logging(path) ⇒ Object

Point logging at a specific path.



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/logstash/agent.rb', line 210

# Point logging at a specific path.
#
# First sets the logger level from the command-line flags, then subscribes
# the logger to the file at +path+ (opened in append mode), or to STDOUT when
# +path+ is nil. The method may be called repeatedly (the SIGHUP handler
# calls it again): the previous subscription is always replaced rather than
# stacked, and the previous log file is only closed once the new one has been
# opened successfully.
#
# @param path [String, nil] log file to append to; nil/false means STDOUT
# @raise [LogStash::ConfigurationError] (through #fail) when the log file
#   cannot be opened
def configure_logging(path)
  # Set with the -v (or -vv...) flag
  if quiet?
    @logger.level = :error
  elsif verbose?
    @logger.level = :info
  elsif debug?
    @logger.level = :debug
  elsif verbosity? && verbosity?.any?
    # Old support for the -v and -vv stuff.
    # this is an array with length of how many times the flag is given
    if verbosity?.length == 1
      @logger.warn("The -v flag is deprecated and will be removed in a future release. You should use --verbose instead.")
      @logger.level = :info
    else
      @logger.warn("The -vv flag is deprecated and will be removed in a future release. You should use --debug instead.")
      @logger.level = :debug
    end
  else
    @logger.level = :warn
  end

  # Decide on the argument we were given, not on the log_file option, so the
  # check and the file we open can never disagree.
  if path
    # TODO(sissel): Implement file output/rotation in Cabin.
    # TODO(sissel): Catch exceptions, report sane errors.
    begin
      # Open the new file before touching the old one: if this fails the
      # logger keeps writing to the (still open) previous destination.
      new_fd = File.new(path, "a")
    rescue => e
      fail(I18n.t("logstash.agent.configuration.log_file_failed",
                  :path => path, :error => e))
    end

    puts "Sending logstash logs to #{path}."
    previous_fd = @log_fd
    @log_fd = new_fd
    @logger.unsubscribe(@logger_subscription) if @logger_subscription
    @logger_subscription = @logger.subscribe(@log_fd)
    # Only now is nothing subscribed to the old descriptor any more.
    previous_fd.close if previous_fd && !previous_fd.closed?
  else
    # Replace, rather than add to, any subscription made by an earlier call;
    # otherwise every reconfigure would duplicate each log line on STDOUT.
    @logger.unsubscribe(@logger_subscription) if @logger_subscription
    @logger_subscription = @logger.subscribe(STDOUT)
  end

  # TODO(sissel): redirect stdout/stderr to the log as well
  # http://jira.codehaus.org/browse/JRUBY-7003
end

#execute ⇒ Object

Run the agent. This method is invoked after clamp parses the flags given to this program.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/logstash/agent.rb', line 65

# Run the agent. This method is invoked after clamp parses the flags given
# to this program.
#
# Builds the configuration text from the config string and/or config path,
# constructs the pipeline, installs INT/TERM/HUP signal handlers, and then
# either stops (config test) or runs the pipeline. All three signal handlers
# are removed again when the method exits.
#
# @return [Integer] 0 after a version request, a passing config test or a
#   completed pipeline run; 1 after an error has been reported
def execute
  require "logstash/pipeline"
  require "cabin" # gem 'cabin'
  require "logstash/plugin"
  @logger = Cabin::Channel.get(LogStash)

  if version?
    show_version
    return 0
  end

  # temporarily send logs to stdout as well if a --log is specified
  # and stdout appears to be a tty
  show_startup_errors = log_file && STDOUT.tty?

  if show_startup_errors
    stdout_logs = @logger.subscribe(STDOUT)
  end
  configure

  # You must specify a config_string or config_path
  if @config_string.nil? && @config_path.nil?
    fail(help + "\n" + I18n.t("logstash.agent.missing-configuration"))
  end

  @config_string = @config_string.to_s

  if @config_path
    # Append the config string.
    # This allows users to provide both -f and -e flags. The combination
    # is rare, but useful for debugging.
    @config_string = @config_string + load_config(@config_path)
  else
    # include a default stdin input if no inputs given
    if @config_string !~ /input *{/
      @config_string += "input { stdin { type => stdin } }"
    end
    # include a default stdout output if no outputs given
    if @config_string !~ /output *{/
      @config_string += "output { stdout { codec => rubydebug } }"
    end
  end

  begin
    pipeline = LogStash::Pipeline.new(@config_string)
  rescue LoadError
    fail("Configuration problem.")
  end

  # Make SIGINT shutdown the pipeline.
  sigint_id = Stud::trap("INT") do

    if @interrupted_once
      # A second SIGINT means the user does not want to wait any longer.
      @logger.fatal(I18n.t("logstash.agent.forced_sigint"))
      exit
    else
      @logger.warn(I18n.t("logstash.agent.sigint"))
      Thread.new(@logger) {|logger| sleep 5; logger.warn(I18n.t("logstash.agent.slow_shutdown")) }
      @interrupted_once = true
      pipeline.shutdown
    end
  end

  # Make SIGTERM shutdown the pipeline.
  sigterm_id = Stud::trap("TERM") do
    @logger.warn(I18n.t("logstash.agent.sigterm"))
    pipeline.shutdown
  end

  # SIGHUP re-runs the logging setup (reopening the log file). Keep the id so
  # the handler can be removed on exit just like the INT/TERM handlers.
  sighup_id = Stud::trap("HUP") do
    @logger.info(I18n.t("logstash.agent.sighup"))
    configure_logging(log_file)
  end

  pipeline.configure("filter-workers", filter_workers)

  # Stop now if we are only asking for a config test.
  if config_test?
    # Drop the temporary stdout subscription first (as the error paths do) so
    # the message is not printed twice, and report success explicitly.
    @logger.unsubscribe(stdout_logs) if show_startup_errors
    report "Configuration OK"
    return 0
  end

  @logger.unsubscribe(stdout_logs) if show_startup_errors

  # TODO(sissel): Get pipeline completion status.
  pipeline.run
  return 0
rescue LogStash::ConfigurationError => e
  @logger.unsubscribe(stdout_logs) if show_startup_errors
  report I18n.t("logstash.agent.error", :error => e)
  if !config_test?
    report I18n.t("logstash.agent.configtest-flag-information")
  end
  return 1
rescue => e
  @logger.unsubscribe(stdout_logs) if show_startup_errors
  report I18n.t("oops", :error => e)
  report e.backtrace if @logger.debug? || $DEBUGLIST.include?("stacktrace")
  return 1
ensure
  @log_fd.close if @log_fd
  Stud::untrap("INT", sigint_id) unless sigint_id.nil?
  Stud::untrap("TERM", sigterm_id) unless sigterm_id.nil?
  Stud::untrap("HUP", sighup_id) unless sighup_id.nil?
end

#fail(message) ⇒ Object

Emit a failure message and abort.



53
54
55
# File 'lib/logstash/agent.rb', line 53

# Emit a failure message and abort.
#
# Aborting is done by raising; nothing is printed here.
#
# @param message [String] text carried by the raised error
# @raise [LogStash::ConfigurationError] always
def fail(message)
  error_class = LogStash::ConfigurationError
  raise error_class, message
end

#fetch_config(uri) ⇒ Object

Fetch configuration text over HTTP from the given URI. A failed fetch is reported as a configuration error.



308
309
310
311
312
313
314
# File 'lib/logstash/agent.rb', line 308

# Fetch configuration text over HTTP.
#
# @param uri [URI] location handed to Net::HTTP.get
# @return [String] the response body with a trailing newline appended
# @raise [LogStash::ConfigurationError] (through #fail) when the request
#   raises any StandardError
def fetch_config(uri)
  Net::HTTP.get(uri) + "\n"
rescue StandardError => e
  # Deliberately not `rescue Exception`: Interrupt, SystemExit and similar
  # must keep propagating instead of being reported as a bad configuration.
  fail(I18n.t("logstash.agent.configuration.fetch-failed", :path => uri.to_s, :message => e.message))
end

#load_config(path) ⇒ Object

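Load configuration text from a path or URI. Plain paths and file: URIs are read from the local filesystem, HTTP URIs are fetched, and any other scheme is rejected as a configuration error.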



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/logstash/agent.rb', line 257

# Load configuration text from a local path or a URI.
#
# Dispatch is by URI scheme:
# * no scheme            -> #local_config(path)
# * "http" / "https"     -> #fetch_config(uri)
# * "file"               -> #local_config(uri.path)
# * a single letter      -> treated as a Windows drive letter ("C:/x.conf"),
#                           so the original string goes to #local_config
# * anything else        -> configuration error
#
# @param path [String] file path, glob, directory, or URI
# @return [Object] whatever the selected loader returns
# @raise [LogStash::ConfigurationError] (through #fail) for unsupported schemes
def load_config(path)
  uri = URI.parse(path)

  case uri.scheme
  when nil
    local_config(path)
  when "http", "https"
    # Exact match: a pattern like /http/ would also accept unrelated schemes
    # that merely contain "http".
    fetch_config(uri)
  when "file"
    local_config(uri.path)
  when /\A[A-Za-z]\z/
    # "C:/logstash/agent.conf" parses as a URI with scheme "C"; it is a
    # local Windows path, not a URI.
    local_config(path)
  else
    fail(I18n.t("logstash.agent.configuration.scheme-not-supported", :path => path))
  end
rescue URI::InvalidURIError
  # fallback for windows.
  # if the parsing of the file failed we assume we can reach it locally.
  # some relative path on windows arent parsed correctly (.\logstash.conf)
  local_config(path)
end

#local_config(path) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/logstash/agent.rb', line 279

# Read configuration text from the local filesystem.
#
# +path+ is expanded to an absolute path and used as a glob; a directory is
# turned into "<dir>/*". Matching regular files are read in sorted order and
# concatenated, each followed by a newline. Files whose name ends in "~" are
# skipped as editor temp files.
#
# @param path [String] file, directory or glob pattern
# @return [String] the concatenated contents of every file that was read
# @raise [LogStash::ConfigurationError] (through #fail) when nothing matches,
#   or when a file has non-ascii content that is not validly encoded
def local_config(path)
  pattern = File.expand_path(path)
  pattern = File.join(pattern, "*") if File.directory?(pattern)

  if Dir.glob(pattern).empty?
    fail(I18n.t("logstash.agent.configuration.file-not-found", :path => pattern))
  end

  badly_encoded = []
  merged = Dir.glob(pattern).sort.each_with_object("") do |file, buffer|
    next unless File.file?(file)

    if file =~ /~$/
      @logger.debug("NOT reading config file because it is a temp file", :file => file)
      next
    end

    @logger.debug("Reading config file", :file => file)
    contents = File.read(file)
    # Remember the offender but keep going, so every bad file is reported at once.
    badly_encoded << file unless contents.ascii_only? || contents.valid_encoding?
    buffer << contents + "\n"
  end

  if badly_encoded.any?
    fail("The following config files contains non-ascii characters but are not UTF-8 encoded #{badly_encoded}")
  end

  merged
end

#report(message) ⇒ Object

Print a message to stdout and, when a log file is configured, also send it to the logger.



57
58
59
60
61
# File 'lib/logstash/agent.rb', line 57

# Show a message to the user.
#
# The message always goes to stdout; it is additionally sent to the logger
# when a log file is configured, so it is visible in both places.
#
# @param message [Object] text to print (anything +puts+ accepts)
def report(message)
  # Print to stdout just in case we're logging to a file
  puts(message)
  return unless log_file

  @logger.log(message)
end

#show_gems ⇒ Object

Print the name and version of every gem specification known to RubyGems.



195
196
197
198
199
200
# File 'lib/logstash/agent.rb', line 195

# Print one "gem <name> <version>" line to stdout for every gem
# specification RubyGems enumerates.
def show_gems
  require "rubygems"
  Gem::Specification.each { |gemspec| puts "gem #{gemspec.name} #{gemspec.version}" }
end

#show_version ⇒ Object

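Print the logstash version. With the verbose or debug flags, also print the Ruby version and, on JRuby, the Java version; with debug, also list the installed gems.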



170
171
172
173
174
175
176
177
178
# File 'lib/logstash/agent.rb', line 170

# Print version information.
#
# The logstash version is always printed. The Ruby description (and, on
# JRuby, the Java details) are added at "info" verbosity, and the gem list at
# "debug" verbosity.
#
# Verbosity comes from --verbose / --debug or from the deprecated -v / -vv
# flags. #configure_logging treats +verbosity?+ as an array with one element
# per -v given, so the array length is what is inspected here (one => info,
# more => debug); comparing that array against :info/:debug could never
# match. A bare :info/:debug symbol is still honoured for compatibility.
def show_version
  show_version_logstash

  legacy_flags = verbosity?
  legacy_count = legacy_flags.is_a?(Array) ? legacy_flags.length : 0

  want_debug = debug? || legacy_count > 1 || legacy_flags == :debug
  want_info = want_debug || verbose? || legacy_count == 1 || legacy_flags == :info
  return unless want_info

  show_version_ruby
  show_version_java if LogStash::Environment.jruby?
  show_gems if want_debug
end

#show_version_java ⇒ Object

Print the Java version and vendor, and the JVM name and version, read from the Java system properties.



189
190
191
192
193
# File 'lib/logstash/agent.rb', line 189

# Print the Java runtime details to stdout in two lines: Java version and
# vendor, then JVM name and version. Values are read from the Java system
# properties, so this is only callable where +java+ is available.
def show_version_java
  props = java.lang.System.getProperties

  version = props["java.version"]
  vendor = props["java.vendor"]
  puts "java #{version} (#{vendor})"

  vm_name = props["java.vm.name"]
  vm_version = props["java.vm.version"]
  puts "jvm #{vm_name} / #{vm_version}"
end

#show_version_logstash ⇒ Object

Print the logstash version.



180
181
182
183
# File 'lib/logstash/agent.rb', line 180

# Print "logstash <version>" to stdout.
#
# The version file is required here, on demand, rather than at load time;
# it is expected to define LOGSTASH_VERSION.
def show_version_logstash
  require "logstash/version"
  version_line = "logstash #{LOGSTASH_VERSION}"
  puts version_line
end

#show_version_ruby ⇒ Object

Print the Ruby interpreter description (RUBY_DESCRIPTION).



185
186
187
# File 'lib/logstash/agent.rb', line 185

# Print the running interpreter's RUBY_DESCRIPTION string to stdout.
def show_version_ruby
  description = RUBY_DESCRIPTION
  puts(description)
end

#warn(message) ⇒ Object

Emit a warning message. For now all warnings are fatal: the message is raised as a LogStash::ConfigurationError.



47
48
49
50
# File 'lib/logstash/agent.rb', line 47

# Emit a warning message.
#
# Nothing is printed: the warning is raised as an error instead, so a
# warning stops the caller exactly like #fail does.
#
# @param message [String] text carried by the raised error
# @raise [LogStash::ConfigurationError] always
def warn(message)
  # For now, all warnings are fatal.
  fatal_warning = LogStash::ConfigurationError
  raise fatal_warning, message
end