Class: Fluent::HoneycombOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_honeycomb.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



27
28
29
30
31
32
33
34
35
# File 'lib/fluent/plugin/out_honeycomb.rb', line 27

# Fills in plugin-specific buffering/retry defaults and then hands the
# configuration to the superclass.
#
# A default is applied only when the corresponding entry in conf is unset
# (nil/false), so values given explicitly in the configuration are kept.
#
# @param conf [Hash] configuration parameters; modified in place
def configure(conf)
  defaults = {
    "buffer_chunk_limit" => "500k",
    "flush_interval" => "1s",
    "max_retry_wait" => "30s",
    "retry_limit" => 17
  }
  defaults.each { |name, value| conf[name] ||= value }
  super
end

#flatten(record, prefix) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
# File 'lib/fluent/plugin/out_honeycomb.rb', line 174

# Recursively collapses a nested Hash into a single-level Hash whose keys are
# the dot-joined path from prefix down to each leaf value.
#
# A non-Hash value is a leaf and yields { prefix => value }. An empty Hash
# has no leaves and therefore yields {}.
#
# @param record [Object] the Hash to flatten, or a leaf value
# @param prefix [String] key path accumulated so far
# @return [Hash] flattened key/value pairs
def flatten(record, prefix)
  return { prefix => record } unless record.is_a?(Hash)

  record.each_with_object({}) do |(key, value), flat|
    flat.merge!(flatten(value, "#{prefix}.#{key}"))
  end
end

#format(tag, time, record) ⇒ Object



49
50
51
# File 'lib/fluent/plugin/out_honeycomb.rb', line 49

# Serializes one event for the buffer as a msgpack-encoded
# [tag, time, record] array.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event payload
# @return [Object] whatever Array#to_msgpack returns for the triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#parse_response(batch, resp) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/fluent/plugin/out_honeycomb.rb', line 133

# Inspects a batch response and works out which events must be resent.
#
# The response body is expected to be a JSON array in which the entry at
# index i describes the event at batch[i]; an entry whose "status" is 202
# counts as accepted, any other Hash entry marks that event as failed.
# Entries that are not Hashes are logged and skipped (counted as neither
# success nor failure).
#
# @param batch [Array] the events that were sent
# @param resp [#status, #body] the HTTP response for that batch
# @return [Array] events to retry: the whole batch when the HTTP status is
#   not 200, otherwise only the events reported as failed
# @raise [JSON::ParserError] when the body is not valid JSON
# @raise [RuntimeError] when the parsed body is not an Array
def parse_response(batch, resp)
  if resp.status != 200
    log.error "Error sending batch: #{resp.status}, #{resp.body}"
    # Force retry by returning the entire batch
    return batch
  end

  begin
    results = JSON.parse(resp.body.to_s)
  rescue JSON::ParserError => e
    log.warn "Error parsing response as JSON: #{e}"
    raise
  end

  unless results.is_a?(Array)
    # The logger method is `warn`; `warning` does not exist and would raise
    # NoMethodError before the intended error could be reported.
    log.warn "Unexpected response format: #{results}"
    # RuntimeError rather than bare Exception, so `rescue => e` handlers see it.
    raise "Unexpected response format: #{resp.status}"
  end

  successes = 0
  failures = []
  results.each_with_index do |result, idx|
    unless result.is_a?(Hash)
      log.warn "Unexpected status format in response: #{result}"
      next
    end

    if result["status"] == 202
      successes += 1
    else
      failures.push(batch[idx])
    end
  end

  if failures.empty?
    log.debug "Successfully published #{successes} records"
  else
    log.warn "Errors publishing records: #{failures.size} failures out of #{successes + failures.size}"
  end
  failures
end

#publish_batch(dataset, batch, retry_count) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/fluent/plugin/out_honeycomb.rb', line 112

