Class: LogStash::Outputs::OpenSearch::HttpClient::ManticoreAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb

Constant Summary collapse

ISO_8859_1 =
"ISO-8859-1".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, options = {}) ⇒ ManticoreAdapter

Returns a new instance of ManticoreAdapter.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 42

# Builds the adapter: prepares a private copy of the options, records the
# default request headers, sets up whichever authentication mode is
# configured and creates the underlying Manticore client.
#
# @param logger [Object] stored in @logger
# @param options [Hash] client options; a copy is adjusted and handed to
#   ::Manticore::Client.new, so the hash object passed in is not reassigned
def initialize(logger, options={})
  @logger = logger

  client_options = options.clone || {}
  client_options[:ssl] ||= {}
  # Retries are handled by this plugin itself, so Manticore's are switched off
  client_options[:automatic_retries] = 0
  # Cookies are never needed
  client_options[:cookies] = false

  extra_headers = client_options[:headers] || {}
  @client_params = {:headers => DEFAULT_HEADERS.merge(extra_headers)}

  @type = get_auth_type(client_options) || nil
  if @type == AWS_IAM_AUTH_TYPE
    aws_iam_auth_initialization(client_options)
  elsif @type == BASIC_AUTH_TYPE
    basic_auth_initialization(client_options)
  end

  # Manticore is given the proxy as a hash rather than as a URI object
  client_options[:proxy] = manticore_proxy_hash(client_options[:proxy]) if client_options[:proxy]

  @manticore = ::Manticore::Client.new(client_options)
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



40
41
42
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 40

# Reader for the logger that was handed to the constructor.
#
# @return [Object] the value held in @logger
def logger
  @logger
end

#manticore ⇒ Object (readonly)

Returns the value of attribute manticore.



40
41
42
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 40

# Reader for the HTTP client created by the constructor.
#
# @return [Object] the value held in @manticore
def manticore
  @manticore
end

Instance Method Details

#add_basic_auth_to_params(params) ⇒ Object



217
218
219
220
221
222
223
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 217

# Stores the configured basic-auth credentials under params[:auth].
#
# @param params [Hash] request parameters; modified in place
# @return [Hash] the auth hash that was stored (:user, :password, :eager)
def add_basic_auth_to_params(params)
  credentials = { :user => get_user, :password => get_password, :eager => true }
  params[:auth] = credentials
end

#aws_iam_auth_initialization(options) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 74

# Reads the AWS IAM settings from options[:auth_type], remembers the region
# and service name, and resolves credentials into @credentials.
#
# Missing "profile", retry, timeout, "region" and "service_name" entries fall
# back to the corresponding AWS_DEFAULT_* / AWS_SERVICE constants.
#
# @param options [Hash] client options; options[:auth_type] must respond to []
#   with String keys
# @return [Object] the resolved credentials (also stored in @credentials)
def aws_iam_auth_initialization(options)
  auth = options[:auth_type]

  aws_access_key_id = auth["aws_access_key_id"]
  aws_secret_access_key = auth["aws_secret_access_key"]
  session_token = auth["session_token"]
  profile = auth["profile"] || AWS_DEFAULT_PROFILE
  instance_cred_retries = auth["instance_profile_credentials_retries"] || AWS_DEFAULT_PROFILE_CREDENTIAL_RETRY
  instance_cred_timeout = auth["instance_profile_credentials_timeout"] || AWS_DEFAULT_PROFILE_CREDENTIAL_TIMEOUT
  region = auth["region"] || AWS_DEFAULT_REGION

  set_aws_region(region)
  set_service_name(auth["service_name"] || AWS_SERVICE)

  # The eighth credential field is always nil here, so nil is passed explicitly.
  # NOTE(review): confirm the field's name against the AWSIAMCredential definition.
  credential_config = AWSIAMCredential.new(
    aws_access_key_id,
    aws_secret_access_key,
    session_token,
    profile,
    instance_cred_retries,
    instance_cred_timeout,
    region,
    nil
  )
  @credentials = Aws::CredentialProviderChain.new(credential_config).resolve
