Class: LogStash::Inputs::Elasticsearch
- Inherits:
- LogStash::Inputs::Base
- Object < LogStash::Inputs::Base < LogStash::Inputs::Elasticsearch
- Extended by:
- PositiveWholeNumberValidator, URIOrEmptyValidator, PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
- Includes:
- PluginMixins::CATrustedFingerprintSupport, PluginMixins::ECSCompatibilitySupport::TargetCheck, PluginMixins::EventSupport::EventFactoryAdapter, PluginMixins::NormalizeConfigSupport, PluginMixins::Scheduler
- Defined in:
- lib/logstash/inputs/elasticsearch.rb,
lib/logstash/inputs/elasticsearch/aggregation.rb,
lib/logstash/inputs/elasticsearch/paginated_search.rb
Overview
Compatibility Note
- NOTE
-
Starting with Elasticsearch 5.3, there is an HTTP setting called `http.content_type.required`. If this option is set to `true`, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.
Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. It also supports periodically scheduling lookup enrichments using a cron syntax (see the `schedule` setting).
Example:

[source,ruby]
input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
  }
}
This would create an Elasticsearch query with the following format:
[source,json]
curl 'localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
  "query": { "match": { "statuscode": 200 } },
  "sort": [ "_doc" ]
}'
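As a simplified, hypothetical sketch of what happens internally (the real plugin uses `LogStash::Json` and the Manticore-backed Elasticsearch client), the `query` option is a raw JSON string that is parsed and used verbatim as the search request body, while the scroll parameters travel as URL query parameters:

```ruby
require 'json'

# The `query` option from the pipeline config, passed through unchanged.
query = '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'

# The plugin parses it (via LogStash::Json.load in the real code) and uses
# the resulting hash as the request body.
body = JSON.parse(query)

# Scroll window and page size are sent as URL parameters, as in the curl
# example above.
params = { scroll: '1m', size: 1000 }

body['query']['match']['statuscode']  # the match clause survives parsing intact
```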
Scheduling
Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by rufus-scheduler (github.com/jmettraux/rufus-scheduler). The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).
Examples:
|==========================================================
| `* 5 * 1-3 *`               | will execute every minute of 5am every day of January through March.
| `0 * * * *`                 | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
|==========================================================
Further documentation describing this syntax can be found at github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings.
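For instance, a pipeline that re-runs the query at the start of every hour (matching the `0 * * * *` row in the table above) would look like this hypothetical config:

```ruby
input {
  elasticsearch {
    hosts    => "localhost"
    query    => '{ "query": { "match_all": {} } }'
    schedule => "0 * * * *"
  }
}
```

Without a `schedule`, the query runs exactly once and the plugin then stops.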
Defined Under Namespace
Modules: PositiveWholeNumberValidator, URIOrEmptyValidator Classes: Aggregation, PaginatedSearch, Scroll, SearchAfter
Constant Summary collapse
- BUILD_FLAVOR_SERVERLESS =
'serverless'.freeze
- DEFAULT_EAV_HEADER =
{ "Elastic-Api-Version" => "2023-10-31" }.freeze
Instance Attribute Summary collapse
-
#pipeline_id ⇒ Object
readonly
Returns the value of attribute pipeline_id.
Instance Method Summary collapse
-
#initialize(params = {}) ⇒ Elasticsearch
constructor
A new instance of Elasticsearch.
- #push_hit(hit, output_queue, root_field = '_source') ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #set_docinfo_fields(hit, event) ⇒ Object
Methods included from URIOrEmptyValidator
Methods included from PositiveWholeNumberValidator
Constructor Details
#initialize(params = {}) ⇒ Elasticsearch
Returns a new instance of Elasticsearch.
# File 'lib/logstash/inputs/elasticsearch.rb', line 275

def initialize(params={})
  super(params)

  if docinfo_target.nil?
    @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
  end
end
Instance Attribute Details
#pipeline_id ⇒ Object (readonly)
Returns the value of attribute pipeline_id.
# File 'lib/logstash/inputs/elasticsearch.rb', line 270

def pipeline_id
  @pipeline_id
end
Instance Method Details
#push_hit(hit, output_queue, root_field = '_source') ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 349

def push_hit(hit, output_queue, root_field = '_source')
  event = targeted_event_factory.new_event hit[root_field]
  set_docinfo_fields(hit, event) if @docinfo
  decorate(event)
  output_queue << event
end
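The shape of `#push_hit` can be approximated with plain Ruby objects (a hypothetical sketch: the real method builds a `LogStash::Event` through the plugin's targeted event factory and applies configured decorations):

```ruby
# Hypothetical stand-in: the event payload is taken from the hit's root
# field (by default the document's `_source`), then pushed onto the queue.
def push_hit_sketch(hit, output_queue, root_field = '_source')
  event = hit[root_field].dup
  output_queue << event
end

queue = []
hit = {
  '_index'  => 'logstash-2024.01.01',
  '_id'     => 'abc',
  '_source' => { 'message' => 'hello' }
}
push_hit_sketch(hit, queue)
queue.first['message']  # the `_source` fields become the event's fields
```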
#register ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 283

def register
  require "rufus/scheduler"

  @pipeline_id = execution_context&.pipeline_id || 'main'

  fill_hosts_from_cloud_id
  setup_ssl_params!

  @base_query = LogStash::Json.load(@query)
  if @slices
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")

  validate_authentication
  fill_user_password_from_cloud_auth

  transport_options = {:headers => {}}
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!(setup_api_key(api_key))
  transport_options[:headers].merge!({'user-agent' => prepare_user_agent()})
  transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
  transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
  transport_options[:socket_timeout] = @socket_timeout_seconds unless @socket_timeout_seconds.nil?

  hosts = setup_hosts
  ssl_options = setup_client_ssl

  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')

  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  @client_options = {
    :hosts => hosts,
    :transport_options => transport_options,
    :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
    :ssl => ssl_options
  }

  @client = Elasticsearch::Client.new(@client_options)

  test_connection!

  setup_serverless
  setup_search_api
  setup_query_executor

  @client
end
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 337

def run(output_queue)
  if @schedule
    scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
    scheduler.join
  else
    @query_executor.do_run(output_queue)
  end
end
#set_docinfo_fields(hit, event) ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 356

def set_docinfo_fields(hit, event)
  # do not assume event[@docinfo_target] to be in-place updatable. first get it,
  # update it, then at the end set it in the event.
  docinfo_target = event.get(@docinfo_target) || {}

  unless docinfo_target.is_a?(Hash)
    @logger.error("Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event.to_hash_with_metadata)
    # TODO: (colin) I am not sure raising is a good strategy here?
    raise Exception.new("Elasticsearch input: incompatible event")
  end

  @docinfo_fields.each do |field|
    docinfo_target[field] = hit[field]
  end

  event.set(@docinfo_target, docinfo_target)
end
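The fetch-update-set pattern above can be illustrated with plain hashes (a sketch; the field names follow the plugin's default `docinfo_fields` of `_index`, `_type`, and `_id`, and a plain hash stands in for the event's `@metadata` target):

```ruby
docinfo_fields = %w[_index _type _id]
hit = {
  '_index'  => 'logstash-2024.01.01',
  '_type'   => '_doc',
  '_id'     => 'abc',
  '_source' => { 'message' => 'hello' }
}

event = { '@metadata' => {} }

# First get the target (or start fresh), then update it, then set it back,
# mirroring the comment in the method: never mutate the event's field in place.
docinfo_target = event['@metadata'] || {}
docinfo_fields.each { |field| docinfo_target[field] = hit[field] }
event['@metadata'] = docinfo_target

event['@metadata']['_id']  # the hit's metadata is now carried on the event
```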