Class: Fluent::SecureForwardOutput::Node
- Inherits:
-
Object
- Object
- Fluent::SecureForwardOutput::Node
- Defined in:
- lib/fluent/plugin/output_node.rb
Instance Attribute Summary collapse
-
#authentication ⇒ Object
Returns the value of attribute authentication.
-
#detach ⇒ Object
Returns the value of attribute detach.
-
#expire ⇒ Object
readonly
Returns the value of attribute expire.
-
#first_session ⇒ Object
Returns the value of attribute first_session.
-
#host ⇒ Object
Returns the value of attribute host.
-
#hostlabel ⇒ Object
Returns the value of attribute hostlabel.
-
#keepalive ⇒ Object
Returns the value of attribute keepalive.
-
#password ⇒ Object
Returns the value of attribute password.
-
#port ⇒ Object
Returns the value of attribute port.
-
#shared_key ⇒ Object
Returns the value of attribute shared_key.
-
#shared_key_salt ⇒ Object
Returns the value of attribute shared_key_salt.
-
#socket ⇒ Object
Returns the value of attribute socket.
-
#sslsession ⇒ Object
Returns the value of attribute sslsession.
-
#standby ⇒ Object
Returns the value of attribute standby.
-
#state ⇒ Object
Returns the value of attribute state.
-
#unpacker ⇒ Object
Returns the value of attribute unpacker.
-
#username ⇒ Object
Returns the value of attribute username.
Instance Method Summary collapse
- #check_helo(message) ⇒ Object
- #check_pong(message) ⇒ Object
- #connect ⇒ Object
- #dup ⇒ Object
- #established? ⇒ Boolean
- #expired? ⇒ Boolean
- #generate_ping ⇒ Object
- #generate_salt ⇒ Object
-
#initialize(sender, shared_key, conf) ⇒ Node
constructor
A new instance of Node.
- #join ⇒ Object
- #log ⇒ Object
- #on_read(data) ⇒ Object
- #send_data(data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(sender, shared_key, conf) ⇒ Node
Returns a new instance of Node.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/plugin/output_node.rb', line 19
#
# Builds a node describing one secure-forward destination. No connection is
# opened here; the node starts in the :helo state with no socket or thread.
#
# @param sender [Object] owning output plugin; other methods of this class
#   read its log, hostname_resolver, timeouts and self_hostname
# @param shared_key [String] key mixed into the PING/PONG digests
# @param conf [Hash] per-server settings, read via the string keys 'host',
#   'port', 'hostlabel', 'username', 'password' and 'standby'
def initialize(sender, shared_key, conf)
  @sender = sender
  @shared_key = shared_key

  @host = conf['host']
  @port = (conf['port'] || Fluent::SecureForwardOutput::DEFAULT_SECURE_CONNECT_PORT).to_i
  # The label checked against the peer certificate defaults to the host name.
  @hostlabel = conf['hostlabel'] || conf['host']
  @username = conf['username'] || ''
  @password = conf['password'] || ''
  # A present 'standby' key counts as true unless bool_value reports exactly false.
  @standby = conf.has_key?('standby') && Fluent::Config.bool_value(conf['standby']) != false

  @authentication = nil
  @keepalive = nil
  @expire = nil
  @first_session = false
  @detach = false

  @socket = nil
  @sslsession = nil
  @unpacker = MessagePack::Unpacker.new
  @shared_key_salt = generate_salt
  @state = :helo
  @thread = nil
end
Instance Attribute Details
#authentication ⇒ Object
Returns the value of attribute authentication.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12
#
# Value of the 'auth' option received in the HELO message (stored by
# #check_helo); nil until a HELO has been processed.
#
# @return [Object] the value of attribute authentication
def authentication
  @authentication
end
#detach ⇒ Object
Returns the value of attribute detach.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15
#
# Flag polled by the read loop in #connect; the loop stops once it is truthy.
#
# @return [Object] the value of attribute detach
def detach
  @detach