end

#basic_auth_initialization(options) ⇒ Object



90
91
92
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 90

# Prepares basic authentication by delegating to set_user_password.
#
# @param options [Hash] client options, passed through unchanged
# @return [Object] whatever set_user_password returns
def basic_auth_initialization(options)
  set_user_password(options)
end

#client ⇒ Object



134
135
136
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 134

# Returns the same object as the manticore reader.
#
# @return [Object] the value held in @manticore
def client
  @manticore
end

#close ⇒ Object



262
263
264
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 262

# Closes the underlying client by calling close on @manticore.
#
# @return [Object] whatever @manticore.close returns
def close
  @manticore.close
end

#format_url(url, path_and_query = nil) ⇒ Object

Returned urls from this method should be checked for double escaping.



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 226

# Builds the request URL from a base URL and an optional path/query string.
#
# Works on a copy of +url+ with user and password removed; +url+ itself is
# left untouched. Returned urls from this method should be checked for double
# escaping.
#
# @param url [Object] base URI object; must support clone, user=, password=,
#   query, query=, path and path=
# @param path_and_query [String, nil] path, optionally followed by "?query",
#   to append to the base URL
# @return [String] the credential-free URL when path_and_query is nil
# @return [Object] otherwise the modified copy of +url+ (not a String), so
#   callers needing text must call to_s on it
def format_url(url, path_and_query=nil)
  request_uri = url.clone

  # We excise auth info from the URL in case manticore itself tries to stick
  # sensitive data in a thrown exception or log data
  request_uri.user = nil
  request_uri.password = nil

  return request_uri.to_s if path_and_query.nil?

  parsed_path_and_query = java.net.URI.new(path_and_query)

  # Keep whichever of the two query strings are present, base URL first
  new_query_parts = [request_uri.query, parsed_path_and_query.query].select do |part|
    part && !part.empty? # Skip nil and ""
  end

  request_uri.query = new_query_parts.join("&") unless new_query_parts.empty?

  # use `raw_path` as `path` will unescape any escaped '/' in the path;
  # runs of slashes created by the join are collapsed to a single one
  request_uri.path = "#{request_uri.path}/#{parsed_path_and_query.raw_path}".gsub(/\/{2,}/, "/")
  request_uri
end

#get_auth_type(options) ⇒ Object



68
69
70
71
72
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 68

# Looks up the configured authentication type.
#
# @param options [Hash] client options
# @return [Object, nil] options[:auth_type]["type"], or nil when
#   options[:auth_type] is nil
def get_auth_type(options)
  auth_settings = options[:auth_type]
  return if auth_settings.nil?

  auth_settings["type"]
end

#get_aws_region ⇒ Object



98
99
100
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 98

# Reader for the region stored by set_aws_region.
#
# @return [Object, nil] the value held in @region
def get_aws_region
  @region
end

#get_password ⇒ Object



119
120
121
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 119

# Reader for the basic-auth password stored by set_user_password.
#
# @return [Object, nil] the value held in @password
def get_password
  @password
end

#get_service_name ⇒ Object



106
107
108
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 106

# Reader for the service name stored by set_service_name.
#
# @return [Object, nil] the value held in @service_name
def get_service_name
  @service_name
end

#get_user ⇒ Object



115
116
117
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 115

# Reader for the basic-auth user stored by set_user_password.
#
# @return [Object, nil] the value held in @user
def get_user
  @user
end

#host_unreachable_exceptions ⇒ Object



266
267
268
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 266

# Lists the Manticore exception classes this adapter reports as
# "host unreachable".
#
# Every entry is written with a leading `::` so that all five are resolved
# from the top-level namespace in the same way.
#
# @return [Array<Class>] a new array on each call
def host_unreachable_exceptions
  [
    ::Manticore::Timeout,
    ::Manticore::SocketException,
    ::Manticore::ClientProtocolException,
    ::Manticore::ResolutionFailure,
    ::Manticore::SocketTimeout
  ]
