Class: Fluent::SecureForwardOutput::Node
- Inherits:
-
Object
- Object
- Fluent::SecureForwardOutput::Node
- Defined in:
- lib/fluent/plugin/output_node.rb
Instance Attribute Summary collapse
-
#authentication ⇒ Object
Returns the value of attribute authentication.
-
#detach ⇒ Object
Returns the value of attribute detach.
-
#expire ⇒ Object
readonly
Returns the value of attribute expire.
-
#first_session ⇒ Object
Returns the value of attribute first_session.
-
#host ⇒ Object
Returns the value of attribute host.
-
#hostlabel ⇒ Object
Returns the value of attribute hostlabel.
-
#keepalive ⇒ Object
Returns the value of attribute keepalive.
-
#password ⇒ Object
Returns the value of attribute password.
-
#port ⇒ Object
Returns the value of attribute port.
-
#shared_key ⇒ Object
Returns the value of attribute shared_key.
-
#shared_key_salt ⇒ Object
Returns the value of attribute shared_key_salt.
-
#socket ⇒ Object
Returns the value of attribute socket.
-
#sslsession ⇒ Object
Returns the value of attribute sslsession.
-
#standby ⇒ Object
Returns the value of attribute standby.
-
#state ⇒ Object
Returns the value of attribute state.
-
#unpacker ⇒ Object
Returns the value of attribute unpacker.
-
#username ⇒ Object
Returns the value of attribute username.
Instance Method Summary collapse
- #check_helo(message) ⇒ Object
- #check_pong(message) ⇒ Object
- #connect ⇒ Object
- #detach! ⇒ Object
- #detached? ⇒ Boolean
- #dup ⇒ Object
- #established? ⇒ Boolean
- #expired? ⇒ Boolean
- #generate_ping ⇒ Object
- #generate_salt ⇒ Object
-
#initialize(sender, conf) ⇒ Node
constructor
A new instance of Node.
- #join ⇒ Object
- #log ⇒ Object
- #on_read(data) ⇒ Object
- #release! ⇒ Object
- #send_data(data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #tain! ⇒ Object
- #tained? ⇒ Boolean
Constructor Details
#initialize(sender, conf) ⇒ Node
Returns a new instance of Node.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fluent/plugin/output_node.rb', line 21 def initialize(sender, conf) @sender = sender @shared_key = conf.shared_key || sender.shared_key @host = conf.host @port = conf.port @hostlabel = conf.hostlabel || conf.host @username = conf.username @password = conf.password @standby = conf.standby @proxy_uri = conf.proxy_uri @keepalive = sender.keepalive @connection_hard_timeout = sender.connection_hard_timeout @authentication = nil @writing = false @expire = nil @first_session = false @detach = false @socket = nil @sslsession = nil @unpacker = MessagePack::Unpacker.new @shared_key_salt = generate_salt @state = :helo @mtime = Time.now @thread = nil end |
Instance Attribute Details
#authentication ⇒ Object
Returns the value of attribute authentication.
14 15 16 |
# File 'lib/fluent/plugin/output_node.rb', line 14 def authentication @authentication end |
#detach ⇒ Object
Returns the value of attribute detach.
17 18 19 |
# File 'lib/fluent/plugin/output_node.rb', line 17 def detach @detach end |
#expire ⇒ Object (readonly)
Returns the value of attribute expire.
19 20 21 |
# File 'lib/fluent/plugin/output_node.rb', line 19 def expire @expire end |
#first_session ⇒ Object
Returns the value of attribute first_session.
17 18 19 |
# File 'lib/fluent/plugin/output_node.rb', line 17 def first_session @first_session end |
#host ⇒ Object
Returns the value of attribute host.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12 def host @host end |
#hostlabel ⇒ Object
Returns the value of attribute hostlabel.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12 def hostlabel @hostlabel end |
#keepalive ⇒ Object
Returns the value of attribute keepalive.
14 15 16 |
# File 'lib/fluent/plugin/output_node.rb', line 14 def keepalive @keepalive end |
#password ⇒ Object
Returns the value of attribute password.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12 def password @password end |
#port ⇒ Object
Returns the value of attribute port.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12 def port @port end |
#shared_key ⇒ Object
Returns the value of attribute shared_key.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12 def shared_key @shared_key end |
#shared_key_salt ⇒ Object
Returns the value of attribute shared_key_salt.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15 def shared_key_salt @shared_key_salt end |
#socket ⇒ Object
Returns the value of attribute socket.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15 def socket @socket end |
#sslsession ⇒ Object
Returns the value of attribute sslsession.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15 def sslsession @sslsession end |
#standby ⇒ Object
Returns the value of attribute standby.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12 def standby @standby end |
#state ⇒ Object
Returns the value of attribute state.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15 def state @state end |
#unpacker ⇒ Object
Returns the value of attribute unpacker.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15 def unpacker @unpacker end |
#username ⇒ Object
Returns the value of attribute username.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12 def username @username end |
Instance Method Details
#check_helo(message) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/fluent/plugin/output_node.rb', line 132 def check_helo() log.debug "checking helo" # ['HELO', options(hash)] unless .size == 2 && [0] == 'HELO' return false end opts = [1] @shared_key_nonce = opts['nonce'] || '' # make shared_key_check failed (instead of error) if protocol version mismatch exist @authentication = opts['auth'] @allow_keepalive = opts['keepalive'] @mtime = Time.now true end |
#check_pong(message) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/fluent/plugin/output_node.rb', line 162 def check_pong() log.debug "checking pong" # ['PONG', bool(authentication result), 'reason if authentication failed', # self_hostname, sha512\_hex(salt + self_hostname + nonce + sharedkey)] unless .size == 5 && [0] == 'PONG' return false, 'invalid format for PONG message' end _pong, auth_result, reason, hostname, shared_key_hexdigest = unless auth_result return false, 'authentication failed: ' + reason end if hostname == @sender.self_hostname return false, 'same hostname between input and output: invalid configuration' end clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key_nonce).update(@shared_key).hexdigest unless shared_key_hexdigest == clientside return false, 'shared key mismatch' end @mtime = Time.now return true, nil end |
#connect ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 |
# File 'lib/fluent/plugin/output_node.rb', line 226 def connect Thread.current.abort_on_exception = true log.debug "starting client" begin addr = @sender.hostname_resolver.getaddress(@host) log.debug "create tcp socket to node", host: @host, address: addr, port: @port rescue => e log.warn "failed to resolve the hostname", error_class: e.class, error: e, host: @host @state = :failed return end begin if @proxy_uri.nil? then sock = TCPSocket.new(addr, @port) else proxy = Proxifier::Proxy(@proxy_uri) sock = proxy.open(addr, @port) end rescue => e log.warn "failed to connect for secure-forward", error_class: e.class, error: e, host: @host, address: addr, port: @port @state = :failed return end log.trace "changing socket options" opt = [1, @sender.send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) opt = [@sender.send_timeout.to_i, 0].pack('L!L!') # struct timeval sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) log.trace "initializing SSL contexts" context = OpenSSL::SSL::SSLContext.new(@sender.ssl_version) log.trace "setting SSL verification options" if @sender.secure # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS # https://bugs.ruby-lang.org/issues/9424 context.set_params({}) if @sender.ssl_ciphers context.ciphers = @sender.ssl_ciphers else ### follow httpclient configuration by nahi # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH" context.ciphers = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default end log.trace "set verify_mode VERIFY_PEER" context.verify_mode = OpenSSL::SSL::VERIFY_PEER if @sender.enable_strict_verification context.cert_store = OpenSSL::X509::Store.new begin context.cert_store.set_default_paths rescue OpenSSL::X509::StoreError => e log.warn "faild to load system default certificates", error: e end end if @sender.ca_cert_path log.trace "set to use private CA", path: @sender.ca_cert_path context.ca_file = @sender.ca_cert_path end end log.debug "trying to connect ssl session", host: @host, address: addr, port: @port begin sslsession = OpenSSL::SSL::SSLSocket.new(sock, context) log.trace "connecting...", host: @host, address: addr, port: @port sslsession.connect @mtime = Time.now rescue => e log.warn "failed to establish SSL connection", error_class: e.class, error: e, host: @host, address: addr, port: @port @state = :failed return end log.debug "ssl session connected", host: @host, port: @port begin if @sender.enable_strict_verification log.debug "checking peer's certificate", subject: sslsession.peer_cert.subject sslsession.post_connection_check(@hostlabel) verify = sslsession.verify_result if verify != OpenSSL::X509::V_OK err_name = Fluent::SecureForwardOutput::OpenSSLUtil.verify_result_name(verify) log.warn "BUG: failed to verify certification while connecting host #{@host} as #{@hostlabel} (but not raised, why?)" log.warn "BUG: verify_result: #{err_name}" raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{@host} as #{@hostlabel}" end end rescue OpenSSL::SSL::SSLError => e log.warn "failed to verify certification while connecting ssl session", host: @host, hostlabel: @hostlabel self.shutdown raise end log.debug "ssl session connected", host: @host, port: @port @socket = sock @sslsession = sslsession buf = '' read_length = @sender.read_length read_interval = @sender.read_interval socket_interval = @sender.socket_interval @mtime = Time.now loop do break if @detach break if @connection_hard_timeout && Time.now > @mtime + @connection_hard_timeout begin while @sslsession.read_nonblock(read_length, buf) if buf == '' sleep read_interval next end @unpacker.feed_each(buf, &method(:on_read)) @mtime = Time.now buf = '' end rescue OpenSSL::SSL::SSLError => e # to wait i/o restart log.trace "SSLError", error_class: e.class, error: e, mtime: @mtime, host: @host, port: @port if @connection_hard_timeout && Time.now > @mtime + @connection_hard_timeout log.warn "connection hard timeout", mtime: @mtime, timeout: @connection_hard_timeout, host: @host, port: @port log.warn "aborting connection", host: @host, port: @port self.release! self.detach! break else sleep socket_interval end rescue SystemCallError => e log.warn "disconnected by Error", error_class: e.class, error: e, host: @host, port: @port self.release! self.detach! break rescue EOFError log.warn "disconnected", host: @host, port: @port self.release! self.detach! break end end while @writing break if @detach sleep read_interval end self.shutdown end |
#detach! ⇒ Object
71 72 73 |
# File 'lib/fluent/plugin/output_node.rb', line 71 def detach! @detach = true end |
#detached? ⇒ Boolean
75 76 77 |
# File 'lib/fluent/plugin/output_node.rb', line 75 def detached? @detach end |
#dup ⇒ Object
59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/output_node.rb', line 59 def dup renewed = self.class.new( @sender, Fluent::Config::Section.new({host: @host, port: @port, hostlabel: @hostlabel, username: @username, password: @password, shared_key: @shared_key, standby: @standby, proxy_uri: @proxy_uri}) ) renewed end |
#established? ⇒ Boolean
116 117 118 |
# File 'lib/fluent/plugin/output_node.rb', line 116 def established? @state == :established end |
#expired? ⇒ Boolean
120 121 122 123 124 125 126 |
# File 'lib/fluent/plugin/output_node.rb', line 120 def expired? if @keepalive.nil? || @keepalive == 0 false else @expire && @expire < Time.now end end |
#generate_ping ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/fluent/plugin/output_node.rb', line 146 def generate_ping log.debug "generating ping" # ['PING', self_hostname, sharedkey\_salt, sha512\_hex(sharedkey\_salt + self_hostname + nonce + shared_key), # username || '', sha512\_hex(auth\_salt + username + password) || ''] shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt).update(@sender.self_hostname).update(@shared_key_nonce).update(@shared_key).hexdigest ping = ['PING', @sender.self_hostname, @shared_key_salt, shared_key_hexdigest] if @authentication != '' password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest ping.push(@username, password_hexdigest) else ping.push('','') end @mtime = Time.now ping end |
#generate_salt ⇒ Object
128 129 130 |
# File 'lib/fluent/plugin/output_node.rb', line 128 def generate_salt OpenSSL::Random.random_bytes(16) end |
#join ⇒ Object
112 113 114 |
# File 'lib/fluent/plugin/output_node.rb', line 112 def join @thread && @thread.join end |
#log ⇒ Object
55 56 57 |
# File 'lib/fluent/plugin/output_node.rb', line 55 def log @sender.log end |
#on_read(data) ⇒ Object
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/fluent/plugin/output_node.rb', line 193 def on_read(data) log.debug "on_read" if self.established? #TODO: ACK log.warn "unknown packets arrived..." return end case @state when :helo unless check_helo(data) log.warn "received invalid helo message from #{@host}" self.shutdown return end send_data generate_ping() @mtime = Time.now @state = :pingpong when :pingpong success, reason = check_pong(data) unless success log.warn "connection refused to #{@host}:" + reason self.shutdown return end log.info "connection established to #{@host}" if @first_session @state = :established @expire = Time.now + @keepalive if @keepalive && @keepalive > 0 @mtime = Time.now log.debug "connection established", host: @host, port: @port, expire: @expire end end |
#release! ⇒ Object
88 89 90 |
# File 'lib/fluent/plugin/output_node.rb', line 88 def release! @writing = false end |
#send_data(data) ⇒ Object
188 189 190 191 |
# File 'lib/fluent/plugin/output_node.rb', line 188 def send_data(data) @mtime = Time.now @sslsession.write data.to_msgpack end |
#shutdown ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/output_node.rb', line 92 def shutdown log.debug "shutting down node #{@host}" @state = :closed if @thread == Thread.current @sslsession.close if @sslsession @socket.close if @socket @thread.kill else if @thread @thread.kill @thread.join end @sslsession.close if @sslsession @socket.close if @socket end rescue => e log.debug "error on node shutdown #{e.class}:#{e.}" end |
#start ⇒ Object
67 68 69 |
# File 'lib/fluent/plugin/output_node.rb', line 67 def start @thread = Thread.new(&method(:connect)) end |
#tain! ⇒ Object
79 80 81 82 |
# File 'lib/fluent/plugin/output_node.rb', line 79 def tain! raise RuntimeError, "BUG: taining detached node" if @detach @writing = true end |
#tained? ⇒ Boolean
84 85 86 |
# File 'lib/fluent/plugin/output_node.rb', line 84 def tained? @writing end |