Module: Fluent::PluginHelper::Socket

Includes:
SocketOption
Defined in:
lib/fluent/plugin_helper/socket.rb

Defined Under Namespace

Modules: WrappedSocket

Constant Summary

Constants included from SocketOption

Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_LINGER, Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_LINGER_WINDOWS, Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_TIMEVAL

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SocketOption

#socket_option_set, #socket_option_set_one, #socket_option_validate!

Instance Attribute Details

#_socketsObject (readonly)

for tests



37
38
39
# File 'lib/fluent/plugin_helper/socket.rb', line 37

def _sockets
  @_sockets
end

Class Method Details

.tls_verify_result_name(code) ⇒ Object



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/fluent/plugin_helper/socket.rb', line 218

def self.tls_verify_result_name(code)
  case code
  when OpenSSL::X509::V_OK then 'V_OK'
  when OpenSSL::X509::V_ERR_AKID_SKID_MISMATCH then 'V_ERR_AKID_SKID_MISMATCH'
  when OpenSSL::X509::V_ERR_APPLICATION_VERIFICATION then 'V_ERR_APPLICATION_VERIFICATION'
  when OpenSSL::X509::V_ERR_CERT_CHAIN_TOO_LONG then 'V_ERR_CERT_CHAIN_TOO_LONG'
  when OpenSSL::X509::V_ERR_CERT_HAS_EXPIRED then 'V_ERR_CERT_HAS_EXPIRED'
  when OpenSSL::X509::V_ERR_CERT_NOT_YET_VALID then 'V_ERR_CERT_NOT_YET_VALID'
  when OpenSSL::X509::V_ERR_CERT_REJECTED then 'V_ERR_CERT_REJECTED'
  when OpenSSL::X509::V_ERR_CERT_REVOKED then 'V_ERR_CERT_REVOKED'
  when OpenSSL::X509::V_ERR_CERT_SIGNATURE_FAILURE then 'V_ERR_CERT_SIGNATURE_FAILURE'
  when OpenSSL::X509::V_ERR_CERT_UNTRUSTED then 'V_ERR_CERT_UNTRUSTED'
  when OpenSSL::X509::V_ERR_CRL_HAS_EXPIRED then 'V_ERR_CRL_HAS_EXPIRED'
  when OpenSSL::X509::V_ERR_CRL_NOT_YET_VALID then 'V_ERR_CRL_NOT_YET_VALID'
  when OpenSSL::X509::V_ERR_CRL_SIGNATURE_FAILURE then 'V_ERR_CRL_SIGNATURE_FAILURE'
  when OpenSSL::X509::V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT then 'V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT'
  when OpenSSL::X509::V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD then 'V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD'
  when OpenSSL::X509::V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD then 'V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD'
  when OpenSSL::X509::V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD then 'V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD'
  when OpenSSL::X509::V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD then 'V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD'
  when OpenSSL::X509::V_ERR_INVALID_CA then 'V_ERR_INVALID_CA'
  when OpenSSL::X509::V_ERR_INVALID_PURPOSE then 'V_ERR_INVALID_PURPOSE'
  when OpenSSL::X509::V_ERR_KEYUSAGE_NO_CERTSIGN then 'V_ERR_KEYUSAGE_NO_CERTSIGN'
  when OpenSSL::X509::V_ERR_OUT_OF_MEM then 'V_ERR_OUT_OF_MEM'
  when OpenSSL::X509::V_ERR_PATH_LENGTH_EXCEEDED then 'V_ERR_PATH_LENGTH_EXCEEDED'
  when OpenSSL::X509::V_ERR_SELF_SIGNED_CERT_IN_CHAIN then 'V_ERR_SELF_SIGNED_CERT_IN_CHAIN'
  when OpenSSL::X509::V_ERR_SUBJECT_ISSUER_MISMATCH then 'V_ERR_SUBJECT_ISSUER_MISMATCH'
  when OpenSSL::X509::V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY then 'V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY'
  when OpenSSL::X509::V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE then 'V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY'
  when OpenSSL::X509::V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE then 'V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE'
  when OpenSSL::X509::V_ERR_UNABLE_TO_GET_CRL then 'V_ERR_UNABLE_TO_GET_CRL'
  when OpenSSL::X509::V_ERR_UNABLE_TO_GET_ISSUER_CERT then 'V_ERR_UNABLE_TO_GET_ISSUER_CERT'
  when OpenSSL::X509::V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY then 'V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY'
  when OpenSSL::X509::V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE then 'V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE'
  end