end

#manticore_proxy_hash(proxy_uri) ⇒ Object

Transform the proxy option to a hash. Manticore’s support for non-hash proxy options is broken. This was fixed in github.com/cheald/manticore/commit/34a00cee57a56148629ed0a47c329181e7319af5 but this is not yet released



126
127
128
129
130
131
132
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 126

# Converts a proxy URI object into the hash form given to Manticore.
#
# :host is always included; :scheme, :port, :user, :password and :path are
# included only when the URI returns a value that is neither nil nor an
# empty String.
#
# @param proxy_uri [Object] must respond to host, scheme, port, user,
#   password and path
# @return [Hash] proxy settings keyed by Symbol
def manticore_proxy_hash(proxy_uri)
  proxy_settings = { :host => proxy_uri.host }

  [:scheme, :port, :user, :password, :path].each do |component|
    component_value = proxy_uri.send(component)
    next if component_value.nil?
    next if component_value.is_a?(String) && component_value.empty?

    proxy_settings[component] = component_value
  end

  proxy_settings
end

#minimum_encoding_for(string) ⇒ Object



192
193
194
195
196
197
198
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 192

# Picks the encoding name to use for a request body.
#
# @param string [String] the body text
# @return [String] ISO_8859_1 when the text is ASCII-only, otherwise the name
#   of the string's own encoding
def minimum_encoding_for(string)
  return ISO_8859_1 if string.ascii_only?

  string.encoding.to_s
end

#perform_request(url, method, path, params = {}, body = nil) ⇒ Response

Performs the request by invoking Transport::Base#perform_request with a block.

Returns:

  • (Response)

See Also:

  • Transport::Base#perform_request


143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 143

# Sends one HTTP request through the Manticore client and waits for the
# response.
#
# @param url [Object] base URI object; its user/password, when present, are
#   used for authentication
# @param method [String, Symbol] HTTP verb; its downcased form is the name of
#   the method invoked on @manticore
# @param path [String, nil] path (and optional query) appended to +url+
# @param params [Hash, nil] per-request client parameters
# @param body [Object, nil] request body; set only when truthy
# @return [Object] the completed Manticore response
# @raise [::LogStash::Outputs::OpenSearch::HttpClient::Pool::BadResponseCodeError]
#   when the status code is outside 200..299 and is not 404
def perform_request(url, method, path, params={}, body=nil)
  # Two-level merge: when both sides hold a Hash under the same key the two
  # hashes are merged (client value winning per nested key); otherwise the
  # value from @client_params replaces the passed one.
  params = (params || {}).merge(@client_params) do |_key, passed, client|
    passed.is_a?(Hash) && client.is_a?(Hash) ? passed.merge(client) : client
  end

  # Work on a copy so header changes never reach the hash in @client_params
  params[:headers] = params[:headers].clone
  params[:body] = body if body

  if url.user
    url_password = url.password
    params[:auth] = {
      :user => CGI.unescape(url.user),
      # We have to unescape the password here since manticore won't do it
      # for us unless it's part of the URL. A URL may carry a user without a
      # password; in that case nil is passed through instead of raising.
      :password => url_password && CGI.unescape(url_password),
      :eager => true
    }
  elsif @type == BASIC_AUTH_TYPE
    add_basic_auth_to_params(params)
  end

  request_uri = format_url(url, path)

  if @type == AWS_IAM_AUTH_TYPE
    sign_aws_request(request_uri, path, method, params)
  end

  request_uri_as_string = remove_double_escaping(request_uri.to_s)
  resp = @manticore.send(method.downcase, request_uri_as_string, params)

  # Manticore returns lazy responses by default
  # We want to block for our usage, this will wait for the response
  # to finish
  resp.call

  # 404s are excluded because they are valid codes in the case of
  # template installation. We might need a better story around this later
  # but for our current purposes this is correct
  if resp.code < 200 || (resp.code > 299 && resp.code != 404)
    raise ::LogStash::Outputs::OpenSearch::HttpClient::Pool::BadResponseCodeError.new(resp.code, request_uri, body, resp.body)
  end

  resp