# Posts one batch of events to the dataset's batch endpoint and retries the
# events that parse_response reports as failed.
#
# Each retry waits one second and resends only the failed events. Once
# retry_count has reached @retry_limit the remaining failures are dropped,
# and that is reported with an error log so the loss is visible.
#
# @param dataset [String] dataset name; interpolated into the request path
#   as-is (NOTE(review): it is not URL-escaped here — confirm dataset names
#   are always URL-safe)
# @param batch [Array] events to send; serialized with JSON.dump
# @param retry_count [Integer] number of retries already made for these events
# @return [nil]
def publish_batch(dataset, batch, retry_count)
  return if batch.empty?

  log.info "publishing #{batch.length} records to dataset #{dataset}"
  payload = JSON.dump(batch)
  client = HTTP.headers(
    "User-Agent" => "fluent-plugin-honeycomb/#{HONEYCOMB_PLUGIN_VERSION}",
    "Content-Type" => "application/json",
    "X-Honeycomb-Team" => @writekey
  )
  resp = client.post(URI.join(@api_host, "/1/batch/#{dataset}"), { :body => payload })

  failures = parse_response(batch, resp)
  return if failures.empty?

  if retry_count >= @retry_limit
    # Out of retries: say so instead of discarding the events silently.
    log.error "Giving up on #{failures.size} records for dataset #{dataset} after #{retry_count} retries"
    return
  end

  # sleep and retry with the set of failed events
  sleep 1
  publish_batch(dataset, failures, retry_count + 1)
end

#shutdown ⇒ Object

This method is called when shutting down. Shut down the thread and close sockets or files here.



45
46
47
# File 'lib/fluent/plugin/out_honeycomb.rb', line 45

# Shutdown hook. Adds no plugin-specific teardown of its own; it only
# invokes the superclass implementation.
def shutdown
  super
end

#start ⇒ Object

This method is called when starting. Open sockets or files here.



39
40
41
# File 'lib/fluent/plugin/out_honeycomb.rb', line 39

# Start hook. Adds no plugin-specific setup of its own; it only invokes the
# superclass implementation.
def start
  super
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.

NOTE! This method is called by an internal thread, not Fluentd’s main thread, so I/O waits do not affect other plugins.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/out_honeycomb.rb', line 60

# Turns the events in a buffer chunk into per-dataset batches and publishes
# each batch.
#
# For every event in the chunk:
# * non-Hash records are skipped with a debug log;
# * if the record carries @presampled_key, that value is removed from the
#   record and used as the sample rate (falling back to 1, with a warning,
#   when it is not an Integer >= 1); otherwise @sample_rate is used and the
#   record is kept only when rand(1..@sample_rate) yields 1;
# * the tag is stored under @tag_key when @include_tag_key is set;
# * each Hash-valued key listed in @flatten_keys is replaced by its
#   flattened dotted-key entries;
# * the dataset is taken (and removed) from @dataset_from_key when that key
#   is present in the record, else @dataset is used.
#
# @param chunk [#msgpack_each] buffer chunk yielding [tag, time, record]
# @return [Hash] the dataset => batch mapping that was published
def write(chunk)
  # dataset name => events queued for that dataset
  per_dataset = Hash.new { |hash, name| hash[name] = [] }

  chunk.msgpack_each do |(tag, time, record)|
    unless record.is_a?(Hash)
      log.debug "Skipping record #{record}"
      next
    end

    presampled = @presampled_key && record.include?(@presampled_key)
    if presampled
      rate = record.delete(@presampled_key)

      unless rate.is_a?(Integer) && rate >= 1
        log.warn "Record emitted a presampled key (#{@presampled_key} = #{rate}), but was not a valid sample rate #{record}"

        rate = 1
      end
    else
      rate = @sample_rate

      # Keep roughly one in @sample_rate records.
      next if @sample_rate > 1 && rand(1..@sample_rate) > 1
    end

    record[@tag_key] = tag if @include_tag_key

    @flatten_keys.each do |key|
      nested = record[key]
      next unless nested.is_a?(Hash)

      record.merge!(flatten(nested, key))
      record.delete(key)
    end

    dataset =
      if @dataset_from_key != "" && record.has_key?(@dataset_from_key)
        # delete returns the stored value, so this both reads and removes it
        record.delete(@dataset_from_key)
      else
        @dataset
      end

    per_dataset[dataset] << {
      "data" => record,
      "samplerate" => rate,
      "time" => Time.at(time).utc.to_datetime.rfc3339
    }
  end

  per_dataset.each { |dataset, batch| publish_batch(dataset, batch, 0) }
end