Class: Fluent::SecureForwardOutput::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/output_node.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sender, shared_key, conf) ⇒ Node

Returns a new instance of Node.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/output_node.rb', line 19

def initialize(sender, shared_key, conf)
  @sender = sender
  @shared_key = shared_key

  @host = conf['host']
  @port = (conf['port'] || Fluent::SecureForwardOutput::DEFAULT_SECURE_CONNECT_PORT).to_i
  @hostlabel = conf['hostlabel'] || conf['host']
  @username = conf['username'] || ''
  @password = conf['password'] || ''
  @standby = conf.has_key?('standby') && Fluent::Config.bool_value(conf['standby']) != false

  @authentication = nil

  @keepalive = nil
  @expire = nil
  @first_session = false
  @detach = false

  @socket = nil
  @sslsession = nil
  @unpacker = MessagePack::Unpacker.new

  @shared_key_salt = generate_salt
  @state = :helo
  @thread = nil
end

Instance Attribute Details

#authenticationObject

Returns the value of attribute authentication.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

def authentication
  @authentication
end

#detachObject

Returns the value of attribute detach.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

def detach
  @detach
end

#expireObject (readonly)

Returns the value of attribute expire.



17
18
19
# File 'lib/fluent/plugin/output_node.rb', line 17

def expire
  @expire
end

#first_sessionObject

Returns the value of attribute first_session.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

def first_session
  @first_session
end

#hostObject

Returns the value of attribute host.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

def host
  @host
end

#hostlabelObject

Returns the value of attribute hostlabel.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

def hostlabel
  @hostlabel
end

#keepaliveObject

Returns the value of attribute keepalive.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

def keepalive
  @keepalive
end

#passwordObject

Returns the value of attribute password.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

def password
  @password
end

#portObject

Returns the value of attribute port.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

def port
  @port
end

#shared_keyObject

Returns the value of attribute shared_key.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

def shared_key
  @shared_key
end

#shared_key_saltObject

Returns the value of attribute shared_key_salt.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

def shared_key_salt
  @shared_key_salt
end

#socketObject

Returns the value of attribute socket.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

def socket
  @socket
end

#sslsessionObject

Returns the value of attribute sslsession.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

def sslsession
  @sslsession
end

#standbyObject

Returns the value of attribute standby.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

def standby
  @standby
end

#stateObject

Returns the value of attribute state.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

def state
  @state
end

#unpackerObject

Returns the value of attribute unpacker.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

def unpacker
  @unpacker
end

#usernameObject

Returns the value of attribute username.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

def username
  @username
end

Instance Method Details

#check_helo(message) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/output_node.rb', line 106

def check_helo(message)
  log.debug "checking helo"
  # ['HELO', options(hash)]
  unless message.size == 2 && message[0] == 'HELO'
    return false
  end
  opts = message[1]
  @authentication = opts['auth']
  @allow_keepalive = opts['keepalive']
  true
end

#check_pong(message) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fluent/plugin/output_node.rb', line 133

def check_pong(message)
  log.debug "checking pong"
  # ['PONG', bool(authentication result), 'reason if authentication failed',
  #  self_hostname, sha512\_hex(salt + self_hostname + sharedkey)]
  unless message.size == 5 && message[0] == 'PONG'
    return false, 'invalid format for PONG message'
  end
  pong, auth_result, reason, hostname, shared_key_hexdigest = message

  unless auth_result
    return false, 'authentication failed: ' + reason
  end

  clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key).hexdigest
  unless shared_key_hexdigest == clientside
    return false, 'shared key mismatch'
  end

  return true, nil
end

#connectObject



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/fluent/plugin/output_node.rb', line 189