end

#remove_double_escaping(url) ⇒ Object

Later versions of SafeURI will also escape the ‘%’ sign in an already escaped URI. (If the path variable is used, it constructs a new java.net.URI object using the multi-arg constructor, which will escape any ‘%’ characters in the path, as opposed to the single-arg constructor which requires illegal characters to be already escaped, and will throw otherwise) The URI needs to have been previously escaped, as it does not play nice with an escaped ‘/’ in the middle of a URI, as required by date math, treating it as a path separator



258
259
260
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 258

# Undoes one level of double percent-escaping: every "%25" that is directly
# followed by two hex digits is turned back into "%" plus those digits.
#
# @param url [String] URL text
# @return [String] a new string; +url+ is not modified
def remove_double_escaping(url)
  double_escaped_octet = /%25([0-9A-F]{2})/i
  single_escaped_octet = '%\1'
  url.gsub(double_escaped_octet, single_escaped_octet)
end

#set_aws_region(region) ⇒ Object



94
95
96
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 94

# Stores the AWS region in @region, where get_aws_region and
# sign_aws_request read it.
#
# @param region [Object] region value to remember
# @return [Object] the value that was stored
def set_aws_region(region)
  @region = region
end

#set_service_name(service_name) ⇒ Object



102
103
104
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 102

# Stores the AWS service name in @service_name, where get_service_name and
# sign_aws_request read it.
#
# @param service_name [Object] service name to remember
# @return [Object] the value that was stored
def set_service_name(service_name)
  @service_name = service_name
end

#set_user_password(options) ⇒ Object



110
111
112
113
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 110

# Copies the basic-auth credentials out of options[:auth_type] into @user
# and @password.
#
# @param options [Hash] client options; options[:auth_type] must respond to
#   [] with the String keys "user" and "password"
# @return [Object] the stored password value
def set_user_password(options)
  basic_auth_settings = options[:auth_type]
  @user = basic_auth_settings["user"]
  @password = basic_auth_settings["password"]
end

#sign_aws_request(request_uri, path, method, params) ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb', line 200

# Signs a request with AWS Signature Version 4 and merges the resulting
# headers into params[:headers].
#
# The signed URL is always HTTPS on AWS_DEFAULT_PORT; only the host is taken
# from +request_uri+.
# NOTE(review): the port of request_uri is ignored when signing; confirm
# this is intended for non-default ports.
#
# @param request_uri [#to_s] full request URL; used for its host
# @param path [String] request path used in the signed URL
# @param method [String, Symbol] HTTP verb; upcased before signing
# @param params [Hash] request parameters; :headers and :body are read and
#   :headers is replaced with the merged, signed header set
# @return [Hash] the merged headers now stored in params[:headers]
def sign_aws_request(request_uri, path, method, params)
  url = URI::HTTPS.build({:host=>URI(request_uri.to_s).host, :port=>AWS_DEFAULT_PORT.to_s, :path=>path})

  request = Seahorse::Client::Http::Request.new({
    :endpoint => url,
    :http_method => method.to_s.upcase,
    :headers => params[:headers],
    :body => params[:body]
  })

  aws_signer = Aws::Sigv4::Signer.new(service: @service_name, region: @region, credentials_provider: @credentials)
  signed_key = aws_signer.sign_request(
    http_method: request.http_method,
    url: url,
    headers: params[:headers],
    # match encoding of the HTTP adapter, see https://github.com/opensearch-project/logstash-output-opensearch/issues/207
    body: params[:body] ? EntityUtils.toString(StringEntity.new(params[:body], minimum_encoding_for(params[:body]))) : nil
  )
  params[:headers] = params[:headers].merge(signed_key.headers)
end