end
#expire ⇒ Object (readonly)
Returns the value of attribute expire.
17 18 19 |
# File 'lib/fluent/plugin/output_node.rb', line 17
#
# Time after which the session counts as expired. #on_read sets it to
# Time.now + keepalive when the connection is established with a positive
# keepalive; otherwise it stays nil.
#
# @return [Time, nil] the value of attribute expire
def expire
  @expire
end
#first_session ⇒ Object
Returns the value of attribute first_session.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15
#
# When truthy, #on_read logs an info-level message once the connection is
# established.
#
# @return [Object] the value of attribute first_session
def first_session
  @first_session
end
#host ⇒ Object
Returns the value of attribute host.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10
#
# Destination host name, taken from conf['host'] by the constructor.
#
# @return [Object] the value of attribute host
def host
  @host
end
#hostlabel ⇒ Object
Returns the value of attribute hostlabel.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10
#
# Name passed to the peer-certificate check in #connect; the constructor
# falls back to the host when conf['hostlabel'] is absent.
#
# @return [Object] the value of attribute hostlabel
def hostlabel
  @hostlabel
end
#keepalive ⇒ Object
Returns the value of attribute keepalive.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12
#
# Keepalive duration added to Time.now to compute the expiry time; nil or 0
# disables expiry (see #expired?).
#
# @return [Object] the value of attribute keepalive
def keepalive
  @keepalive
end
#password ⇒ Object
Returns the value of attribute password.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10
#
# Password hashed into the PING message by #generate_ping; the constructor
# defaults it to '' when conf['password'] is absent.
#
# @return [Object] the value of attribute password
def password
  @password
end
#port ⇒ Object
Returns the value of attribute port.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10
#
# Destination TCP port; the constructor converts it with to_i.
#
# @return [Object] the value of attribute port
def port
  @port
end
#shared_key ⇒ Object
Returns the value of attribute shared_key.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10
#
# Shared key mixed into the digests built by #generate_ping and verified by
# #check_pong.
#
# @return [Object] the value of attribute shared_key
def shared_key
  @shared_key
end
#shared_key_salt ⇒ Object
Returns the value of attribute shared_key_salt.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13
#
# Salt produced by #generate_salt in the constructor and used in the shared
# key digests.
#
# @return [Object] the value of attribute shared_key_salt
def shared_key_salt
  @shared_key_salt
end
#socket ⇒ Object
Returns the value of attribute socket.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13
#
# TCP socket assigned by #connect once the SSL session has been set up; nil
# before that.
#
# @return [Object] the value of attribute socket
def socket
  @socket
end
#sslsession ⇒ Object
Returns the value of attribute sslsession.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13
#
# SSL socket assigned by #connect and written to by #send_data; nil before
# the session has been set up.
#
# @return [Object] the value of attribute sslsession
def sslsession
  @sslsession
end
#standby ⇒ Object
Returns the value of attribute standby.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10
#
# Standby flag derived from conf['standby'] by the constructor.
#
# @return [Object] the value of attribute standby
def standby
  @standby
end
#state ⇒ Object
Returns the value of attribute state.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13
#
# Current handshake/connection state. The methods of this class assign
# :helo, :pingpong, :established, :failed and :closed.
#
# @return [Object] the value of attribute state
def state
  @state
end
#unpacker ⇒ Object
Returns the value of attribute unpacker.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13
#
# MessagePack unpacker created by the constructor; #connect feeds received
# bytes into it and dispatches each decoded object to #on_read.
#
# @return [Object] the value of attribute unpacker
def unpacker
  @unpacker
end
#username ⇒ Object
Returns the value of attribute username.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10
#
# User name sent in the PING message by #generate_ping; the constructor
# defaults it to '' when conf['username'] is absent.
#
# @return [Object] the value of attribute username
def username
  @username
