Class: Hyperdx::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/hyperdx/ruby/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(request, uri, opts) ⇒ Client

Returns a new instance of Client.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/hyperdx/ruby/client.rb', line 15

def initialize(request, uri, opts)
  @uri = uri

  # NOTE: buffer is in memory
  @buffer = []

  @lock = Mutex.new

  @flush_interval = opts[:flush_interval] || Resources::FLUSH_INTERVAL
  @flush_size = opts[:flush_size] || Resources::FLUSH_SIZE

  @request = request
  @request_size = opts[:request_size] || Resources::REQUEST_SIZE

  @retry_timeout = opts[:retry_timeout] || Resources::RETRY_TIMEOUT
  @retry_max_jitter = opts[:retry_max_jitter] || Resources::RETRY_MAX_JITTER
  @retry_max_attempts = opts[:retry_max_attempts] || Resources::RETRY_MAX_ATTEMPTS

  @internal_logger = Logger.new($stdout)
  @internal_logger.level = Logger::DEBUG

  @work_thread_pool = Concurrent::FixedThreadPool.new(Etc.nprocessors)
  # TODO: Expose an option to configure the maximum concurrent requests
  # Requires the instance-global request to be resolved first
  @request_thread_pool = Concurrent::FixedThreadPool.new(Resources::MAX_CONCURRENT_REQUESTS)

  @scheduled_flush = nil
end

Instance Method Details

#exitoutObject



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/hyperdx/ruby/client.rb', line 190

def exitout
  unschedule_flush
  @work_thread_pool.shutdown
  if !@work_thread_pool.wait_for_termination(1)
    @internal_logger.warn("Work thread pool unable to shutdown gracefully. Logs potentially dropped")
  end
  @request_thread_pool.shutdown
  if !@request_thread_pool.wait_for_termination(5)
    @internal_logger.warn("Request thread pool unable to shutdown gracefully. Logs potentially dropped")
  end

  if @buffer.any?
    @internal_logger.debug("Exiting HyperDX logger: Logging remaining messages")
    flush_sync({ block_on_requests: true })
    @internal_logger.debug("Finished flushing logs to HyperDX")
  end
end

#flush(options = {}) ⇒ Object

Flushes all logs to HyperDX asynchronously



98
99
100
# File 'lib/hyperdx/ruby/client.rb', line 98

def flush(options = {})
  Concurrent::Future.execute({ executor: @work_thread_pool }) { flush_sync(options) }
end

#flush_sync(options = {}) ⇒ Object

Flushes all logs to HyperDX synchronously



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/hyperdx/ruby/client.rb', line 104

def flush_sync(options = {})
  slices = @lock.synchronize do
    # Slice the buffer into chunks that try to be no larger than @request_size. Slice points are found with
    # a binary search thanks to the structure of @buffer. We are working backwards because it's cheaper to
    # remove from the tail of an array instead of the head
    slices = []
    until @buffer.empty?
      search_size = @buffer[-1].running_size - @request_size
      if search_size.negative?
        search_size = 0
      end

      slice_index = @buffer.bsearch_index { |message| message.running_size >= search_size }
      slices.push(@buffer.pop(@buffer.length - slice_index).map(&:source))
    end
    slices
  end

  # Remember the chunks are in reverse order, this un-reverses them
  slices.reverse_each do |slice|
    if options[:block_on_requests]
      try_request(slice)
    else
      Concurrent::Future.execute({ executor: @request_thread_pool }) { try_request(slice) }
    end
  end
end

#process_message(msg, opts = {}) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/hyperdx/ruby/client.rb', line 57

def process_message(msg, opts = {})
  processed_message = {
    message: msg,
    app: opts[:app],
    level: opts[:level],
    env: opts[:env],
    meta: opts[:meta],
    timestamp: Time.now.to_i,
  }
  processed_message.delete(:meta) if processed_message[:meta].nil?
  processed_message
end

#schedule_flushObject



44
45
46
47
48
# File 'lib/hyperdx/ruby/client.rb', line 44

def schedule_flush
  if @scheduled_flush.nil? || @scheduled_flush.complete?
    @scheduled_flush = Concurrent::ScheduledTask.execute(@flush_interval) { flush }
  end
end

#send_request(body, error_header) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/hyperdx/ruby/client.rb', line 154

def send_request(body, error_header)
  # TODO: Remove instance-global request object
  @request.body = body
  begin
    response = Net::HTTP.start(
      @uri.hostname,
      @uri.port,
      use_ssl: @uri.scheme == "https"
    ) do |http|
      http.request(@request)
    end

    code = response.code.to_i
    if [401, 403].include?(code)
      @internal_logger.debug("#{error_header} Please provide a valid ingestion key. Discarding flush buffer")
      return true
    elsif [408, 500, 504].include?(code)
      # These codes might indicate a temporary ingester issue
      @internal_logger.debug("#{error_header} The request failed #{response}. Retrying")
    elsif code == 200
      return true
    else
      @internal_logger.debug("#{error_header} The request failed #{response}. Discarding flush buffer")
      return true
    end
  rescue SocketError
    @internal_logger.debug("#{error_header} Network connectivity issue. Retrying")
  rescue Errno::ECONNREFUSED => e
    @internal_logger.debug("#{error_header} The server is down. #{e.message}. Retrying")
  rescue Timeout::Error => e
    @internal_logger.debug("#{error_header} Timeout error occurred. #{e.message}. Retrying")
  end

  false
end

#try_request(slice) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/hyperdx/ruby/client.rb', line 132

def try_request(slice)
  body = slice.to_json

  flush_id = "#{SecureRandom.uuid} [#{slice.length} lines]"
  error_header = "Flush {#{flush_id}} failed."
  tries = 0
  loop do
    tries += 1

    if tries > @retry_max_attempts
      @internal_logger.debug("Flush {#{flush_id}} exceeded 3 tries. Discarding flush buffer")
      break
    end

    if send_request(body, error_header)
      break
    end

    sleep(@retry_timeout * (1 << (tries - 1)) + rand(@retry_max_jitter))
  end
end

#unschedule_flushObject



50
51
52
53
54
55
# File 'lib/hyperdx/ruby/client.rb', line 50

def unschedule_flush
  if !@scheduled_flush.nil?
    @scheduled_flush.cancel
    @scheduled_flush = nil
  end
end

#write_to_buffer(msg, opts) ⇒ Object



70
71
72
# File 'lib/hyperdx/ruby/client.rb', line 70

def write_to_buffer(msg, opts)
  Concurrent::Future.execute({ executor: @work_thread_pool }) { write_to_buffer_sync(msg, opts) }
end

#write_to_buffer_sync(msg, opts) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/hyperdx/ruby/client.rb', line 74

def write_to_buffer_sync(msg, opts)
  processed_message = process_message(msg, opts)
  message_size = processed_message.to_s.bytesize

  running_size = @lock.synchronize do
    running_size = message_size
    if @buffer.any?
      running_size += @buffer[-1].running_size
    end
    @buffer.push(Message.new(processed_message, running_size))

    running_size
  end

  if running_size >= @flush_size
    unschedule_flush
    flush_sync
  else
    schedule_flush
  end
end