Class: Fluent::LoomsystemsOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::LoomsystemsOutput
show all
- Defined in:
- lib/fluent/plugin/out_loomsystems.rb
Defined Under Namespace
Classes: ConnectionFailure
Instance Method Summary
collapse
Constructor Details
Returns a new instance of LoomsystemsOutput.
22
23
24
|
# File 'lib/fluent/plugin/out_loomsystems.rb', line 22
# Creates the plugin instance. No plugin-specific state is set up here;
# construction is handed straight to the parent class.
def initialize
  super()
end
|
Instance Method Details
#client ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/fluent/plugin/out_loomsystems.rb', line 45
# Returns the socket used to ship events, opening it on first use and reusing
# the memoized @_socket afterwards.
#
# When @use_ssl is set, a TLS session is negotiated over a TCP connection to
# @host:@ssl_port; otherwise a plain TCP connection to @host:@port is returned.
# Keepalive options are applied once, when the connection is created, instead
# of on every call (this method is invoked for each event that is written).
#
# NOTE(review): the SSLContext is used with its defaults; no verify_mode or
# certificate store is configured here -- confirm whether peer verification is
# expected for this endpoint.
#
# @return [TCPSocket, OpenSSL::SSL::SSLSocket] the connected socket
# @raise [SystemCallError, OpenSSL::SSL::SSLError] if connecting or the TLS
#   handshake fails; the half-opened TCP socket is closed before re-raising
def client
  return @_socket if @_socket

  log.info "opening ssl socket" if @use_ssl
  tcp = TCPSocket.new(@host, @use_ssl ? @ssl_port : @port)
  begin
    # Options are set on the raw TCP socket, which is also the transport
    # underneath the TLS socket when SSL is enabled.
    tcp.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
    # The TCP_KEEP* constants are only defined where the platform provides
    # them, so skip any that are missing instead of raising NameError.
    [[:TCP_KEEPIDLE, 10], [:TCP_KEEPINTVL, 3], [:TCP_KEEPCNT, 3]].each do |option, value|
      next unless Socket.const_defined?(option)
      tcp.setsockopt(Socket::IPPROTO_TCP, Socket.const_get(option), value)
    end

    if @use_ssl
      secure = OpenSSL::SSL::SSLSocket.new(tcp, OpenSSL::SSL::SSLContext.new)
      # Closing the TLS socket should also release the underlying TCP socket.
      secure.sync_close = true
      secure.connect
      @_socket = secure
    else
      @_socket = tcp
    end
  rescue StandardError
    # Do not leak the descriptor when setup or the handshake fails.
    tcp.close
    raise
  end
  @_socket
end
|
31
32
33
|
# File 'lib/fluent/plugin/out_loomsystems.rb', line 31
# Receives the plugin configuration and passes it, unchanged, to the parent
# class's configure implementation.
#
# @param conf the configuration object handed to the plugin
def configure(conf)
  super(conf)
end
|
This method is called when an event reaches Fluentd.
66
67
68
69
|
# File 'lib/fluent/plugin/out_loomsystems.rb', line 66
# Serializes a single event. The record is wrapped in an envelope carrying the
# event time under "timestamp", the value of @host under "host" and the record
# itself under "message"; the [tag, envelope] pair is then msgpack-encoded.
#
# @param tag the event tag
# @param time the event time
# @param record the event payload
# @return the msgpack encoding of [tag, envelope]
def format(tag, time, record)
  envelope = {
    "timestamp" => time,
    "host" => @host,
    "message" => record
  }
  [tag, envelope].to_msgpack
end
|
#send_to_loomsystems(data) ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/fluent/plugin/out_loomsystems.rb', line 92
# Writes every event in +data+ to the socket returned by #client, reconnecting
# and retrying on connection-level errors.
#
# After a failure the current socket is closed and discarded so that the next
# #client call opens a fresh one, and sending resumes at the first event that
# was not written successfully -- events already delivered are not sent again.
# The wait before a retry is the square of the number of previous retries,
# capped at 30 seconds. Retries continue while fewer than @max_retries have
# been made, or indefinitely when @max_retries is -1.
#
# @param data [Array<String>] the serialized events to send, in order
# @return the +data+ collection that was passed in
# @raise [ConnectionFailure] once the retry budget is exhausted
def send_to_loomsystems(data)
  retries = 0
  delivered = 0 # number of events written successfully so far
  begin
    log.trace "New attempt to loomsystems attempt=#{retries}" if retries > 0
    log.trace "Send nb_event=#{data.size} events to loomsystems"
    data.drop(delivered).each do |event|
      client.write(event)
      delivered += 1
    end
    data
  rescue Errno::EHOSTUNREACH, Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::EPIPE => e
    if retries < @max_retries || @max_retries == -1
      # Release the broken connection instead of just forgetting about it.
      stale = @_socket
      @_socket = nil
      begin
        stale.close if stale
      rescue IOError, SystemCallError
        # The socket is already unusable; nothing more to clean up.
      end
      a_couple_of_seconds = [retries ** 2, 30].min
      retries += 1
      log.warn "Could not push logs to loomsystems, attempt=#{retries} max_attempts=#{@max_retries} wait=#{a_couple_of_seconds}s error=#{e.message}"
      sleep a_couple_of_seconds
      retry
    end
    raise ConnectionFailure, "Could not push logs to loomsystems after #{retries} retries, #{e.message}"
  end
end
|
#shutdown ⇒ Object
40
41
42
43
|
# File 'lib/fluent/plugin/out_loomsystems.rb', line 40
# Logs a marker line at info level, then runs the parent class's shutdown.
#
# @return whatever the parent implementation returns
def shutdown
  log.info("----shutdown")
  super()
end
|
#start ⇒ Object
35
36
37
38
|
# File 'lib/fluent/plugin/out_loomsystems.rb', line 35
# Logs a marker line at info level, then runs the parent class's start.
#
# @return whatever the parent implementation returns
def start
  log.info("----start")
  super()
end
|
#write(chunk) ⇒ Object
NOTE! This method is called by an internal thread, not Fluentd’s main thread. ‘chunk’ is a buffer chunk that includes multiple formatted events.
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/fluent/plugin/out_loomsystems.rb', line 73
# Turns a buffer chunk into newline-terminated lines and ships them with
# #send_to_loomsystems.
#
# Entries whose record is not a Hash, or has no "message" key, are skipped.
# When @include_tag_key is set, the tag is stored in the record under @tag_key.
# With @use_json the whole record is dumped as JSON; otherwise only the
# "message" value is sent, with trailing whitespace stripped. The message is
# converted with #to_s first, so a non-String message (for example a nested
# Hash) no longer raises NoMethodError on #rstrip.
#
# @param chunk a buffer chunk responding to #msgpack_each, yielding
#   (tag, record) pairs
# @return the result of #send_to_loomsystems
def write(chunk)
  messages = []
  chunk.msgpack_each do |tag, record|
    next unless record.is_a?(Hash) && record.key?("message")

    record[@tag_key] = tag if @include_tag_key
    line = @use_json ? Yajl.dump(record) : record["message"].to_s.rstrip
    messages.push(line + "\n")
  end
  send_to_loomsystems(messages)
end
|