end
Instance Method Details
#check_helo(message) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/fluent/plugin/output_node.rb', line 106
#
# Validates a HELO message and records the options it carries.
#
# The expected shape is ['HELO', options(hash)]. On success the 'auth' and
# 'keepalive' options are stored in @authentication and @allow_keepalive.
#
# @param message [Object] decoded object received from the peer
# @return [Boolean] true when the message is a well-formed HELO, false otherwise
def check_helo(message)
  log.debug "checking helo"
  # ['HELO', options(hash)]
  # The message comes off the network, so verify the container types before
  # indexing into them instead of raising on an unexpected object.
  return false unless message.is_a?(Array) && message.size == 2 && message[0] == 'HELO'

  opts = message[1]
  return false unless opts.is_a?(Hash)

  @authentication = opts['auth']
  @allow_keepalive = opts['keepalive']
  true
end
#check_pong(message) ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/fluent/plugin/output_node.rb', line 133
#
# Validates a PONG message and verifies the peer's shared-key digest.
#
# @param message [Object] decoded object received from the peer
# @return [Array(Boolean, String)] [true, nil] on success, otherwise
#   [false, reason]
def check_pong(message)
  log.debug "checking pong"
  # ['PONG', bool(authentication result), 'reason if authentication failed',
  #  self_hostname, sha512_hex(salt + self_hostname + sharedkey)]
  unless message.is_a?(Array) && message.size == 5 && message[0] == 'PONG'
    return false, 'invalid format for PONG message'
  end
  _pong, auth_result, reason, hostname, shared_key_hexdigest = message

  unless auth_result
    # Interpolate so that a missing (nil) reason cannot raise TypeError.
    return false, "authentication failed: #{reason}"
  end

  # Digest#update only accepts strings; reject anything else as malformed
  # rather than letting a TypeError escape.
  unless hostname.is_a?(String) && shared_key_hexdigest.is_a?(String)
    return false, 'invalid format for PONG message'
  end

  clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key).hexdigest
  unless shared_key_hexdigest == clientside
    return false, 'shared key mismatch'
  end

  return true, nil