end

Instance Method Details

#initializeObject



275
276
277
278
# File 'lib/fluent/plugin_helper/socket.rb', line 275

def initialize
  super
  # @_sockets = [] # for keepalived sockets / connection pool
end

#socket_certificates_from_file(path) ⇒ Object



207
208
209
210
211
212
213
214
215
216
# File 'lib/fluent/plugin_helper/socket.rb', line 207

def socket_certificates_from_file(path)
  data = File.read(path)
  pattern = Regexp.compile('-+BEGIN CERTIFICATE-+\r?\n(?:[^-]*\r?\n)+-+END CERTIFICATE-+\r?\n?', Regexp::MULTILINE)
  list = []
  data.scan(pattern) { |match| list << OpenSSL::X509::Certificate.new(match) }
  if list.length == 0
    raise Fluent::ConfigError, "cert_path does not contain a valid certificate"
  end
  list
end

#socket_create(proto, host, port, **kwargs, &block) ⇒ Object

TODO: implement connection pool for specified host



41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin_helper/socket.rb', line 41

def socket_create(proto, host, port, **kwargs, &block)
  case proto
  when :tcp
    socket_create_tcp(host, port, **kwargs, &block)
  when :udp
    socket_create_udp(host, port, **kwargs, &block)
  when :tls
    socket_create_tls(host, port, **kwargs, &block)
  when :unix
    raise "not implemented yet"
  else
    raise ArgumentError, "invalid protocol: #{proto}"
  end
end

#socket_create_tcp(host, port, resolve_name: false, connect_timeout: nil, **kwargs, &block) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin_helper/socket.rb', line 56

def socket_create_tcp(host, port, resolve_name: false, connect_timeout: nil, **kwargs, &block)
  sock = if connect_timeout
           s = ::Socket.tcp(host, port, connect_timeout: connect_timeout)
           s.autoclose = false # avoid GC triggered close
           WrappedSocket::TCP.for_fd(s.fileno)
         else
           WrappedSocket::TCP.new(host, port)
         end
  socket_option_set(sock, resolve_name: resolve_name, **kwargs)
  if block
    begin
      block.call(sock)
    ensure
      sock.close_write rescue nil
      sock.close rescue nil
    end
  else
    sock
  end
end

#socket_create_tls(host, port, version: Fluent::TLS::DEFAULT_VERSION, min_version: nil, max_version: nil, ciphers: Fluent::TLS::CIPHERS_DEFAULT, insecure: false, verify_fqdn: true, fqdn: nil, enable_system_cert_store: true, allow_self_signed_cert: false, cert_paths: nil, cert_path: nil, private_key_path: nil, private_key_passphrase: nil, cert_thumbprint: nil, cert_logical_store_name: nil, cert_use_enterprise_store: true, connect_timeout: nil, **kwargs, &block) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/fluent/plugin_helper/socket.rb', line 93

