Class: LogStash::Agent

Inherits:
Clamp::Command
  • Object
show all
Defined in:
lib/logstash/agent.rb

Instance Method Summary collapse

Instance Method Details

#configureObject

Do any start-time configuration.

Log file stuff, plugin path checking, etc.



205
206
207
208
# File 'lib/logstash/agent.rb', line 205

# Apply start-time configuration: logging first, then any extra plugin
# search paths that were supplied.
def configure
  configure_logging(log_file)

  # Plugin paths are optional; nothing more to do when none were given.
  return if plugin_paths.nil?

  configure_plugin_path(plugin_paths)
end

#configure_logging(path) ⇒ Object

Point logging at a specific path.



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/logstash/agent.rb', line 211

# Set the logger level from the command-line flags and point the logger at
# +path+, or at STDOUT when +path+ is nil.
#
# This method is re-run from the SIGHUP handler, so it must be safe to call
# repeatedly: the subscription made by the previous call is replaced rather
# than stacked on top of, which would duplicate every log line.
#
# @param path [String, nil] log file to append to; nil sends logs to STDOUT
# @return [Object] the subscription handle returned by the logger
# @raise whatever #fail raises when the log file cannot be opened
def configure_logging(path)
  # Explicit --quiet/--verbose/--debug flags win over the deprecated -v/-vv.
  if quiet?
    @logger.level = :error
  elsif verbose?
    @logger.level = :info
  elsif debug?
    @logger.level = :debug
  elsif verbosity? && verbosity?.any?
    # Old support for the -v and -vv stuff.
    # this is an array with length of how many times the flag is given
    if verbosity?.length == 1
      @logger.warn("The -v flag is deprecated and will be removed in a future release. You should use --verbose instead.")
      @logger.level = :info
    else
      @logger.warn("The -vv flag is deprecated and will be removed in a future release. You should use --debug instead.")
      @logger.level = :debug
    end
  else
    @logger.level = :warn
  end

  if path
    # TODO(sissel): Implement file output/rotation in Cabin.
    # Open the new file BEFORE releasing the current one, so a failed reopen
    # leaves the existing log destination open and subscribed.
    begin
      new_fd = File.new(path, "a")
    rescue => e
      fail(I18n.t("logstash.agent.configuration.log_file_failed",
                  :path => path, :error => e))
    end

    puts "Sending logstash logs to #{path}."
    @logger.unsubscribe(@logger_subscription) if @logger_subscription
    @log_fd.close if @log_fd
    @log_fd = new_fd
    @logger_subscription = @logger.subscribe(@log_fd)
  else
    # Track the STDOUT subscription too, so a repeated call swaps it instead
    # of adding a second one.
    @logger.unsubscribe(@logger_subscription) if @logger_subscription
    @logger_subscription = @logger.subscribe(STDOUT)
  end

  # TODO(sissel): redirect stdout/stderr to the log as well
  # http://jira.codehaus.org/browse/JRUBY-7003
end

#configure_plugin_path(paths) ⇒ Object

Validate and add any paths to the list of locations logstash will look to find plugins.



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/logstash/agent.rb', line 260

# Validate each plugin path and prepend it to the Ruby load path.
#
# A path that does not exist, or that contains no
# logstash/{inputs,filters,outputs}/*.rb files, is reported through #warn
# (which, in this class, raises instead of merely printing).
#
# @param paths [Array<String>] directories to search for plugins
# @return [Array<String>] the +paths+ argument
def configure_plugin_path(paths)
  # Append any plugin paths to the ruby search path
  paths.each do |path|
    # Verify the path exists. Dir.exist? is used because the Dir.exists?
    # alias was removed in Ruby 3.2.
    unless Dir.exist?(path)
      warn(I18n.t("logstash.agent.configuration.plugin_path_missing",
                  :path => path))
    end

    # TODO(sissel): Verify the path looks like the correct form.
    # aka, there must be file in path/logstash/{filters,inputs,outputs}/*.rb
    plugin_glob = File.join(path, "logstash", "{inputs,filters,outputs}", "*.rb")
    if Dir.glob(plugin_glob).empty?
      warn(I18n.t("logstash.agent.configuration.no_plugins_found",
                  :path => path, :plugin_glob => plugin_glob))
    end

    # We push plugin paths to the front of the LOAD_PATH so that folks
    # can override any core logstash plugins if they need to.
    @logger.debug("Adding plugin path", :path => path)
    $LOAD_PATH.unshift(path)
  end
end

#executeObject

Run the agent. This method is invoked after clamp parses the flags given to this program.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/logstash/agent.rb', line 67

# Run the agent. Invoked after clamp has parsed the command-line flags.
#
# Steps, in order: handle --version, configure logging and plugin paths,
# build the config text (from the config path, or from the config string
# with default stdin/stdout sections added), construct the pipeline, stop
# early for a config test, install signal handlers, then run the pipeline.
#
# @return [Integer] 0 when the version was shown, the config test passed, or
#   the pipeline finished; 1 when a configuration error or any other
#   StandardError was raised
def execute
  require "logstash/pipeline"
  require "cabin" # gem 'cabin'
  require "logstash/plugin"
  @logger = Cabin::Channel.get(LogStash)

  if version?
    show_version
    return 0
  end

  # temporarily send logs to stdout as well if a --log is specified
  # and stdout appears to be a tty
  show_startup_errors = log_file && STDOUT.tty?

  if show_startup_errors
    stdout_logs = @logger.subscribe(STDOUT)
  end
  configure

  # You must specify a config_string or config_path
  if config_string.nil? && config_path.nil?
    fail(help + "\n" + I18n.t("logstash.agent.missing-configuration"))
  end

  if @config_path
    @config_string = load_config(@config_path)
  else
    # include a default stdin input if no inputs given
    if @config_string !~ /input *{/
      @config_string += "input { stdin { type => stdin } }"
    end
    # include a default stdout output if no outputs given
    if @config_string !~ /output *{/
      @config_string += "output { stdout { codec => rubydebug } }"
    end
  end

  begin
    pipeline = LogStash::Pipeline.new(@config_string)
  rescue LoadError
    # Reported through #fail so it is handled by the ConfigurationError
    # rescue below like any other configuration problem.
    fail("Configuration problem.")
  end

  # Stop now if we are only asking for a config test.
  if config_test?
    report "Configuration OK"
    # Return an explicit success status, like every other exit of this method.
    return 0
  end

  # Make SIGINT shutdown the pipeline.
  trap_id = Stud::trap("INT") do
    @logger.warn(I18n.t("logstash.agent.interrupted"))
    pipeline.shutdown
  end

  # SIGHUP re-runs logging configuration (reopens the log file).
  Stud::trap("HUP") do
    @logger.info(I18n.t("logstash.agent.sighup"))
    configure_logging(log_file)
  end

  pipeline.configure("filter-workers", filter_workers)

  # Startup is done; drop the temporary stdout subscription.
  @logger.unsubscribe(stdout_logs) if show_startup_errors

  # TODO(sissel): Get pipeline completion status.
  pipeline.run
  return 0
rescue LogStash::ConfigurationError => e
  @logger.unsubscribe(stdout_logs) if show_startup_errors
  report I18n.t("logstash.agent.error", :error => e)
  return 1
rescue => e
  @logger.unsubscribe(stdout_logs) if show_startup_errors
  report I18n.t("oops", :error => e)
  # NOTE(review): $DEBUGLIST is not set in this method; presumably it is
  # populated elsewhere before execute runs — confirm.
  report e.backtrace if @logger.debug? || $DEBUGLIST.include?("stacktrace")
  return 1
ensure
  @log_fd.close if @log_fd
  # trap_id is nil when we exit before the INT handler was installed.
  Stud::untrap("INT", trap_id) unless trap_id.nil?
end

#fail(message) ⇒ Object

Emit a failure message and abort.



55
56
57
# File 'lib/logstash/agent.rb', line 55

# Abort by raising a configuration error carrying +message+.
#
# @param message [String] description of the failure
# @raise [LogStash::ConfigurationError] always
def fail(message)
  raise LogStash::ConfigurationError.new(message)
end

#load_config(path) ⇒ Object

Read the configuration from the file, directory, or glob at path and return it as a single string.



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/logstash/agent.rb', line 285

# Read configuration text from +path+.
#
# +path+ may be a single file, a glob pattern, or a directory (in which case
# every entry directly inside it is considered). Matches are read in sorted
# order and joined with a trailing newline after each file. Non-files and
# files whose name ends in "~" are skipped.
#
# @param path [String] file, directory, or glob pattern
# @return [String] the concatenated configuration text
# @raise whatever #fail raises when nothing matches +path+
def load_config(path)
  path = File.join(path, "*") if File.directory?(path)

  # Glob once and reuse the result, so the existence check and the read loop
  # see the same set of files.
  matches = Dir.glob(path)
  if matches.empty?
    fail(I18n.t("logstash.agent.configuration.file-not-found", :path => path))
  end

  config = ""
  matches.sort.each do |file|
    next unless File.file?(file)
    if file.match(/~$/)
      @logger.debug("NOT reading config file because it is a temp file", :file => file)
      next
    end
    @logger.debug("Reading config file", :file => file)
    config << File.read(file) << "\n"
  end
  config
end

#report(message) ⇒ Object

Print a message to stdout and, when a log file is configured, send it to the logger as well.



59
60
61
62
63
# File 'lib/logstash/agent.rb', line 59

# Show +message+ on stdout and, when a log file is configured, also hand it
# to the logger.
#
# @param message [Object] text (or lines) to print
def report(message)
  # Always echo to stdout, since the logger may be writing to a file instead.
  $stdout.puts(message)
  return unless log_file

  @logger.log(message)
end

#show_gemsObject

Print the name and version of every installed gem.



195
196
197
198
199
200
# File 'lib/logstash/agent.rb', line 195

# Print one "gem NAME VERSION" line for every gem specification known to
# RubyGems.
def show_gems
  require "rubygems"
  Gem::Specification.each { |gemspec| puts "gem #{gemspec.name} #{gemspec.version}" }
end

#show_versionObject

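Print the logstash version, plus the Ruby, Java, Elasticsearch, and gem versions when verbose or debug output is requested.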



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/logstash/agent.rb', line 149

# Print version information.
#
# The logstash version is always printed. With --verbose, --debug, or the
# deprecated -v flag, the Ruby description is added, plus the Java and
# Elasticsearch versions when RUBY_PLATFORM is "java". With --debug or the
# deprecated -vv, the installed gems are listed as well.
def show_version
  show_version_logstash

  # configure_logging in this file treats verbosity? as an array with one
  # entry per -v given, so count entries rather than comparing it to a level
  # symbol. The symbol comparison is kept for backward compatibility.
  legacy = verbosity?
  legacy_count = legacy.is_a?(Array) ? legacy.length : 0

  want_debug = legacy == :debug || legacy_count >= 2 || debug?
  want_info = want_debug || legacy == :info || legacy_count == 1 || verbose?
  return unless want_info

  show_version_ruby

  if RUBY_PLATFORM == "java"
    show_version_java
    show_version_elasticsearch
  end

  show_gems if want_debug
end

#show_version_elasticsearchObject

Print the Elasticsearch version, loading the vendored Elasticsearch jars first when not running from a jar.



175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/logstash/agent.rb', line 175

# Print "Elasticsearch: " followed by whatever org.elasticsearch.Version's
# main method emits.
def show_version_elasticsearch
  # Not running in the jar? Assume the elasticsearch jars are
  # in ../../vendor/jar/... relative to this file and load them.
  unless __FILE__ =~ /^(?:jar:)?file:/
    jar_pattern = File.join(File.dirname(__FILE__), "../../vendor/jar/elasticsearch*/lib/*.jar")
    Dir.glob(jar_pattern).each { |jar_file| require jar_file }
  end

  $stdout.write("Elasticsearch: ")
  org.elasticsearch.Version::main([])
end

#show_version_javaObject

Print the Java and JVM versions.



189
190
191
192
193
# File 'lib/logstash/agent.rb', line 189

# Print the Java version/vendor and the JVM name/version, read from the Java
# system properties.
def show_version_java
  props = java.lang.System.getProperties
  java_line = "java #{props["java.version"]} (#{props["java.vendor"]})"
  jvm_line = "jvm #{props["java.vm.name"]} / #{props["java.vm.version"]}"
  puts java_line
  puts jvm_line
end

#show_version_logstashObject

Print the logstash version.



166
167
168
169
# File 'lib/logstash/agent.rb', line 166

# Print "logstash VERSION", loading logstash/version on demand to obtain
# LOGSTASH_VERSION.
def show_version_logstash
  require "logstash/version"
  banner = "logstash #{LOGSTASH_VERSION}"
  puts banner
end

#show_version_rubyObject

Print the Ruby interpreter description.



171
172
173
# File 'lib/logstash/agent.rb', line 171

# Print the running interpreter's RUBY_DESCRIPTION string.
def show_version_ruby
  $stdout.puts(RUBY_DESCRIPTION)
end

#warn(message) ⇒ Object

Emit a warning message.



49
50
51
52
# File 'lib/logstash/agent.rb', line 49

# Emit a warning. Warnings are currently treated as fatal, so this raises
# rather than printing.
#
# @param message [String] description of the problem
# @raise [LogStash::ConfigurationError] always
def warn(message)
  fatal_warning = LogStash::ConfigurationError.new(message)
  raise fatal_warning
end