end
#connect ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/fluent/plugin/output_node.rb', line 189
#
# Opens the TCP connection and SSL session to the node, optionally verifies
# the peer certificate, then reads from the session until detached or
# disconnected, dispatching each decoded object to #on_read.
#
# On a TCP or SSL setup failure the node is marked :failed and the method
# returns. A certificate check failure raised as OpenSSL::SSL::SSLError is
# logged, the node is shut down and the error is re-raised.
#
# @return [void]
def connect
  log.debug "starting client"

  addr = @sender.hostname_resolver.getaddress(@host)
  log.debug "create tcp socket to node", :host => @host, :address => addr, :port => @port
  begin
    sock = TCPSocket.new(addr, @port)
  rescue => e
    log.warn "failed to connect for secure-forward", :error_class => e.class, :error => e, :host => @host, :address => addr, :port => @port
    @state = :failed
    return
  end

  log.trace "changing socket options"
  opt = [1, @sender.send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

  opt = [@sender.send_timeout.to_i, 0].pack('L!L!')  # struct timeval
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

  # TODO: SSLContext constructer parameter (SSL/TLS protocol version)
  log.trace "initializing SSL contexts"
  context = OpenSSL::SSL::SSLContext.new
  # TODO: context.ca_file = (ca_file_path)
  # TODO: context.ciphers = (SSL Shared key chiper protocols)

  log.debug "trying to connect ssl session", :host => @host, :address => addr, :port => @port
  begin
    sslsession = OpenSSL::SSL::SSLSocket.new(sock, context)
  rescue => e
    log.warn "failed to establish SSL connection", :error_class => e.class, :error => e, :host => @host, :address => addr, :port => @port
    # Without an SSL socket there is nothing to connect; previously execution
    # fell through and called #connect on nil. Release the TCP socket and
    # report the failure the same way as a TCP connect error.
    sock.close
    @state = :failed
    return
  end

  unless sslsession.connect
    log.debug "failed to connect", :host => @host, :address => addr, :port => @port
    sock.close
    @state = :failed
    return
  end
  log.debug "ssl session connected", :host => @host, :port => @port

  begin
    unless @sender.allow_self_signed_certificate
      log.debug "checking peer's certificate", :subject => sslsession.peer_cert.subject
      sslsession.post_connection_check(@hostlabel)
      verify = sslsession.verify_result
      if verify != OpenSSL::X509::V_OK
        err_name = Fluent::SecureForwardOutput::OpenSSLUtil.verify_result_name(verify)
        log.warn "failed to verify certification while connecting host #{@host} as #{@hostlabel} (but not raised, why?)"
        log.warn "verify_result: #{err_name}"
        # NOTE(review): RuntimeError is not matched by the rescue below, so on
        # this path the exception propagates with the socket still open and
        # @state unchanged -- confirm whether that is intended.
        raise RuntimeError, "failed to verify certification while connecting host #{@host} as #{@hostlabel}"
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    log.warn "failed to verify certification while connecting ssl session", :host => @host, :hostlabel => @hostlabel
    # @socket/@sslsession are not assigned yet, so #shutdown cannot close the
    # connection; release the TCP socket here. When this runs inside the
    # thread created by #start, #shutdown kills the current thread.
    sock.close
    self.shutdown
    raise
  end
  log.debug "ssl sessison connected", :host => @host, :port => @port
  @socket = sock
  @sslsession = sslsession

  buf = ''
  read_length = @sender.read_length
  read_interval = @sender.read_interval
  socket_interval = @sender.socket_interval

  loop do
    break if @detach

    begin
      while @sslsession.read_nonblock(read_length, buf)
        if buf == ''
          sleep read_interval
          next
        end
        @unpacker.feed_each(buf, &method(:on_read))
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError
      # to wait i/o restart
      sleep socket_interval
    rescue EOFError
      log.warn "disconnected from #{@host}"
      break
    end
  end
  self.shutdown
end
#dup ⇒ Object
50 51 52 53 54 55 56 57 58 |
# File 'lib/fluent/plugin/output_node.rb', line 50
#
# Builds a fresh, unconnected node with the same destination and credentials
# as this one. Connection state (socket, session, thread, state) is not
# copied; the new node starts from the constructor defaults.
#
# @return [Node] the new node
def dup
  renewed = self.class.new(
    @sender,
    @shared_key,
    {'host' => @host, 'port' => @port, 'hostlabel' => @hostlabel, 'username' => @username, 'password' => @password}
  )
  # The conf hash above carries no 'standby' key, so the constructor would
  # reset the flag to false; carry it over explicitly.
  renewed.standby = @standby
  renewed.keepalive = @keepalive if @keepalive
  renewed
end
#established? ⇒ Boolean
90 91 92 |
# File 'lib/fluent/plugin/output_node.rb', line 90
#
# @return [Boolean] true when the node's state is :established
def established?
  @state == :established
end
#expired? ⇒ Boolean
94 95 96 97 98 99 100 |
# File 'lib/fluent/plugin/output_node.rb', line 94
#
# Tells whether the session has outlived its keepalive window.
#
# @return [Boolean] false when keepalive is nil or 0, or when no expiry time
#   has been set; otherwise whether the expiry time lies in the past
def expired?
  return false if @keepalive.nil? || @keepalive == 0

  # Return a strict boolean (false rather than nil) when @expire is unset.
  !@expire.nil? && @expire < Time.now
end
#generate_ping ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/fluent/plugin/output_node.rb', line 118
#
# Builds the PING message sent in reply to a HELO.
#
# @return [Array] ['PING', self_hostname, shared_key_salt, shared key digest,
#   username or '', password digest or '']
def generate_ping
  log.debug "generating ping"
  # ['PING', self_hostname, sharedkey_salt, sha512_hex(sharedkey_salt + self_hostname + shared_key),
  #  username || '', sha512_hex(auth_salt + username + password) || '']
  shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt).update(@sender.self_hostname).update(@shared_key).hexdigest
  ping = ['PING', @sender.self_hostname, @shared_key_salt, shared_key_hexdigest]
  # @authentication starts out nil and is only filled in from the HELO
  # options, so treat nil like '' (no authentication) instead of passing nil
  # to Digest#update, which raises TypeError.
  if @authentication.nil? || @authentication == ''
    ping.push('', '')
  else
    password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest
    ping.push(@username, password_hexdigest)
  end
  ping
