Class: Fluent::LogzioOutputBuffered

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_logzio_buffered.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



14
15
16
17
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 14

def configure(conf)
  super
  $log.debug "Logzio url #{@endpoint_url}"
end

#format(tag, time, record) ⇒ Object



34
35
36
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 34

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#shutdownObject



30
31
32
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 30

def shutdown
  super
end

#startObject



19
20
21
22
23
24
25
26
27
28
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 19

def start
  super
  require 'net/http/persistent'
  @uri = URI @endpoint_url
  @http = Net::HTTP::Persistent.new 'fluent-plugin-logzio-ng', :ENV
  @http.headers['Content-Type'] = 'text/plain'
  @http.idle_timeout = @http_idle_timeout
  @http.socket_options << [Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1]
  $log.debug "Started logzio shipper.."
end

#write(chunk) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/out_logzio_buffered.rb', line 38

def write(chunk)
  records = []

  chunk.msgpack_each {|tag,time,record|
    record['@timestamp'] ||= Time.at(time).iso8601(3) if @output_include_time
    record['fluentd_tags'] ||= tag.to_s if @output_include_tags
    records.push(Yajl.dump(record))
  }

  $log.debug "Got flush timeout, containing #{records.length} chunks"

  # Setting our request
  post = Net::HTTP::Post.new @uri.request_uri

  # Logz.io bulk http endpoint expecting log line with \n delimiter
  post.body = records.join("\n")

  begin
    response = @http.request @uri, post
    $log.debug "HTTP Response code #{response.code}"

    if response.code != '200'

      $log.debug "Got HTTP #{response.code} from logz.io, not giving up just yet"

      # If any other non-200, we will try to resend it after 2, 4 and 8 seconds. Then we will give up

      sleep_interval = 2
      @retry_count.times do |counter|

        $log.debug "Sleeping for #{sleep_interval} seconds, and trying again."

        sleep(sleep_interval)

        # Retry
        response = @http.request @uri, post

        # Sucecss, no further action is needed
        if response.code == 200

          $log.debug "Successfuly sent the failed bulk."

          # Breaking out
          break

        else

          # Doubling the sleep interval
          sleep_interval *= 2

          if counter == @retry_count - 1

            $log.error "Could not send your bulk after 3 tries. Sorry. Got HTTP #{response.code}"
          end
        end
      end
    end
  rescue StandardError => error
    $log.error "Error connecting to logzio. Got exception: #{error}"
  end
end