Module: Elasticsearch::Transport::Transport::Base Abstract
- Included in:
- HTTP::Curb, HTTP::Faraday, HTTP::Manticore
- Defined in:
- lib/elasticsearch/transport/transport/base.rb
Overview
Module with common functionality for transport implementations.
Constant Summary collapse
- DEFAULT_PORT =
9200
- DEFAULT_PROTOCOL =
'http'
- DEFAULT_RELOAD_AFTER =
Requests
10_000
- DEFAULT_RESURRECT_AFTER =
Seconds
60
- DEFAULT_MAX_RETRIES =
Requests
3
- DEFAULT_SERIALIZER_CLASS =
Serializer::MultiJson
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
-
#counter ⇒ Object
readonly
Returns the value of attribute counter.
-
#hosts ⇒ Object
readonly
Returns the value of attribute hosts.
-
#last_request_at ⇒ Object
readonly
Returns the value of attribute last_request_at.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#max_retries ⇒ Object
Returns the value of attribute max_retries.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#protocol ⇒ Object
readonly
Returns the value of attribute protocol.
-
#reload_after ⇒ Object
Returns the value of attribute reload_after.
-
#reload_connections ⇒ Object
Returns the value of attribute reload_connections.
-
#resurrect_after ⇒ Object
Returns the value of attribute resurrect_after.
-
#serializer ⇒ Object
Returns the value of attribute serializer.
-
#sniffer ⇒ Object
Returns the value of attribute sniffer.
-
#tracer ⇒ Object
Returns the value of attribute tracer.
Instance Method Summary collapse
- #__build_connections ⇒ Connections::Collection abstract private
-
#__convert_to_json(o = nil, options = {}) ⇒ Object
private
Converts any non-String object to JSON.
-
#__full_url(host) ⇒ Object
private
Returns a full URL based on information from host.
-
#__log(method, path, params, body, url, response, json, took, duration) ⇒ Object
private
Log request and response information.
-
#__log_failed(response) ⇒ Object
private
Log failed request.
-
#__raise_transport_error(response) ⇒ Object
private
Raise error specific for the HTTP response status or a generic server error.
-
#__rebuild_connections(arguments = {}) ⇒ Object
private
Replaces the connections collection.
-
#__trace(method, path, params, body, url, response, json, took, duration) ⇒ Object
private
Trace the request in the ‘curl` format.
-
#get_connection(options = {}) ⇒ Connections::Connection
Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.
- #host_unreachable_exceptions ⇒ Array abstract
-
#initialize(arguments = {}, &block) ⇒ Object
Creates a new transport object.
-
#perform_request(method, path, params = {}, body = nil, &block) ⇒ Response
abstract
Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.
-
#reload_connections! ⇒ Object
Reloads and replaces the connection collection based on cluster information.
-
#resurrect_dead_connections! ⇒ Object
Tries to “resurrect” all eligible dead connections.
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def connections @connections end |
#counter ⇒ Object (readonly)
Returns the value of attribute counter.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def counter @counter end |
#hosts ⇒ Object (readonly)
Returns the value of attribute hosts.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def hosts @hosts end |
#last_request_at ⇒ Object (readonly)
Returns the value of attribute last_request_at.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def last_request_at @last_request_at end |
#logger ⇒ Object
Returns the value of attribute logger.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def logger @logger end |
#max_retries ⇒ Object
Returns the value of attribute max_retries.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def max_retries @max_retries end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def @options end |
#protocol ⇒ Object (readonly)
Returns the value of attribute protocol.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def protocol @protocol end |
#reload_after ⇒ Object
Returns the value of attribute reload_after.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def reload_after @reload_after end |
#reload_connections ⇒ Object
Returns the value of attribute reload_connections.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def reload_connections @reload_connections end |
#resurrect_after ⇒ Object
Returns the value of attribute resurrect_after.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def resurrect_after @resurrect_after end |
#serializer ⇒ Object
Returns the value of attribute serializer.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def serializer @serializer end |
#sniffer ⇒ Object
Returns the value of attribute sniffer.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def sniffer @sniffer end |
#tracer ⇒ Object
Returns the value of attribute tracer.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def tracer @tracer end |
Instance Method Details
#__build_connections ⇒ Connections::Collection
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A transport implementation must implement this method. See HTTP::Faraday#__build_connections for an example.
256 257 258 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 256 def __build_connections raise NoMethodError, "Implement this method in your class" end |
#__convert_to_json(o = nil, options = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Converts any non-String object to JSON
142 143 144 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 142 def __convert_to_json(o=nil, ={}) o = o.is_a?(String) ? o : serializer.dump(o, ) end |
#__full_url(host) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a full URL based on information from host
151 152 153 154 155 156 157 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 151 def __full_url(host) url = "#{host[:protocol]}://" url += "#{host[:user]}:#{host[:password]}@" if host[:user] url += "#{host[:host]}:#{host[:port]}" url += "#{host[:path]}" if host[:path] url end |
#__log(method, path, params, body, url, response, json, took, duration) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Log request and response information.
105 106 107 108 109 110 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 105 def __log(method, path, params, body, url, response, json, took, duration) logger.info "#{method.to_s.upcase} #{url} " + "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" logger.debug "> #{__convert_to_json(body)}" if body logger.debug "< #{response.body}" end |
#__log_failed(response) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Log failed request.
115 116 117 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 115 def __log_failed(response) logger.fatal "[#{response.status}] #{response.body}" end |
#__raise_transport_error(response) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Raise error specific for the HTTP response status or a generic server error
134 135 136 137 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 134 def __raise_transport_error(response) error = ERRORS[response.status] || ServerError raise error.new "[#{response.status}] #{response.body}" end |
#__rebuild_connections(arguments = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Replaces the connections collection.
95 96 97 98 99 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 95 def __rebuild_connections(arguments={}) @hosts = arguments[:hosts] || [] @options = arguments[:options] || {} @connections = __build_connections end |
#__trace(method, path, params, body, url, response, json, took, duration) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Trace the request in the ‘curl` format.
122 123 124 125 126 127 128 129 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 122 def __trace(method, path, params, body, url, response, json, took, duration) trace_url = "http://localhost:9200/#{path}?pretty" + ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' tracer.info "curl -X #{method.to_s.upcase} '#{trace_url}'#{trace_body}\n" tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#" tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" end |
#get_connection(options = {}) ⇒ Connections::Connection
Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.
Resurrects dead connection if the ‘resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.
60 61 62 63 64 65 66 67 68 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 60 def get_connection(={}) resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after connection = connections.get_connection() @counter_mtx.synchronize { @counter += 1 } reload_connections! if reload_connections && counter % reload_after == 0 connection end |
#host_unreachable_exceptions ⇒ Array
Returns an Array of connection errors specific to the transport implementation. See HTTP::Faraday#host_unreachable_exceptions for an example.
247 248 249 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 247 def host_unreachable_exceptions [Errno::ECONNREFUSED] end |
#initialize(arguments = {}, &block) ⇒ Object
Creates a new transport object.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 30 def initialize(arguments={}, &block) @hosts = arguments[:hosts] || [] @options = arguments[:options] || {} @block = block @connections = __build_connections @serializer = [:serializer] || ( [:serializer_class] ? [:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) ) @protocol = [:protocol] || DEFAULT_PROTOCOL @logger = [:logger] @tracer = [:tracer] @sniffer = [:sniffer_class] ? [:sniffer_class].new(self) : Sniffer.new(self) @counter = 0 @counter_mtx = Mutex.new @last_request_at = Time.now @reload_connections = [:reload_connections] @reload_after = [:reload_connections].is_a?(Fixnum) ? [:reload_connections] : DEFAULT_RELOAD_AFTER @resurrect_after = [:resurrect_after] || DEFAULT_RESURRECT_AFTER @max_retries = [:retry_on_failure].is_a?(Fixnum) ? [:retry_on_failure] : DEFAULT_MAX_RETRIES end |
#perform_request(method, path, params = {}, body = nil, &block) ⇒ Response
The transport implementation has to implement this method either in full, or by invoking this method with a block. See HTTP::Faraday#perform_request for an example.
Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 176 def perform_request(method, path, params={}, body=nil, &block) raise NoMethodError, "Implement this method in your transport class" unless block_given? start = Time.now if logger || tracer tries = 0 begin tries += 1 connection = get_connection or raise Error.new("Cannot get new connection from pool.") if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash) params = connection.connection.params.merge(params.to_hash) end url = connection.full_url(path, params) response = block.call(connection, url) connection.healthy! if connection.failures > 0 rescue *host_unreachable_exceptions => e logger.error "[#{e.class}] #{e.} #{connection.host.inspect}" if logger connection.dead! if @options[:reload_on_failure] and tries < connections.all.size logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger reload_connections! and retry end if @options[:retry_on_failure] logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger if tries <= max_retries retry else logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger raise e end else raise e end rescue Exception => e logger.fatal "[#{e.class}] #{e.} (#{connection.host.inspect if connection})" if logger raise e end duration = Time.now-start if logger || tracer if response.status.to_i >= 300 __log method, path, params, body, url, response, nil, 'N/A', duration if logger __trace method, path, params, body, url, response, nil, 'N/A', duration if tracer __log_failed response if logger __raise_transport_error response end json = serializer.load(response.body) if response.headers && response.headers["content-type"] =~ /json/ took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer __log method, path, params, body, url, response, json, took, duration if logger __trace method, path, params, body, url, response, json, took, duration if tracer Response.new response.status, json || response.body, response.headers ensure @last_request_at = Time.now end |
#reload_connections! ⇒ Object
Reloads and replaces the connection collection based on cluster information.
74 75 76 77 78 79 80 81 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 74 def reload_connections! hosts = sniffer.hosts __rebuild_connections :hosts => hosts, :options => self rescue SnifferTimeoutError logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger self end |
#resurrect_dead_connections! ⇒ Object
Tries to “resurrect” all eligible dead connections.
87 88 89 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 87 def resurrect_dead_connections! connections.dead.each { |c| c.resurrect! } end |