Class: Fluent::LoomsystemsOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_loomsystems.rb

Defined Under Namespace

Classes: ConnectionFailure

Instance Method Summary collapse

Constructor Details

#initialize ⇒ LoomsystemsOutput

Returns a new instance of LoomsystemsOutput.



22
23
24
# File 'lib/fluent/plugin/out_loomsystems.rb', line 22

# Builds the plugin instance; all setup is delegated to the superclass
# constructor and no additional state is initialised here.
def initialize
  super()
end

Instance Method Details

#client ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_loomsystems.rb', line 45

# Returns the memoized socket to the loomsystems endpoint, opening it on
# first use.
#
# When @use_ssl is set, a TCP connection to @host:@ssl_port is wrapped in an
# OpenSSL::SSL::SSLSocket; otherwise a plain TCPSocket to @host:@port is used.
# TCP keepalive is configured once, right after the socket is opened, rather
# than on every call (this method is invoked for each event that is written).
#
# @return [TCPSocket, OpenSSL::SSL::SSLSocket] the cached connection
def client
  return @_socket if @_socket

  connection = if @use_ssl
    log.info "opening ssl socket"
    # NOTE(review): a bare SSLContext is used, so no verification settings
    # are applied here -- confirm that this is intended.
    context    = OpenSSL::SSL::SSLContext.new
    tcp_socket = TCPSocket.new @host, @ssl_port
    begin
      ssl_client = OpenSSL::SSL::SSLSocket.new tcp_socket, context
      # Closing the SSL socket must also close the TCP socket underneath it.
      ssl_client.sync_close = true
      ssl_client.connect
    rescue StandardError
      # Do not leak the raw TCP socket when the TLS setup/handshake fails.
      tcp_socket.close
      raise
    end
  else
    TCPSocket.new @host, @port
  end

  connection.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  # Keepalive tuning: idle time before probing, interval between probes and
  # number of probes. These constants are not defined on every platform, so
  # each one is applied only where it exists. IPPROTO_TCP is the portable
  # spelling of the TCP option level.
  [[:TCP_KEEPIDLE, 10], [:TCP_KEEPINTVL, 3], [:TCP_KEEPCNT, 3]].each do |option, value|
    next unless Socket.const_defined?(option)

    connection.setsockopt(Socket::IPPROTO_TCP, Socket.const_get(option), value)
  end

  @_socket = connection
end

#configure(conf) ⇒ Object



31
32
33
# File 'lib/fluent/plugin/out_loomsystems.rb', line 31

# Receives the plugin configuration and hands it, unchanged, to the
# superclass implementation; no extra validation is performed here.
#
# @param conf the configuration object passed in by the caller
def configure(conf)
  super(conf)
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd.



66
67
68
69
# File 'lib/fluent/plugin/out_loomsystems.rb', line 66

# Serialises one event for the buffer. Called when an event reaches Fluentd.
#
# The record is wrapped in an envelope carrying the event time and the
# configured @host, and the [tag, envelope] pair is encoded with msgpack.
#
# @param tag the tag of the event
# @param time the time of the event, stored under "timestamp"
# @param record the event record, stored under "message"
# @return the msgpack encoding of [tag, envelope]
def format(tag, time, record)
  envelope = { "timestamp" => time, "host" => @host, "message" => record }
  [tag, envelope].to_msgpack
end

#send_to_loomsystems(data) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_loomsystems.rb', line 92

# Writes every entry of +data+ to the loomsystems connection, reconnecting
# and retrying when the connection fails.
#
# On EHOSTUNREACH, ECONNREFUSED, ETIMEDOUT or EPIPE the current socket is
# closed and forgotten so that the next call to #client opens a fresh one.
# The whole batch is then re-sent after a quadratic backoff capped at 30
# seconds, up to @max_retries times (-1 means retry without limit).
#
# @param data [Array<String>] the already formatted lines to send
# @return [Array<String>] +data+, once every entry has been written
# @raise [ConnectionFailure] when the retry budget is exhausted
def send_to_loomsystems(data)
  retries = 0

  begin
    log.trace "New attempt to loomsystems attempt=#{retries}" if retries > 0
    log.trace "Send nb_event=#{data.size} events to loomsystems"
    data.each { |event| client.write(event) }
  rescue Errno::EHOSTUNREACH, Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EPIPE => e
    # Drop the broken connection in every case (also when giving up) and
    # close it, so its file descriptor is not leaked on each retry.
    stale_socket = @_socket
    @_socket = nil
    begin
      stale_socket.close if stale_socket
    rescue StandardError
      # Best effort: the socket is already broken, a failing close is harmless.
    end

    if retries < @max_retries || @max_retries == -1
      # Wait retries^2 seconds (0, 1, 4, 9, ...) but never more than 30.
      a_couple_of_seconds = [retries ** 2, 30].min
      retries += 1
      log.warn "Could not push logs to loomsystems, attempt=#{retries} max_attempts=#{@max_retries} wait=#{a_couple_of_seconds}s error=#{e.message}"
      sleep a_couple_of_seconds
      retry
    end
    raise ConnectionFailure, "Could not push logs to loomsystems after #{retries} retries, #{e.message}"
  end
end

#shutdown ⇒ Object



40
41
42
43
# File 'lib/fluent/plugin/out_loomsystems.rb', line 40

# Writes a shutdown marker to the plugin log, then runs the superclass
# shutdown hook.
def shutdown
  log.info("----shutdown")
  super()
end

#start ⇒ Object



35
36
37
38
# File 'lib/fluent/plugin/out_loomsystems.rb', line 35

# Writes a start marker to the plugin log, then runs the superclass
# start hook.
def start
  log.info("----start")
  super()
end

#write(chunk) ⇒ Object

NOTE! This method is called by internal thread, not Fluentd’s main thread. ‘chunk’ is a buffer chunk that includes multiple formatted events.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/out_loomsystems.rb', line 73

# Turns a buffer chunk into newline-terminated lines and sends them with
# #send_to_loomsystems.
#
# Entries that are not a Hash, or that have no "message" key, are skipped.
# When @include_tag_key is set the tag is stored in the entry under @tag_key.
# With @use_json the whole entry is JSON-encoded. Otherwise only the
# "message" payload is sent: a String payload is right-stripped, and any
# other payload is JSON-encoded instead of raising NoMethodError on #rstrip
# (#format stores the whole event record under "message").
#
# @param chunk a buffer chunk that yields (tag, record) pairs from #msgpack_each
# @return whatever #send_to_loomsystems returns
def write(chunk)
  messages = []
  chunk.msgpack_each do |tag, record|
    next unless record.is_a?(Hash) && record.key?("message")

    record[@tag_key] = tag if @include_tag_key

    line = if @use_json
      Yajl.dump(record)
    else
      payload = record["message"]
      payload.is_a?(String) ? payload.rstrip : Yajl.dump(payload)
    end
    messages.push "#{line}\n"
  end
  send_to_loomsystems(messages)
end