Class: LogStash::Inputs::OpenSearch
- Inherits:
- Base
- Object
- Base
- LogStash::Inputs::OpenSearch
- Extended by:
- PositiveWholeNumberValidator, URIOrEmptyValidator, PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
- Includes:
- PluginMixins::ECSCompatibilitySupport::TargetCheck, PluginMixins::EventSupport::EventFactoryAdapter
- Defined in:
- lib/logstash/inputs/opensearch.rb
Overview
Compatibility Note
NOTE: Starting with OpenSearch 5.3, there is an HTTP setting (modules-http.html) called `http.content_type.required`. If this option is set to `true` and you are using Logstash 2.4 through 5.2, you need to update the OpenSearch input plugin to version 4.0.2 or higher.
Reads from an OpenSearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. It also supports periodically scheduling lookup enrichments using a cron syntax (see the `schedule` setting).
Example:
- source,ruby
-
input {
  # Read all documents from OpenSearch matching the given query
  opensearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
  }
}
This would create an OpenSearch query with the following format:
- source,json
-
curl 'localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
  "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ]
}'
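The request above can be pictured as a URL built from the index, scroll, and size settings plus the JSON query as the body. The following is a minimal sketch of that assembly using only the Ruby standard library; `build_search_request` and its keyword defaults are hypothetical names chosen to mirror the curl command, not part of the plugin.

```ruby
require "json"
require "uri"

# Hypothetical helper (not part of the plugin): assembles the search URL and
# JSON body that the curl command above sends. Defaults mirror that command.
def build_search_request(host:, query:, index: "logstash-*", scroll: "1m", size: 1000, port: 9200)
  params = URI.encode_www_form(scroll: scroll, size: size)
  url = "http://#{host}:#{port}/#{index}/_search?#{params}"
  body = JSON.parse(query) # raises JSON::ParserError on malformed input
  [url, JSON.generate(body)]
end

url, body = build_search_request(
  host: "localhost",
  query: '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
)
puts url
puts body
```

Parsing the query before sending it is a design choice in this sketch: a malformed `query` string fails early, which matches the way `register` loads the query with `LogStash::Json.load` before any request is made.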
Scheduling
Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by rufus-scheduler (github.com/jmettraux/rufus-scheduler). The syntax is cron-like, with some extensions specific to Rufus (e.g. timezone support).
Examples:
|==========================================================
| `* 5 * 1-3 *`               | will execute every minute of 5am every day of January through March.
| `0 * * * *`                 | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
|==========================================================
Further documentation describing this syntax can be found at github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings.
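To make the field order of the cron lines above concrete, here is a simplified matcher written in plain Ruby. It is an illustration only and is not how rufus-scheduler parses cron lines: `cron_field_match?` and `cron_match?` are hypothetical helpers that handle `*`, single numbers, ranges, and comma lists, ignore a trailing timezone field, and require every field to match.

```ruby
# Simplified illustration of five-field cron lines:
#   minute hour day-of-month month day-of-week
# Supports "*", single numbers, ranges ("1-3"), and comma lists only.
def cron_field_match?(field, value)
  return true if field == "*"

  field.split(",").any? do |part|
    if part.include?("-")
      low, high = part.split("-").map { |n| Integer(n, 10) }
      (low..high).cover?(value)
    else
      Integer(part, 10) == value
    end
  end
end

# Returns true when `time` falls on a minute selected by `expression`.
# Any sixth field (such as a timezone name) is ignored in this sketch.
def cron_match?(expression, time)
  minute, hour, day, month, weekday = expression.split
  cron_field_match?(minute, time.min) &&
    cron_field_match?(hour, time.hour) &&
    cron_field_match?(day, time.day) &&
    cron_field_match?(month, time.month) &&
    cron_field_match?(weekday, time.wday)
end

puts cron_match?("* 5 * 1-3 *", Time.new(2024, 2, 10, 5, 30)) # 5:30am in February
puts cron_match?("* 5 * 1-3 *", Time.new(2024, 4, 10, 5, 30)) # April is outside 1-3
puts cron_match?("0 * * * *", Time.new(2024, 4, 10, 13, 0))   # minute 0 of an hour
```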
Defined Under Namespace
Modules: PositiveWholeNumberValidator, URIOrEmptyValidator
Instance Method Summary
- #initialize(params = {}) ⇒ OpenSearch (constructor): a new instance of OpenSearch.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #stop ⇒ Object
Methods included from URIOrEmptyValidator
Methods included from PositiveWholeNumberValidator
Constructor Details
#initialize(params = {}) ⇒ OpenSearch
Returns a new instance of OpenSearch.
# File 'lib/logstash/inputs/opensearch.rb', line 182
def initialize(params={})
  super(params)

  if docinfo_target.nil?
    @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][opensearch]']
  end
end
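The constructor only fills in `docinfo_target` when the user has not configured one, and the default depends on the active ECS compatibility mode. The sketch below imitates that selection without Logstash. `EcsSelector` and `default_docinfo_target` are hypothetical stand-ins written for illustration; the real `ecs_select` helper comes from the ECS compatibility support mixin and may behave differently in detail.

```ruby
# Hypothetical stand-in for the `ecs_select` helper: it returns the value
# registered for the active ECS compatibility mode.
class EcsSelector
  def initialize(mode)
    @mode = mode
  end

  # Called as selector[disabled: ..., v1: ...]
  def [](choices)
    choices.fetch(@mode)
  end
end

# Keeps an explicitly configured target; otherwise picks the per-mode default
# using the same two values as the constructor above.
def default_docinfo_target(mode, configured = nil)
  return configured unless configured.nil?

  EcsSelector.new(mode)[disabled: '@metadata', v1: '[@metadata][input][opensearch]']
end

puts default_docinfo_target(:disabled)
puts default_docinfo_target(:v1)
puts default_docinfo_target(:v1, '[custom][target]')
```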
Instance Method Details
#register ⇒ Object
# File 'lib/logstash/inputs/opensearch.rb', line 190
def register
  require "rufus/scheduler"

  @options = {
    :index => @index,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = LogStash::Json.load(@query)
  if @slices
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "OpenSearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "OpenSearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  transport_options = {:headers => {}}
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!({'user-agent' => prepare_user_agent()})
  transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
  transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
  transport_options[:socket_timeout] = @socket_timeout_seconds unless @socket_timeout_seconds.nil?

  hosts = setup_hosts
  ssl_options = setup_ssl

  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')

  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  @client = OpenSearch::Client.new(
    :hosts => hosts,
    :transport_options => transport_options,
    :transport_class => ::OpenSearch::Transport::Transport::HTTP::Manticore,
    :ssl => ssl_options
  )
  test_connection!
  @client
end
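Two parts of `register` can be exercised in isolation: the `slices` checks and the way unset timeouts and an empty proxy are left out of the transport options. The following is a rough sketch of both, assuming plain Ruby with the standard `json` library. `ConfigurationError`, `validate_slices!`, and `build_transport_options` are hypothetical names that stand in for the Logstash pieces, and the error messages are shortened.

```ruby
require "json"

# Stand-in for LogStash::ConfigurationError so the sketch runs without Logstash.
class ConfigurationError < StandardError; end

# Mirrors the two `slices` checks: the query must not carry its own `slice`,
# and the slice count must be at least one. Returns the parsed query.
def validate_slices!(query_json, slices)
  base_query = JSON.parse(query_json)
  return base_query if slices.nil?

  raise ConfigurationError, "`query` cannot specify `slice` when `slices` is set" if base_query.include?("slice")
  raise ConfigurationError, "`slices` must be greater than zero, got `#{slices}`" if slices < 1

  base_query
end

# Builds a transport options hash, skipping unset timeouts and an empty proxy.
def build_transport_options(headers: {}, request_timeout: nil, connect_timeout: nil, socket_timeout: nil, proxy: nil)
  options = { headers: headers.dup }
  options[:request_timeout] = request_timeout unless request_timeout.nil?
  options[:connect_timeout] = connect_timeout unless connect_timeout.nil?
  options[:socket_timeout] = socket_timeout unless socket_timeout.nil?
  options[:proxy] = proxy.to_s if proxy && proxy != ""
  options
end

p build_transport_options(request_timeout: 60, proxy: "")

begin
  validate_slices!('{ "slice": { "id": 0, "max": 2 } }', 2)
rescue ConfigurationError => e
  puts e.message
end
```

Leaving out `nil` timeouts, instead of passing them through, lets the HTTP client fall back to its own defaults, which appears to be the intent of the `unless ... nil?` guards in `register`.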
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/opensearch.rb', line 229
def run(output_queue)
  if @schedule
    @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
    @scheduler.cron @schedule do
      do_run(output_queue)
    end

    @scheduler.join
  else
    do_run(output_queue)
  end
end
#stop ⇒ Object
# File 'lib/logstash/inputs/opensearch.rb', line 242
def stop
  @scheduler.stop if @scheduler
end
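The `run` and `stop` methods above follow a simple pattern: with a schedule, the query is registered as a cron job and the caller blocks on `join`; without one, the query runs once. The sketch below reproduces that control flow without the rufus-scheduler gem. `StubScheduler` and `SketchInput` are hypothetical classes written for this illustration. The stub fires the job a fixed number of times instead of waiting on the clock, so it shows only the dispatch logic, not real timing.

```ruby
# Stub standing in for Rufus::Scheduler so the sketch runs without the gem.
# It records the job and fires it a fixed number of times on `join`.
class StubScheduler
  def initialize(ticks)
    @ticks = ticks
    @stopped = false
  end

  def cron(_schedule, &job)
    @job = job
  end

  def join
    @ticks.times { @job.call unless @stopped }
  end

  def stop
    @stopped = true
  end
end

# Hypothetical input that mirrors the run/stop control flow shown above.
class SketchInput
  def initialize(schedule: nil, scheduler: nil)
    @schedule = schedule
    @injected_scheduler = scheduler
    @scheduler = nil
    @runs = 0
  end

  def run(output_queue)
    if @schedule
      @scheduler = @injected_scheduler
      @scheduler.cron(@schedule) { do_run(output_queue) }
      @scheduler.join
    else
      do_run(output_queue)
    end
  end

  # Safe to call whether or not a scheduler was ever created.
  def stop
    @scheduler.stop if @scheduler
  end

  private

  def do_run(output_queue)
    @runs += 1
    output_queue << "result set #{@runs}"
  end
end

queue = []
one_shot = SketchInput.new
one_shot.run(queue)  # no schedule: runs once
one_shot.stop        # no scheduler: nothing to stop

scheduled = SketchInput.new(schedule: "0 * * * *", scheduler: StubScheduler.new(3))
scheduled.run(queue) # stub fires the job three times
scheduled.stop
puts queue.length
```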