Module: Fluent::PluginHelper::Socket
- Includes:
- SocketOption
- Defined in:
- lib/fluent/plugin_helper/socket.rb
Defined Under Namespace
Modules: WrappedSocket
Constant Summary
Constants included from SocketOption
Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_LINGER, Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_LINGER_WINDOWS, Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_TIMEVAL
Instance Attribute Summary collapse
-
#_sockets ⇒ Object
readonly
for tests.
Class Method Summary collapse
Instance Method Summary collapse
- #initialize ⇒ Object
- #socket_certificates_from_file(path) ⇒ Object
-
#socket_create(proto, host, port, **kwargs, &block) ⇒ Object
TODO: implement connection pool for specified host.
- #socket_create_tcp(host, port, resolve_name: false, connect_timeout: nil, **kwargs, &block) ⇒ Object
- #socket_create_tls(host, port, version: Fluent::TLS::DEFAULT_VERSION, min_version: nil, max_version: nil, ciphers: Fluent::TLS::CIPHERS_DEFAULT, insecure: false, verify_fqdn: true, fqdn: nil, enable_system_cert_store: true, allow_self_signed_cert: false, cert_paths: nil, cert_path: nil, private_key_path: nil, private_key_passphrase: nil, cert_thumbprint: nil, cert_logical_store_name: nil, cert_use_enterprise_store: true, connect_timeout: nil, **kwargs, &block) ⇒ Object
- #socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, &block) ⇒ Object
Methods included from SocketOption
#socket_option_set, #socket_option_set_one, #socket_option_validate!
Instance Attribute Details
#_sockets ⇒ Object (readonly)
for tests
37 38 39 |
# File 'lib/fluent/plugin_helper/socket.rb', line 37 def _sockets @_sockets end |
Class Method Details
.tls_verify_result_name(code) ⇒ Object
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/fluent/plugin_helper/socket.rb', line 218 def self.tls_verify_result_name(code) case code when OpenSSL::X509::V_OK then 'V_OK' when OpenSSL::X509::V_ERR_AKID_SKID_MISMATCH then 'V_ERR_AKID_SKID_MISMATCH' when OpenSSL::X509::V_ERR_APPLICATION_VERIFICATION then 'V_ERR_APPLICATION_VERIFICATION' when OpenSSL::X509::V_ERR_CERT_CHAIN_TOO_LONG then 'V_ERR_CERT_CHAIN_TOO_LONG' when OpenSSL::X509::V_ERR_CERT_HAS_EXPIRED then 'V_ERR_CERT_HAS_EXPIRED' when OpenSSL::X509::V_ERR_CERT_NOT_YET_VALID then 'V_ERR_CERT_NOT_YET_VALID' when OpenSSL::X509::V_ERR_CERT_REJECTED then 'V_ERR_CERT_REJECTED' when OpenSSL::X509::V_ERR_CERT_REVOKED then 'V_ERR_CERT_REVOKED' when OpenSSL::X509::V_ERR_CERT_SIGNATURE_FAILURE then 'V_ERR_CERT_SIGNATURE_FAILURE' when OpenSSL::X509::V_ERR_CERT_UNTRUSTED then 'V_ERR_CERT_UNTRUSTED' when OpenSSL::X509::V_ERR_CRL_HAS_EXPIRED then 'V_ERR_CRL_HAS_EXPIRED' when OpenSSL::X509::V_ERR_CRL_NOT_YET_VALID then 'V_ERR_CRL_NOT_YET_VALID' when OpenSSL::X509::V_ERR_CRL_SIGNATURE_FAILURE then 'V_ERR_CRL_SIGNATURE_FAILURE' when OpenSSL::X509::V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT then 'V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT' when OpenSSL::X509::V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD then 'V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD' when OpenSSL::X509::V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD then 'V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD' when OpenSSL::X509::V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD then 'V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD' when OpenSSL::X509::V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD then 'V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD' when OpenSSL::X509::V_ERR_INVALID_CA then 'V_ERR_INVALID_CA' when OpenSSL::X509::V_ERR_INVALID_PURPOSE then 'V_ERR_INVALID_PURPOSE' when OpenSSL::X509::V_ERR_KEYUSAGE_NO_CERTSIGN then 'V_ERR_KEYUSAGE_NO_CERTSIGN' when OpenSSL::X509::V_ERR_OUT_OF_MEM then 'V_ERR_OUT_OF_MEM' when OpenSSL::X509::V_ERR_PATH_LENGTH_EXCEEDED then 'V_ERR_PATH_LENGTH_EXCEEDED' when OpenSSL::X509::V_ERR_SELF_SIGNED_CERT_IN_CHAIN then 'V_ERR_SELF_SIGNED_CERT_IN_CHAIN' when OpenSSL::X509::V_ERR_SUBJECT_ISSUER_MISMATCH then 'V_ERR_SUBJECT_ISSUER_MISMATCH' when OpenSSL::X509::V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY then 'V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY' when OpenSSL::X509::V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE then 'V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY' when OpenSSL::X509::V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE then 'V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE' when OpenSSL::X509::V_ERR_UNABLE_TO_GET_CRL then 'V_ERR_UNABLE_TO_GET_CRL' when OpenSSL::X509::V_ERR_UNABLE_TO_GET_ISSUER_CERT then 'V_ERR_UNABLE_TO_GET_ISSUER_CERT' when OpenSSL::X509::V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY then 'V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY' when OpenSSL::X509::V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE then 'V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE' end end |
Instance Method Details
#initialize ⇒ Object
275 276 277 278 |
# File 'lib/fluent/plugin_helper/socket.rb', line 275 def initialize super # @_sockets = [] # for keepalived sockets / connection pool end |
#socket_certificates_from_file(path) ⇒ Object
207 208 209 210 211 212 213 214 215 216 |
# File 'lib/fluent/plugin_helper/socket.rb', line 207 def socket_certificates_from_file(path) data = File.read(path) pattern = Regexp.compile('-+BEGIN CERTIFICATE-+\r?\n(?:[^-]*\r?\n)+-+END CERTIFICATE-+\r?\n?', Regexp::MULTILINE) list = [] data.scan(pattern) { |match| list << OpenSSL::X509::Certificate.new(match) } if list.length == 0 raise Fluent::ConfigError, "cert_path does not contain a valid certificate" end list end |
#socket_create(proto, host, port, **kwargs, &block) ⇒ Object
TODO: implement connection pool for specified host
41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/fluent/plugin_helper/socket.rb', line 41 def socket_create(proto, host, port, **kwargs, &block) case proto when :tcp socket_create_tcp(host, port, **kwargs, &block) when :udp socket_create_udp(host, port, **kwargs, &block) when :tls socket_create_tls(host, port, **kwargs, &block) when :unix raise "not implemented yet" else raise ArgumentError, "invalid protocol: #{proto}" end end |
#socket_create_tcp(host, port, resolve_name: false, connect_timeout: nil, **kwargs, &block) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin_helper/socket.rb', line 56 def socket_create_tcp(host, port, resolve_name: false, connect_timeout: nil, **kwargs, &block) sock = if connect_timeout s = ::Socket.tcp(host, port, connect_timeout: connect_timeout) s.autoclose = false # avoid GC triggered close WrappedSocket::TCP.for_fd(s.fileno) else WrappedSocket::TCP.new(host, port) end socket_option_set(sock, resolve_name: resolve_name, **kwargs) if block begin block.call(sock) ensure sock.close_write rescue nil sock.close rescue nil end else sock end end |
#socket_create_tls(host, port, version: Fluent::TLS::DEFAULT_VERSION, min_version: nil, max_version: nil, ciphers: Fluent::TLS::CIPHERS_DEFAULT, insecure: false, verify_fqdn: true, fqdn: nil, enable_system_cert_store: true, allow_self_signed_cert: false, cert_paths: nil, cert_path: nil, private_key_path: nil, private_key_passphrase: nil, cert_thumbprint: nil, cert_logical_store_name: nil, cert_use_enterprise_store: true, connect_timeout: nil, **kwargs, &block) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/fluent/plugin_helper/socket.rb', line 93 def socket_create_tls( host, port, version: Fluent::TLS::DEFAULT_VERSION, min_version: nil, max_version: nil, ciphers: Fluent::TLS::CIPHERS_DEFAULT, insecure: false, verify_fqdn: true, fqdn: nil, enable_system_cert_store: true, allow_self_signed_cert: false, cert_paths: nil, cert_path: nil, private_key_path: nil, private_key_passphrase: nil, cert_thumbprint: nil, cert_logical_store_name: nil, cert_use_enterprise_store: true, connect_timeout: nil, **kwargs, &block) host_is_ipaddress = IPAddr.new(host) rescue false fqdn ||= host unless host_is_ipaddress context = OpenSSL::SSL::SSLContext.new if insecure log.trace "setting TLS verify_mode NONE" context.verify_mode = OpenSSL::SSL::VERIFY_NONE else cert_store = OpenSSL::X509::Store.new if allow_self_signed_cert && OpenSSL::X509.const_defined?('V_FLAG_CHECK_SS_SIGNATURE') cert_store.flags = OpenSSL::X509::V_FLAG_CHECK_SS_SIGNATURE end begin if enable_system_cert_store if Fluent.windows? && cert_logical_store_name log.trace "loading Windows system certificate store" loader = Certstore::OpenSSL::Loader.new(log, cert_store, cert_logical_store_name, enterprise: cert_use_enterprise_store) loader.load_cert_store cert_store = loader.cert_store context.cert = loader.get_certificate(cert_thumbprint) if cert_thumbprint end log.trace "loading system default certificate store" cert_store.set_default_paths end rescue OpenSSL::X509::StoreError log.warn "failed to load system default certificate store", error: e end if cert_paths if cert_paths.respond_to?(:each) cert_paths.each do |cert_path| log.trace "adding CA cert", path: cert_path cert_store.add_file(cert_path) end else cert_path = cert_paths log.trace "adding CA cert", path: cert_path cert_store.add_file(cert_path) end end log.trace "setting TLS context", mode: "peer", ciphers: ciphers context.set_params({}) context.ciphers = ciphers context.verify_mode = OpenSSL::SSL::VERIFY_PEER context.cert_store = cert_store context.verify_hostname = verify_fqdn && fqdn context.key = OpenSSL::PKey::read(File.read(private_key_path), private_key_passphrase) if private_key_path if cert_path certs = socket_certificates_from_file(cert_path) context.cert = certs.shift unless certs.empty? context.extra_chain_cert = certs end end end Fluent::TLS.set_version_to_context(context, version, min_version, max_version) tcpsock = socket_create_tcp(host, port, connect_timeout: connect_timeout, **kwargs) sock = WrappedSocket::TLS.new(tcpsock, context) sock.sync_close = true sock.hostname = fqdn if verify_fqdn && fqdn && sock.respond_to?(:hostname=) log.trace "entering TLS handshake" if connect_timeout begin Timeout.timeout(connect_timeout) { sock.connect } rescue Timeout::Error log.warn "timeout while connecting tls session", host: host sock.close rescue nil raise end else sock.connect end begin if verify_fqdn log.trace "checking peer's certificate", subject: sock.peer_cert.subject sock.post_connection_check(fqdn) verify = sock.verify_result if verify != OpenSSL::X509::V_OK err_name = Socket.tls_verify_result_name(verify) log.warn "BUG: failed to verify certification while connecting (but not raised, why?)", host: host, fqdn: fqdn, error: err_name raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{host} as #{fqdn}" end end rescue OpenSSL::SSL::SSLError => e log.warn "failed to verify certification while connecting tls session", host: host, fqdn: fqdn, error: e raise end if block begin block.call(sock) ensure sock.close rescue nil end else sock end end |
#socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, &block) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/fluent/plugin_helper/socket.rb', line 77 def socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, &block) family = IPAddr.new(IPSocket.getaddress(host)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6 sock = WrappedSocket::UDP.new(family) socket_option_set(sock, resolve_name: resolve_name, **kwargs) sock.connect(host, port) if connect if block begin block.call(sock) ensure sock.close rescue nil end else sock end end |