Class: Logtail::LogDevices::HTTP

Inherits:
Object
Defined in:
lib/logtail/log_devices/http.rb,
lib/logtail/log_devices/http/request_attempt.rb,
lib/logtail/log_devices/http/flushable_dropping_sized_queue.rb

Overview

A highly efficient log device that buffers log messages and delivers them over HTTPS to the Logtail API. It uses batching, keep-alive connections, and msgpack to deliver logs with high throughput and little overhead. All log preparation and delivery happens asynchronously in a background thread, so it does not block application execution and delivers logs efficiently in multi-threaded environments.

See #initialize for options and more details.

Defined Under Namespace

Classes: FlushableDroppingSizedQueue, RequestAttempt

Constant Summary

LOGTAIL_STAGING_HOST =
"in.logtail.dev".freeze
LOGTAIL_PRODUCTION_HOST =
"in.logtail.com".freeze
LOGTAIL_HOST =
ENV['LOGTAIL_STAGING'] ? LOGTAIL_STAGING_HOST : LOGTAIL_PRODUCTION_HOST
LOGTAIL_PORT =
443
LOGTAIL_SCHEME =
"https".freeze
CONTENT_TYPE =
"application/msgpack".freeze
USER_AGENT =
"Logtail Ruby/#{Logtail::VERSION} (HTTP)".freeze

Instance Method Summary

Constructor Details

#initialize(source_token, options = {}) ⇒ HTTP

Instantiates a new HTTP log device that can be passed to Logtail::Logger#initialize.

The class maintains a buffer which is flushed in batches to the Logtail API. Two options control when the flush happens: `:batch_size` and `:flush_interval`. If either threshold is reached, the buffer is flushed.
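The two flush triggers, a size threshold and a time interval, can be illustrated with a small stand-alone sketch. `BatchBuffer` is a hypothetical class written for this example, not part of Logtail; it checks the interval at write time with an explicitly passed clock value to keep the example deterministic, whereas the real device relies on background threads.

```ruby
# Hypothetical illustration of size- and time-based flushing.
# Not the library's implementation.
class BatchBuffer
  attr_reader :batches

  def initialize(batch_size:, flush_interval:, started_at: 0.0)
    @batch_size = batch_size         # flush once this many messages are buffered
    @flush_interval = flush_interval # flush once this many seconds have passed
    @buffer = []
    @batches = []                    # stands in for batches sent to the API
    @last_flush = started_at
  end

  # `now` is a timestamp in seconds, supplied by the caller.
  def write(msg, now)
    @buffer << msg
    flush(now) if @buffer.size >= @batch_size || now - @last_flush >= @flush_interval
    true
  end

  def flush(now)
    @batches << @buffer unless @buffer.empty?
    @buffer = []
    @last_flush = now
  end
end

buffer = BatchBuffer.new(batch_size: 3, flush_interval: 2)
buffer.write("a", 0.1)
buffer.write("b", 0.2)
buffer.write("c", 0.3) # size threshold reached, first batch flushed
buffer.write("d", 2.5) # more than 2 seconds since the last flush, second batch flushed
buffer.write("e", 2.6) # stays buffered
buffer.batches # => [["a", "b", "c"], ["d"]]
```
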

By default, the request queue is a FlushableDroppingSizedQueue, which drops log messages when the rate of log messages exceeds the maximum delivery rate, so application performance is not sacrificed. If you would rather apply back pressure than lose messages, pass a `SizedQueue` via the `:request_queue` option.
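The difference between dropping and back pressure can be sketched in plain Ruby. `DroppingQueue` below is a hypothetical stand-in written for this example (it is not the library's FlushableDroppingSizedQueue), while `SizedQueue` is the core Ruby class, whose `push` blocks the caller once the queue is full.

```ruby
# Hypothetical dropping queue, for illustration only.
class DroppingQueue
  def initialize(max)
    @max = max
    @items = []
    @lock = Mutex.new
  end

  # Returns true if the item was stored, false if it was dropped.
  def enq(item)
    @lock.synchronize do
      return false if @items.size >= @max
      @items << item
      true
    end
  end

  def size
    @lock.synchronize { @items.size }
  end
end

# Dropping: the third message is discarded and the caller never waits.
dropping = DroppingQueue.new(2)
results = 3.times.map { |i| dropping.enq("msg #{i}") }
results # => [true, true, false]

# Back pressure: a full SizedQueue makes the producer wait. The non-blocking
# form of push is used here only to show that the queue is full.
blocking = SizedQueue.new(2)
blocking.push("msg 0")
blocking.push("msg 1")
would_block = begin
  blocking.push("msg 2", true) # non_block = true raises ThreadError when full
  false
rescue ThreadError
  true
end
```

With a dropping queue the application stays fast but may lose messages under load; with a blocking queue no message is lost, but writers slow down to the delivery rate.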

Examples:

Basic usage

Logtail::Logger.new(Logtail::LogDevices::HTTP.new("my_logtail_source_token"))

Apply back pressure instead of dropping messages

http_log_device = Logtail::LogDevices::HTTP.new("my_logtail_source_token", request_queue: SizedQueue.new(25))
Logtail::Logger.new(http_log_device)

Parameters:

  • source_token (String)

    The API key provided to you after you add your application to [Logtail](logtail.com).

  • options (Hash) (defaults to: {})

    The options to create an HTTP log device with.

  • attributes (Hash)

    a customizable set of options



# File 'lib/logtail/log_devices/http.rb', line 72

def initialize(source_token, options = {})
  @source_token = source_token || raise(ArgumentError.new("The source_token parameter cannot be blank"))
  @logtail_host = options[:logtail_host] || ENV['LOGTAIL_HOST'] || LOGTAIL_HOST
  @logtail_port = options[:logtail_port] || ENV['LOGTAIL_PORT'] || LOGTAIL_PORT
  @logtail_scheme = options[:logtail_scheme] || ENV['LOGTAIL_SCHEME'] || LOGTAIL_SCHEME
  @batch_size = options[:batch_size] || 1_000
  @flush_continuously = options[:flush_continuously] != false
  @flush_interval = options[:flush_interval] || 2 # 2 seconds
  @requests_per_conn = options[:requests_per_conn] || 2_500
  @msg_queue = FlushableDroppingSizedQueue.new(@batch_size)
  @request_queue = options[:request_queue] || FlushableDroppingSizedQueue.new(25)
  @successive_error_count = 0
  @requests_in_flight = 0
end

Instance Method Details

#close ⇒ Object

Closes the log device, cleans up, and attempts one last delivery.



# File 'lib/logtail/log_devices/http.rb', line 119

def close
  # Kill the flush thread immediately since we are about to flush again.
  @flush_thread.kill if @flush_thread

  # Flush all remaining messages
  flush

  # Kill the request queue thread. Flushing ensures that no requests are pending.
  @request_outlet_thread.kill if @request_outlet_thread
end

#deliver_one(msg) ⇒ Object



# File 'lib/logtail/log_devices/http.rb', line 130

def deliver_one(msg)
  http = build_http

  begin
    resp = http.start do |conn|
      req = build_request([msg])
      @requests_in_flight += 1
      conn.request(req)
    end
    return resp
  rescue => e
    Logtail::Config.instance.debug { "error: #{e.message}" }
    return e
  ensure
    http.finish if http.started?
    @requests_in_flight -= 1
  end
end

#flush ⇒ Object

Flush all log messages in the buffer synchronously. This method will not return until delivery of the messages has been successful. If you want to flush asynchronously see #flush_async.
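The relationship between an asynchronous flush and a synchronous one can be sketched as follows. `TinyDevice` is a hypothetical class written for this example, with a simulated delivery delay in place of an HTTP request; it only mirrors the general pattern of handing a batch to a worker thread and then waiting for it, and is not the library's implementation.

```ruby
# Hypothetical sketch: flush = flush_async + wait for pending deliveries.
class TinyDevice
  attr_reader :delivered

  def initialize
    @msg_buffer = []
    @request_queue = Queue.new
    @in_flight = 0
    @lock = Mutex.new
    @delivered = []
    @worker = Thread.new { deliver_loop }
  end

  def write(msg)
    @lock.synchronize { @msg_buffer << msg }
    true
  end

  # Hands the buffered messages to the worker thread and returns immediately.
  def flush_async
    batch = @lock.synchronize do
      taken = @msg_buffer
      @msg_buffer = []
      taken
    end
    return if batch.empty?
    @lock.synchronize { @in_flight += 1 }
    @request_queue << batch
  end

  # Returns only after every queued batch has been delivered.
  def flush
    flush_async
    sleep(0.01) until @lock.synchronize { @in_flight.zero? }
    true
  end

  private

  def deliver_loop
    loop do
      batch = @request_queue.pop
      sleep(0.05) # simulated network round trip
      @lock.synchronize do
        @delivered.concat(batch)
        @in_flight -= 1
      end
    end
  end
end

device = TinyDevice.new
device.write("first")
device.write("second")
device.flush
device.delivered # => ["first", "second"]
```
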



# File 'lib/logtail/log_devices/http.rb', line 112

def flush
  flush_async
  wait_on_request_queue
  true
end

#verify_delivery! ⇒ Object



# File 'lib/logtail/log_devices/http.rb', line 149

def verify_delivery!
  5.times do |i|
    sleep(2)

    if @last_resp.nil?
      print "."
    elsif @last_resp.code == "202"
      puts "Log delivery successful! View your logs at https://logtail.com"
    else
      raise <<-MESSAGE

Log delivery failed!

Status: #{@last_resp.code}
Body: #{@last_resp.body}

You can enable internal Logtail debug logging with the following:

Logtail::Config.instance.debug_logger = ::Logger.new(STDOUT)
MESSAGE
    end
  end

  raise <<-MESSAGE

Log delivery failed! No request was made.

You can enable internal debug logging with the following:

Logtail::Config.instance.debug_logger = ::Logger.new(STDOUT)
MESSAGE
end

#write(msg) ⇒ Object

Writes a new log line message to the buffer and flushes asynchronously if the message queue is full. We flush asynchronously because the maximum message batch size is constrained by the Logtail API. The actual application limit is a multiple of this, hence the `@request_queue`.



# File 'lib/logtail/log_devices/http.rb', line 91

def write(msg)
  return unless Logtail.config.send_to_better_stack?(msg)

  @msg_queue.enq(msg)

  # Lazily start flush threads to ensure threads are alive after forking processes.
  # If the threads are started during instantiation they will not be copied when
  # the current process is forked. This is the case with various web servers,
  # such as phusion passenger.
  ensure_flush_threads_are_started

  if @msg_queue.full?
    Logtail::Config.instance.debug { "Flushing HTTP buffer via write" }
    flush_async
  end
  true
end