Class: Scalyr::Common::Client::ClientSession
- Inherits:
-
Object
- Object
- Scalyr::Common::Client::ClientSession
- Defined in:
- lib/scalyr/common/client.rb
Overview
Encapsulates the connection between the agent and the Scalyr server, thus shielding the implementation (which may create a new connection for every post or use a persistent connection)
Instance Method Summary collapse
- #client ⇒ Object
-
#client_config ⇒ Object
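Builds the configuration hash (timeouts, connection pooling, SSL verification) used to create the Manticore HTTP client.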
- #close ⇒ Object
-
#get_new_latency_stats ⇒ Object
Convenience method to create a fresh quantile estimator.
-
#get_stats ⇒ Object
Get a clone of current statistics hash and calculate percentiles.
-
#initialize(logger, add_events_uri, compression_type, compression_level, ssl_verify_peer, ssl_ca_bundle_path, append_builtin_cert, record_stats_for_status, flush_quantile_estimates_on_status_send, connect_timeout, socket_timeout, request_timeout, pool_max, pool_max_per_route) ⇒ ClientSession
constructor
A new instance of ClientSession.
-
#post_add_events(body, is_status, body_serialization_duration = 0) ⇒ Object
Upload data to Scalyr.
-
#send_ping(body) ⇒ Object
Send “ping” request to the API.
Constructor Details
#initialize(logger, add_events_uri, compression_type, compression_level, ssl_verify_peer, ssl_ca_bundle_path, append_builtin_cert, record_stats_for_status, flush_quantile_estimates_on_status_send, connect_timeout, socket_timeout, request_timeout, pool_max, pool_max_per_route) ⇒ ClientSession
Returns a new instance of ClientSession.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/scalyr/common/client.rb', line 59
#
# Stores the connection settings on the instance and sets up the request
# statistics. No connection is opened here.
#
# @param logger logger used for info/warn/debug messages
# @param add_events_uri URI that requests are posted to (typically /addEvents)
# @param compression_type stored and also reported in the stats hash
# @param compression_level stored and also reported in the stats hash
# @param ssl_verify_peer whether the server certificate is verified
# @param ssl_ca_bundle_path path of the CA bundle file
# @param append_builtin_cert whether the bundled CA cert is appended to the CA bundle
# @param record_stats_for_status whether status requests are counted in the stats
# @param flush_quantile_estimates_on_status_send whether the quantile estimators
#   are recreated each time the stats are read
# @param connect_timeout passed through to the HTTP client config
# @param socket_timeout passed through to the HTTP client config
# @param request_timeout passed through to the HTTP client config
# @param pool_max passed through to the HTTP client config
# @param pool_max_per_route passed through to the HTTP client config
def initialize(logger, add_events_uri, compression_type, compression_level,
               ssl_verify_peer, ssl_ca_bundle_path, append_builtin_cert,
               record_stats_for_status, flush_quantile_estimates_on_status_send,
               connect_timeout, socket_timeout, request_timeout, pool_max, pool_max_per_route)
  @logger = logger
  @add_events_uri = add_events_uri # typically /addEvents

  # Compression settings
  @compression_type = compression_type
  @compression_level = compression_level

  # SSL settings
  @ssl_verify_peer = ssl_verify_peer
  @ssl_ca_bundle_path = ssl_ca_bundle_path
  @append_builtin_cert = append_builtin_cert

  # Statistics behaviour
  @record_stats_for_status = record_stats_for_status
  @flush_quantile_estimates_on_status_send = flush_quantile_estimates_on_status_send

  # Timeouts and connection pool sizing
  @connect_timeout = connect_timeout
  @socket_timeout = socket_timeout
  @request_timeout = request_timeout
  @pool_max = pool_max
  @pool_max_per_route = pool_max_per_route

  # A cert to use by default to avoid issues caused by the OpenSSL library
  # not validating certs according to standard
  @cert_string = CA_CERT_STRING

  # Request statistics are accumulated across multiple threads and must be
  # accessed through a mutex
  @stats_lock = Mutex.new
  @latency_stats = get_new_latency_stats
  @stats = {
    # The total number of RPC requests sent.
    total_requests_sent: 0,
    # The total number of RPC requests that failed.
    total_requests_failed: 0,
    # The total number of bytes sent over the network.
    total_request_bytes_sent: 0,
    # The total number of compressed bytes sent over the network.
    total_compressed_request_bytes_sent: 0,
    # The total number of bytes received.
    total_response_bytes_received: 0,
    # The total number of secs spent waiting for responses (so average latency
    # can be calculated by dividing this number by total_requests_sent).
    # This includes connection establishment time.
    total_request_latency_secs: 0,
    # The total duration (in seconds) it took to serialize (JSON dumps) all the
    # request bodies. Divide by total_requests_sent for the average
    # serialization duration.
    total_serialization_duration_secs: 0,
    # The total duration (in seconds) it took to compress all the request
    # bodies. Divide by total_requests_sent for the average compression
    # duration.
    total_compression_duration_secs: 0,
    compression_type: @compression_type,
    compression_level: @compression_level,
  }
end
Instance Method Details
#client ⇒ Object
164 165 166 |
# File 'lib/scalyr/common/client.rb', line 164
#
# Returns the Manticore client for this session, creating it from
# client_config on first use and reusing it afterwards.
def client
  return @client if @client

  @client = Manticore::Client.new(client_config)
end
#client_config ⇒ Object
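Builds the configuration hash (timeouts, connection pooling, SSL verification) used to create the Manticore HTTP client.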
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/scalyr/common/client.rb', line 102
#
# Builds the options hash that is handed to Manticore::Client.new.
#
# When peer verification is enabled this also decides which CA file to use and
# records it in @ca_cert_path:
# * append_builtin_cert off: the configured CA bundle path is used as-is (or an
#   empty temp file if that path does not exist).
# * append_builtin_cert on: a temp file is written containing the configured CA
#   bundle (when it exists) followed by the bundled certificate string.
#
# @return [Hash] client options, including an :ssl sub-hash
def client_config
  # TODO: Eventually expose some more of these as config options, though nothing here really needs tuning normally
  # besides SSL
  c = {
    connect_timeout: @connect_timeout,
    socket_timeout: @socket_timeout,
    request_timeout: @request_timeout,
    follow_redirects: true,
    automatic_retries: 1,
    retry_non_idempotent: false,
    check_connection_timeout: 200,
    pool_max: @pool_max,
    pool_max_per_route: @pool_max_per_route,
    cookies: true,
    keepalive: true,
    ssl: {}
  }

  # verify peers to prevent potential MITM attacks
  unless @ssl_verify_peer
    c[:ssl][:verify] = :disable
    return c
  end

  c[:ssl][:verify] = :strict

  if @append_builtin_cert
    @ca_cert = Tempfile.new("ca_cert")
    if File.file?(@ssl_ca_bundle_path)
      @ca_cert.write(File.read(@ssl_ca_bundle_path))
      @ca_cert.flush
    else
      @logger.warn("CA bundle (#{@ssl_ca_bundle_path}) doesn't exist, using only bundled CA certificates")
    end
    # File.open (not Kernel#open) so the path can never be interpreted as a
    # command to spawn.
    File.open(@ca_cert.path, "a") do |f|
      f.puts @cert_string
    end
    @ca_cert.flush
    @ca_cert_path = @ca_cert.path
    @logger.info("Using CA bundle from #{@ssl_ca_bundle_path} combined with bundled certificates to validate the server side certificate (#{@ca_cert_path})")
  else
    # System CA bundle is used, no need to copy it over and append our bundled CA cert
    @logger.info("Using CA bundle from #{@ssl_ca_bundle_path} to validate the server side certificate")
    @ca_cert_path = @ssl_ca_bundle_path

    unless File.file?(@ssl_ca_bundle_path)
      # TODO: For now we don't throw to keep code backward compatible. In the future in case
      # file doesn't exist, we should throw instead of write empty CA cert file and pass that
      # to Manticore which will eventually fail and throw on cert validation
      #raise Errno::ENOENT.new("ssl_ca_bundle_path config option to an invalid file path which doesn't exist - #{@ssl_ca_bundle_path}")
      @ca_cert = Tempfile.new("ca_cert")
      @ca_cert_path = @ca_cert.path
    end
  end

  c[:ssl][:ca_file] = @ca_cert_path
  c
end
#close ⇒ Object
261 262 263 |
# File 'lib/scalyr/common/client.rb', line 261
#
# Closes the underlying client if one has been created; otherwise does nothing.
def close
  return unless @client

  @client.close
end
#get_new_latency_stats ⇒ Object
Convenience method to create a fresh quantile estimator
169 170 171 172 173 174 175 176 177 |
# File 'lib/scalyr/common/client.rb', line 169
#
# Builds a hash holding one fresh Quantile::Estimator per tracked metric:
# * :serialization_duration_secs - duration (in seconds) it took to serialize
#   (JSON dumps) the request bodies
# * :compression_duration_secs - duration (in seconds) it took to compress the
#   request bodies
# * :request_latency_secs - secs spent waiting for a response, including
#   connection establishment time
# * :bytes_sent - number of bytes sent over the network (batch size with a bit
#   more overhead)
def get_new_latency_stats
  tracked_metrics = [
    :serialization_duration_secs,
    :compression_duration_secs,
    :request_latency_secs,
    :bytes_sent
  ]
  tracked_metrics.each_with_object({}) do |metric, estimators|
    estimators[metric] = Quantile::Estimator.new
  end
end
#get_stats ⇒ Object
Get a clone of current statistics hash and calculate percentiles
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/scalyr/common/client.rb', line 180
#
# Returns a clone of the statistics hash with p50/p90/p99 entries added for
# each tracked metric. Runs entirely under the stats mutex. When
# @flush_quantile_estimates_on_status_send is set, the quantile estimators are
# replaced with fresh ones after the percentiles have been read.
def get_stats
  # Estimator key => prefix of the reported stat name. Note that request
  # latency is reported without the "_secs" part.
  reported_prefixes = {
    request_latency_secs: "request_latency",
    serialization_duration_secs: "serialization_duration_secs",
    compression_duration_secs: "compression_duration_secs",
    bytes_sent: "bytes_sent"
  }
  percentiles = { "p50" => 0.5, "p90" => 0.9, "p99" => 0.99 }

  @stats_lock.synchronize do
    snapshot = @stats.clone

    reported_prefixes.each do |metric, prefix|
      percentiles.each do |suffix, rank|
        snapshot[:"#{prefix}_#{suffix}"] = @latency_stats[metric].query(rank)
      end
    end

    if @flush_quantile_estimates_on_status_send
      @logger.debug "Recreating / reseting quantile estimator classes for plugin metrics"
      @latency_stats = get_new_latency_stats
    end

    snapshot
  end
end
#post_add_events(body, is_status, body_serialization_duration = 0) ⇒ Object
Upload data to Scalyr. Assumes that the body size complies with Scalyr limits
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/scalyr/common/client.rb', line 216
#
# Uploads data to Scalyr. Assumes that the body size complies with Scalyr limits.
#
# Request statistics are updated under the stats mutex whether the request
# succeeds or fails, unless this is a status request and
# @record_stats_for_status is off.
#
# @param body the request body to send
# @param is_status whether this is a status request (affects stats recording only)
# @param body_serialization_duration time already spent serializing the body,
#   added to the serialization stats
# @raise [ClientError] when the HTTP client raises a Manticore::ManticoreException
def post_add_events(body, is_status, body_serialization_duration = 0)
  post_body, post_headers, compression_duration = prepare_post_object @add_events_uri.path, body
  fail_count = 1 # putative assume failure
  # Monotonic clock: latency must not be affected by wall-clock adjustments.
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  uncompressed_bytes_sent = 0
  compressed_bytes_sent = 0
  bytes_received = 0
  begin
    response = client.send(:post, @add_events_uri, body: post_body, headers: post_headers)
    handle_response(response)

    fail_count -= 1 # success means we negate the putative failure
    uncompressed_bytes_sent = (body.bytesize + @add_events_uri.path.bytesize)
    compressed_bytes_sent = (post_body.bytesize + @add_events_uri.path.bytesize)
    bytes_received = response.body.bytesize # echee: double check
    # echee TODO add more statistics
  rescue Manticore::ManticoreException => e
    # The underlying persistent-connection library automatically retries when there are network-related errors.
    # Eventually, it will give up and raise this generic error, at which time, we convert it to a ClientError
    raise ClientError.new(e.message, @add_events_uri, e.class.name)
  ensure
    if @record_stats_for_status || !is_status
      @stats_lock.synchronize do
        @stats[:total_requests_sent] += 1
        @stats[:total_requests_failed] += fail_count
        @stats[:total_request_bytes_sent] += uncompressed_bytes_sent
        @stats[:total_compressed_request_bytes_sent] += compressed_bytes_sent
        @stats[:total_response_bytes_received] += bytes_received
        @stats[:total_serialization_duration_secs] += body_serialization_duration
        @stats[:total_compression_duration_secs] += compression_duration

        request_latency = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
        @stats[:total_request_latency_secs] += request_latency
        @latency_stats[:request_latency_secs].observe(request_latency)
        @latency_stats[:serialization_duration_secs].observe(body_serialization_duration)
        @latency_stats[:compression_duration_secs].observe(compression_duration)
        @latency_stats[:bytes_sent].observe(uncompressed_bytes_sent)
      end
    end
  end
end
#send_ping(body) ⇒ Object
Send a “ping” request to the API. This is mostly used to test the connection to the Scalyr API and to verify that the API key is valid.
207 208 209 210 211 212 213 |
# File 'lib/scalyr/common/client.rb', line 207
#
# Sends a "ping" request to the API by posting the given body to the
# add-events URI and passing the response through handle_response.
# Unlike post_add_events, no request statistics are recorded.
#
# @param body the request body to send
# @return the response object returned by the HTTP client
def send_ping(body)
  # The compression duration is only needed for stats, which are not recorded here.
  post_body, post_headers, _compression_duration = prepare_post_object @add_events_uri.path, body
  response = client.send(:post, @add_events_uri, body: post_body, headers: post_headers)
  handle_response(response)

  response
end