end
#generate_salt ⇒ Object
102 103 104 |
# File 'lib/fluent/plugin/output_node.rb', line 102
#
# @return [String] 16 random bytes from OpenSSL::Random
def generate_salt
  OpenSSL::Random.random_bytes(16)
end
#join ⇒ Object
86 87 88 |
# File 'lib/fluent/plugin/output_node.rb', line 86
#
# Waits for the connection thread to finish, if one has been started.
#
# @return [Thread, nil] the joined thread, or nil when no thread exists
def join
  @thread && @thread.join
end
#log ⇒ Object
46 47 48 |
# File 'lib/fluent/plugin/output_node.rb', line 46
#
# @return [Object] the logger of the owning sender
def log
  @sender.log
end
#on_read(data) ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/fluent/plugin/output_node.rb', line 158
#
# Handles one decoded object received from the peer and advances the
# handshake: :helo -> :pingpong -> :established. An invalid HELO or a
# rejected PONG shuts the node down. Objects received in any other state
# are ignored, and objects received after establishment only trigger a
# warning.
#
# @param data [Object] decoded object handed over by the unpacker
# @return [void]
def on_read(data)
  log.debug "on_read"
  if self.established?
    #TODO: ACK
    log.warn "unknown packets arrived..."
    return
  end

  case @state
  when :helo
    unless check_helo(data)
      log.warn "received invalid helo message from #{@host}"
      self.shutdown
      return
    end
    send_data generate_ping()
    @state = :pingpong
  when :pingpong
    success, reason = check_pong(data)
    unless success
      log.warn "connection refused to #{@host}:" + reason
      self.shutdown
      return
    end
    log.info "connection established to #{@host}" if @first_session
    @state = :established
    # The expiry time is only tracked when a positive keepalive is configured.
    @expire = Time.now + @keepalive if @keepalive && @keepalive > 0
    log.debug "connection established", :host => @host, :port => @port, :expire => @expire
  end
end
#send_data(data) ⇒ Object
154 155 156 |
# File 'lib/fluent/plugin/output_node.rb', line 154
#
# Serializes +data+ with to_msgpack and writes it to the SSL session.
#
# @param data [#to_msgpack] object to send
# @return [Object] whatever the session's write returns
def send_data(data)
  @sslsession.write data.to_msgpack
end
#shutdown ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fluent/plugin/output_node.rb', line 66
#
# Marks the node :closed, closes the SSL session and socket, and stops the
# connection thread. Errors raised while doing so are logged at debug level
# and not propagated.
#
# @return [void]
def shutdown
  log.debug "shutting down node #{@host}"
  @state = :closed

  if @thread == Thread.current
    # Called from the connection thread itself: close first, because killing
    # the current thread ends execution of this method.
    @sslsession.close if @sslsession
    @socket.close if @socket
    @thread.kill
  else
    if @thread
      @thread.kill
      @thread.join
    end
    @sslsession.close if @sslsession
    @socket.close if @socket
  end
rescue => e
  log.debug "error on node shutdown #{e.class}:#{e.message}"
end
#start ⇒ Object
60 61 62 63 64 |
# File 'lib/fluent/plugin/output_node.rb', line 60
#
# Starts the connection thread, which runs #connect.
#
# @return [Thread] the newly created thread
def start
  @thread = Thread.new(&method(:connect))
  ## If you want to check code bug, turn this line enable
  # @thread.abort_on_exception = true
end