def connect
  log.debug "starting client"

  addr = @sender.hostname_resolver.getaddress(@host)
  log.debug "create tcp socket to node", :host => @host, :address => addr, :port => @port
  begin
    sock = TCPSocket.new(addr, @port)
  rescue => e
    log.warn "failed to connect for secure-forward", :error_class => e.class, :error => e, :host => @host, :address => addr, :port => @port
    @state = :failed
    return
  end

  log.trace "changing socket options"
  opt = [1, @sender.send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

  opt = [@sender.send_timeout.to_i, 0].pack('L!L!')  # struct timeval
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

  # TODO: SSLContext constructer parameter (SSL/TLS protocol version)
  log.trace "initializing SSL contexts"
  context = OpenSSL::SSL::SSLContext.new
  # TODO: context.ca_file = (ca_file_path)
  # TODO: context.ciphers = (SSL Shared key chiper protocols)

  log.debug "trying to connect ssl session", :host => @host, :address => addr, :port => @port
  begin
    sslsession = OpenSSL::SSL::SSLSocket.new(sock, context)
  rescue => e
    log.warn "failed to establish SSL connection", :host => @host, :address => addr, :port => @port
  end

  unless sslsession.connect
    log.debug "failed to connect", :host => @host, :address => addr, :port => @port
    @state = :failed
    return
  end
  log.debug "ssl session connected", :host => @host, :port => @port

  begin
    unless @sender.allow_self_signed_certificate
      log.debug "checking peer's certificate", :subject => sslsession.peer_cert.subject
      sslsession.post_connection_check(@hostlabel)
      verify = sslsession.verify_result
      if verify != OpenSSL::X509::V_OK
        err_name = Fluent::SecureForwardOutput::OpenSSLUtil.verify_result_name(verify)
        log.warn "failed to verify certification while connecting host #{@host} as #{@hostlabel} (but not raised, why?)"
        log.warn "verify_result: #{err_name}"
        raise RuntimeError, "failed to verify certification while connecting host #{@host} as #{@hostlabel}"
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    log.warn "failed to verify certification while connecting ssl session", :host => @host, :hostlabel => @hostlabel
    self.shutdown
    raise
  end

  log.debug "ssl sessison connected", :host => @host, :port => @port
  @socket = sock
  @sslsession = sslsession

  buf = ''
  read_length = @sender.read_length
  read_interval = @sender.read_interval
  socket_interval = @sender.socket_interval

  loop do
    break if @detach

    begin
      while @sslsession.read_nonblock(read_length, buf)
        if buf == ''
          sleep read_interval
          next
        end
        @unpacker.feed_each(buf, &method(:on_read))
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError
      # to wait i/o restart
      sleep socket_interval
    rescue EOFError
      log.warn "disconnected from #{@host}"
      break
    end
  end
  self.shutdown
end

#dupObject



50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/output_node.rb', line 50

def dup
  renewed = self.class.new(
    @sender,
    @shared_key,
    {'host' => @host, 'port' => @port, 'hostlabel' => @hostlabel, 'username' => @username, 'password' => @password}
  )
  renewed.keepalive = @keepalive if @keepalive
  renewed
end

#established?Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/fluent/plugin/output_node.rb', line 90

def established?
  @state == :established
end

#expired?Boolean

Returns:

  • (Boolean)


94
95
96
97
98
99
100
# File 'lib/fluent/plugin/output_node.rb', line 94

def expired?
  if @keepalive.nil? || @keepalive == 0
    false
  else
    @expire && @expire < Time.now
  end
end

#generate_pingObject



118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/fluent/plugin/output_node.rb', line 118

def generate_ping
  log.debug "generating ping"
  # ['PING', self_hostname, sharedkey\_salt, sha512\_hex(sharedkey\_salt + self_hostname + shared_key),
  #  username || '', sha512\_hex(auth\_salt + username + password) || '']
  shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt).update(@sender.self_hostname).update(@shared_key).hexdigest
  ping = ['PING', @sender.self_hostname, @shared_key_salt, shared_key_hexdigest]
  if @authentication != ''
    password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest
    ping.push(@username, password_hexdigest)
  else
    ping.push('','')
  end
  ping
end

#generate_saltObject



102
103
104
# File 'lib/fluent/plugin/output_node.rb', line 102

def generate_salt
  OpenSSL::Random.random_bytes(16)
end

#joinObject



86
87
88
# File 'lib/fluent/plugin/output_node.rb', line 86

def join
  @thread && @thread.join
end

#logObject



46
47
48
# File 'lib/fluent/plugin/output_node.rb', line 46

def log
  @sender.log
end

#on_read(data) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/fluent/plugin/output_node.rb', line 158

def on_read(data)
  log.debug "on_read"
  if self.established?
    #TODO: ACK
    log.warn "unknown packets arrived..."
    return
  end

  case @state
  when :helo
    unless check_helo(data)
      log.warn "received invalid helo message from #{@host}"
      self.shutdown
      return
    end
    send_data generate_ping()
    @state = :pingpong
  when :pingpong
    success, reason = check_pong(data)
    unless success
      log.warn "connection refused to #{@host}:" + reason
      self.shutdown
      return
    end
    log.info "connection established to #{@host}" if @first_session
    @state = :established
    @expire = Time.now + @keepalive if @keepalive && @keepalive > 0
    log.debug "connection established", :host => @host, :port => @port, :expire => @expire
  end
end

#send_data(data) ⇒ Object



154
155
156
# File 'lib/fluent/plugin/output_node.rb', line 154

def send_data(data)
  @sslsession.write data.to_msgpack
end

#shutdownObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/output_node.rb', line 66

def shutdown
  log.debug "shutting down node #{@host}"
  @state = :closed

  if @thread == Thread.current
    @sslsession.close if @sslsession
    @socket.close if @socket
    @thread.kill
  else
    if @thread
      @thread.kill
      @thread.join
    end
    @sslsession.close if @sslsession
    @socket.close if @socket
  end
rescue => e
  log.debug "error on node shutdown #{e.class}:#{e.message}"
end

#startObject



60
61
62
63
64
# File 'lib/fluent/plugin/output_node.rb', line 60

def start
  @thread = Thread.new(&method(:connect))
  ## If you want to check code bug, turn this line enable
  # @thread.abort_on_exception = true
end