Class: Fluent::HoneycombOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::HoneycombOutput
- Defined in:
- lib/fluent/plugin/out_honeycomb.rb
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
This method is called before starting.
- #flatten(record, prefix) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #parse_response(batch, resp) ⇒ Object
- #publish_batch(dataset, batch, retry_count) ⇒ Object
-
#shutdown ⇒ Object
This method is called when shutting down.
-
#start ⇒ Object
This method is called when starting.
-
#write(chunk) ⇒ Object
This method is called every flush interval.
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
27 28 29 30 31 32 33 34 35 |
# File 'lib/fluent/plugin/out_honeycomb.rb', line 27 def configure(conf) # Apply sane defaults. These override the poor fluentd defaults, but not # anything explicitly specified in the configuration. conf["buffer_chunk_limit"] ||= "500k" conf["flush_interval"] ||= "1s" conf["max_retry_wait"] ||= "30s" conf["retry_limit"] ||= 17 super end |
#flatten(record, prefix) ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/fluent/plugin/out_honeycomb.rb', line 174 def flatten(record, prefix) ret = {} if record.is_a? Hash record.each { |key, value| ret.merge! flatten(value, "#{prefix}.#{key.to_s}") } else return {prefix => record} end ret end |
#format(tag, time, record) ⇒ Object
49 50 51 |
# File 'lib/fluent/plugin/out_honeycomb.rb', line 49 def format(tag, time, record) [tag, time, record].to_msgpack end |
#parse_response(batch, resp) ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/fluent/plugin/out_honeycomb.rb', line 133 def parse_response(batch, resp) if resp.status != 200 log.error "Error sending batch: #{resp.status}, #{resp.body}" # Force retry by returning the entire batch return batch end begin results = JSON.parse(resp.body.to_s) rescue JSON::ParserError => e log.warn "Error parsing response as JSON: #{e}" raise e end successes = 0 failures = [] if !results.is_a? Array log.warning "Unexpected response format: #{results}" raise Exception.new("Unexpected response format: #{resp.status}") end results.each_with_index do |result, idx| if !result.is_a? Hash log.warning "Unexpected status format in response: #{result}" next end if result["status"] == 202 successes += 1 else failures.push(batch[idx]) end end if failures.size > 0 log.warn "Errors publishing records: #{failures.size} failures out of #{successes + failures.size}" else log.debug "Successfully published #{successes} records" end return failures end |
#publish_batch(dataset, batch, retry_count) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/fluent/plugin/out_honeycomb.rb', line 112 def publish_batch(dataset, batch, retry_count) if batch.length == 0 return end log.info "publishing #{batch.length} records to dataset #{dataset}" body = JSON.dump(batch) resp = HTTP.headers( "User-Agent" => "fluent-plugin-honeycomb/#{HONEYCOMB_PLUGIN_VERSION}", "Content-Type" => "application/json", "X-Honeycomb-Team" => @writekey) .post(URI.join(@api_host, "/1/batch/#{dataset}"), { :body => body, }) failures = parse_response(batch, resp) if failures.size > 0 && retry_count < @retry_limit # sleep and retry with the set of failed events sleep 1 publish_batch(dataset, failures, retry_count + 1) end end |
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
45 46 47 |
# File 'lib/fluent/plugin/out_honeycomb.rb', line 45 def shutdown super end |
#start ⇒ Object
This method is called when starting. Open sockets or files here.
39 40 41 |
# File 'lib/fluent/plugin/out_honeycomb.rb', line 39 def start super end |
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/out_honeycomb.rb', line 60 def write(chunk) batches = Hash.new{ |h, k| h[k] = [] } chunk.msgpack_each do |(tag, time, record)| if !record.is_a? Hash log.debug "Skipping record #{record}" next end if @presampled_key && record.include?(@presampled_key) sample_rate = record.delete(@presampled_key) if !sample_rate.is_a?(Integer) || sample_rate < 1 log.warn "Record emitted a presampled key (#{@presampled_key} = #{sample_rate}), but was not a valid sample rate #{record}" sample_rate = 1 end else sample_rate = @sample_rate if @sample_rate > 1 && rand(1..@sample_rate) > 1 next end end if @include_tag_key record[@tag_key] = tag end @flatten_keys.each do |k| next unless record[k].is_a?(Hash) record.merge!(flatten(record[k], k)) record.delete(k) end if (@dataset_from_key != "" && record.has_key?(@dataset_from_key)) dataset = record[@dataset_from_key] record.delete @dataset_from_key else dataset = @dataset end batch = batches[dataset] batch.push({ "data" => record, "samplerate" => sample_rate, "time" => Time.at(time).utc.to_datetime.rfc3339 }) end batches.each do |dataset, batch| publish_batch(dataset, batch, 0) end end |