def socket_create_tls(
    host, port,
    version: Fluent::TLS::DEFAULT_VERSION, min_version: nil, max_version: nil, ciphers: Fluent::TLS::CIPHERS_DEFAULT, insecure: false, verify_fqdn: true, fqdn: nil,
    enable_system_cert_store: true, allow_self_signed_cert: false, cert_paths: nil,
    cert_path: nil, private_key_path: nil, private_key_passphrase: nil,
    cert_thumbprint: nil, cert_logical_store_name: nil, cert_use_enterprise_store: true,
    connect_timeout: nil,
    **kwargs, &block)

  host_is_ipaddress = IPAddr.new(host) rescue false
  fqdn ||= host unless host_is_ipaddress

  context = OpenSSL::SSL::SSLContext.new

  if insecure
    log.trace "setting TLS verify_mode NONE"
    context.verify_mode = OpenSSL::SSL::VERIFY_NONE
  else
    cert_store = OpenSSL::X509::Store.new
    if allow_self_signed_cert && OpenSSL::X509.const_defined?('V_FLAG_CHECK_SS_SIGNATURE')
      cert_store.flags = OpenSSL::X509::V_FLAG_CHECK_SS_SIGNATURE
    end
    begin
      if enable_system_cert_store
        if Fluent.windows? && cert_logical_store_name
          log.trace "loading Windows system certificate store"
          loader = Certstore::OpenSSL::Loader.new(log, cert_store, cert_logical_store_name,
                                                  enterprise: cert_use_enterprise_store)
          loader.load_cert_store
          cert_store = loader.cert_store
          context.cert = loader.get_certificate(cert_thumbprint) if cert_thumbprint
        end
        log.trace "loading system default certificate store"
        cert_store.set_default_paths
      end
    rescue OpenSSL::X509::StoreError
      log.warn "failed to load system default certificate store", error: e
    end
    if cert_paths
      if cert_paths.respond_to?(:each)
        cert_paths.each do |cert_path|
          log.trace "adding CA cert", path: cert_path
          cert_store.add_file(cert_path)
        end
      else
        cert_path = cert_paths
        log.trace "adding CA cert", path: cert_path
        cert_store.add_file(cert_path)
      end
    end

    log.trace "setting TLS context", mode: "peer", ciphers: ciphers
    context.set_params({})
    context.ciphers = ciphers
    context.verify_mode = OpenSSL::SSL::VERIFY_PEER
    context.cert_store = cert_store
    context.verify_hostname = verify_fqdn && fqdn
    context.key = OpenSSL::PKey::read(File.read(private_key_path), private_key_passphrase) if private_key_path

    if cert_path
      certs = socket_certificates_from_file(cert_path)
      context.cert = certs.shift
      unless certs.empty?
        context.extra_chain_cert = certs
      end
    end
  end
  Fluent::TLS.set_version_to_context(context, version, min_version, max_version)

  tcpsock = socket_create_tcp(host, port, connect_timeout: connect_timeout, **kwargs)
  sock = WrappedSocket::TLS.new(tcpsock, context)
  sock.sync_close = true
  sock.hostname = fqdn if verify_fqdn && fqdn && sock.respond_to?(:hostname=)

  log.trace "entering TLS handshake"
  if connect_timeout
    begin
      Timeout.timeout(connect_timeout) { sock.connect }
    rescue Timeout::Error
      log.warn "timeout while connecting tls session", host: host
      sock.close rescue nil
      raise
    end
  else
    sock.connect
  end

  begin
    if verify_fqdn
      log.trace "checking peer's certificate", subject: sock.peer_cert.subject
      sock.post_connection_check(fqdn)
      verify = sock.verify_result
      if verify != OpenSSL::X509::V_OK
        err_name = Socket.tls_verify_result_name(verify)
        log.warn "BUG: failed to verify certification while connecting (but not raised, why?)", host: host, fqdn: fqdn, error: err_name
        raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{host} as #{fqdn}"
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    log.warn "failed to verify certification while connecting tls session", host: host, fqdn: fqdn, error: e
    raise
  end

  if block
    begin
      block.call(sock)
    ensure
      sock.close rescue nil
    end
  else
    sock
  end
end

#socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, &block) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin_helper/socket.rb', line 77

def socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, &block)
  family = IPAddr.new(IPSocket.getaddress(host)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6
  sock = WrappedSocket::UDP.new(family)
  socket_option_set(sock, resolve_name: resolve_name, **kwargs)
  sock.connect(host, port) if connect
  if block
    begin
      block.call(sock)
    ensure
      sock.close rescue nil
    end
  else
    sock
  end
end