Module: Fluent::PluginHelper::Server

Includes:
CertOption, EventLoop, SocketOption
Defined in:
lib/fluent/plugin_helper/server.rb

Defined Under Namespace

Modules: EventHandler, ServerTransportParams Classes: CallbackSocket, ServerInfo, TCPCallbackSocket, TLSCallbackSocket, UDPCallbackSocket

Constant Summary collapse

PROTOCOLS =
[:tcp, :udp, :tls, :unix]
CONNECTION_PROTOCOLS =
[:tcp, :tls, :unix]
SERVER_TRANSPORT_PARAMS =
[
  :protocol, :version, :min_version, :max_version, :ciphers, :insecure,
  :ca_path, :cert_path, :private_key_path, :private_key_passphrase, :client_cert_auth,
  :ca_cert_path, :ca_private_key_path, :ca_private_key_passphrase,
  :cert_verifier, :generate_private_key_length,
  :generate_cert_country, :generate_cert_state, :generate_cert_state,
  :generate_cert_locality, :generate_cert_common_name,
  :generate_cert_expiration, :generate_cert_digest,
  :ensure_fips,
]
PEERADDR_FAILED =

Use string “?” for port, not integer or nil. “?” is clear than -1 or nil in the log.

["?", "?", "name resolution failed", "?"]

Constants included from SocketOption

Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_LINGER, Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_LINGER_WINDOWS, Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_TIMEVAL

Constants included from EventLoop

EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT, EventLoop::EVENT_LOOP_SHUTDOWN_TIMEOUT

Constants included from Thread

Thread::THREAD_DEFAULT_WAIT_SECONDS, Thread::THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS

Instance Attribute Summary collapse

Attributes included from EventLoop

#_event_loop

Attributes included from Thread

#_threads

Class Method Summary collapse

Instance Method Summary collapse

Methods included from CertOption

#cert_option_add_extensions, #cert_option_cert_generation_opts_from_conf, #cert_option_certificates_from_file, #cert_option_create_context, #cert_option_generate_ca_pair_self_signed, #cert_option_generate_pair, #cert_option_generate_server_pair_by_ca, #cert_option_generate_server_pair_self_signed, #cert_option_load, #cert_option_server_validate!

Methods included from SocketOption

#socket_option_set, #socket_option_set_one, #socket_option_validate!

Methods included from EventLoop

#after_shutdown, #close, #event_loop_attach, #event_loop_detach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop, #start

Methods included from Thread

#after_shutdown, #close, #thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop

Instance Attribute Details

#_serversObject (readonly)

stop : [-] shutdown : detach server event handler from event loop (event_loop) close : close listening sockets terminate: remote all server instances



46
47
48
# File 'lib/fluent/plugin_helper/server.rb', line 46

def _servers
  @_servers
end

Class Method Details

.included(mod) ⇒ Object



325
326
327
# File 'lib/fluent/plugin_helper/server.rb', line 325

def self.included(mod)
  mod.include ServerTransportParams
end

Instance Method Details

#configure(conf) ⇒ Object



336
337
338
339
340
341
342
343
344
# File 'lib/fluent/plugin_helper/server.rb', line 336

def configure(conf)
  super

  if @transport_config
    if @transport_config.protocol == :tls
      cert_option_server_validate!(@transport_config)
    end
  end
end

#initializeObject



329
330
331
332
333
334
# File 'lib/fluent/plugin_helper/server.rb', line 329

def initialize
  super
  @_servers = []
  @_server_connections = []
  @_server_mutex = Mutex.new
end

#server_attach(title, proto, port, bind, shared, server) ⇒ Object



210
211
212
213
# File 'lib/fluent/plugin_helper/server.rb', line 210

def server_attach(title, proto, port, bind, shared, server)
  @_servers << ServerInfo.new(title, proto, port, bind, shared, server)
  event_loop_attach(server)
end

#server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback) ⇒ Object

server_create(:title, @port) do |data|

# ...

end server_create(:title, @port) do |data, conn|

# ...

end server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock|

sock.remote_host
sock.remote_port
# ...

end

Raises:

  • (ArgumentError)


124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/fluent/plugin_helper/server.rb', line 124

def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
  proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

  raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp
  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
  raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2

  if proto == :tcp || proto == :tls
    socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
  end

  socket_options[:receive_buffer_size] ||= @transport_config&.receive_buffer_size

  unless socket
    socket_option_validate!(proto, **socket_options)
    socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
  end

  if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections
    raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog
  end
  if proto != :udp # UDP options
    raise ArgumentError, "BUG: max_bytes is available only for udp" if max_bytes
    raise ArgumentError, "BUG: flags is available only for udp" if flags != 0
  end

  case proto
  when :tcp
    server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn|
      conn.data(&callback)
    end
  when :tls
    transport_config = if tls_options
                         server_create_transport_section_object(tls_options)
                       elsif @transport_config && @transport_config.protocol == :tls
                         @transport_config
                       else
                         raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
                       end
    server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) do |conn|
      conn.data(&callback)
    end
  when :udp
    raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes
    if socket
      sock = socket
      close_socket = false
    else
      sock = server_create_udp_socket(shared, bind, port)
      socket_option_setter.call(sock)
      close_socket = true
    end
    server = EventHandler::UDPServer.new(sock, max_bytes, flags, close_socket, @log, @under_plugin_development, &callback)
  when :unix
    raise "not implemented yet"
  else
    raise "BUG: unknown protocol #{proto}"
  end

  server_attach(title, proto, port, bind, shared, server)
end

#server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block) ⇒ Object

server_create_connection(:title, @port) do |conn|

# on connection
source_addr = conn.remote_host
source_port = conn.remote_port
conn.data do |data|
  # on data
  conn.write resp # ...
  conn.close
end

end

Raises:

  • (ArgumentError)


70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin_helper/server.rb', line 70

def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
  proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

  raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
  raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles connection" unless block_given?
  raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1

  if proto == :tcp || proto == :tls
    socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
  end

  socket_options[:receive_buffer_size] ||= @transport_config&.receive_buffer_size

  socket_option_validate!(proto, **socket_options)
  socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }

  case proto
  when :tcp
    server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  when :tls
    transport_config = if tls_options
                         server_create_transport_section_object(tls_options)
                       elsif @transport_config && @transport_config.protocol == :tls
                         @transport_config
                       else
                         raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
                       end
    server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block)
  when :unix
    raise "not implemented yet"
  else
    raise "unknown protocol #{proto}"
  end

  server_attach(title, proto, port, bind, shared, server)
end

#server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/fluent/plugin_helper/server.rb', line 215

def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(sock)
  close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
  server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    unless conn.closing
      @_server_mutex.synchronize do
        @_server_connections << conn
      end
    end
  end
  server.listen(backlog) if backlog
  server
end

#server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block) ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/fluent/plugin_helper/server.rb', line 230

def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
  context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)
  sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(sock)
  close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
  server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    unless conn.closing
      @_server_mutex.synchronize do
        @_server_connections << conn
      end
    end
  end
  server.listen(backlog) if backlog
  server
end

#server_create_tcp(title, port, **kwargs, &callback) ⇒ Object



192
193
194
# File 'lib/fluent/plugin_helper/server.rb', line 192

def server_create_tcp(title, port, **kwargs, &callback)
  server_create(title, port, proto: :tcp, **kwargs, &callback)
end

#server_create_tcp_socket(shared, bind, port) ⇒ Object



382
383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/fluent/plugin_helper/server.rb', line 382

def server_create_tcp_socket(shared, bind, port)
  sock = if shared
           server_socket_manager_client.listen_tcp(bind, port)
         else
           # TCPServer.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead.
           # backlog will be set by the caller, we don't need to set backlog here
           tsock = Addrinfo.tcp(bind, port).listen
           tsock.autoclose = false
           TCPServer.for_fd(tsock.fileno)
         end
  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  sock
end

#server_create_tls(title, port, **kwargs, &callback) ⇒ Object



200
201
202
# File 'lib/fluent/plugin_helper/server.rb', line 200

def server_create_tls(title, port, **kwargs, &callback)
  server_create(title, port, proto: :tls, **kwargs, &callback)
end

#server_create_transport_section_object(opts) ⇒ Object



257
258
259
260
261
262
263
264
265
# File 'lib/fluent/plugin_helper/server.rb', line 257

def server_create_transport_section_object(opts)
  transport_section = configured_section_create(:transport)
  SERVER_TRANSPORT_PARAMS.each do |param|
    if opts.has_key?(param)
      transport_section[param] = opts[param]
    end
  end
  transport_section
end

#server_create_udp(title, port, **kwargs, &callback) ⇒ Object



196
197
198
# File 'lib/fluent/plugin_helper/server.rb', line 196

def server_create_udp(title, port, **kwargs, &callback)
  server_create(title, port, proto: :udp, **kwargs, &callback)
end

#server_create_udp_socket(shared, bind, port) ⇒ Object



397
398
399
400
401
402
403
404
405
406
407
408
409
# File 'lib/fluent/plugin_helper/server.rb', line 397

def server_create_udp_socket(shared, bind, port)
  sock = if shared
           server_socket_manager_client.listen_udp(bind, port)
         else
           # UDPSocket.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead.
           usock = Addrinfo.udp(bind, port).bind
           usock.autoclose = false
           UDPSocket.for_fd(usock.fileno)
         end
  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  sock
end

#server_create_unix(title, port, **kwargs, &callback) ⇒ Object



204
205
206
# File 'lib/fluent/plugin_helper/server.rb', line 204

def server_create_unix(title, port, **kwargs, &callback)
  server_create(title, port, proto: :unix, **kwargs, &callback)
end

#server_socket_manager_clientObject



374
375
376
377
378
379
380
# File 'lib/fluent/plugin_helper/server.rb', line 374

def server_socket_manager_client
  socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  if Fluent.windows?
    socket_manager_path = socket_manager_path.to_i
  end
  ServerEngine::SocketManager::Client.new(socket_manager_path)
end

#server_wait_until_startObject



48
49
50
# File 'lib/fluent/plugin_helper/server.rb', line 48

def server_wait_until_start
  # event_loop_wait_until_start works well for this
end

#server_wait_until_stopObject



52
53
54
55
# File 'lib/fluent/plugin_helper/server.rb', line 52

def server_wait_until_stop
  sleep 0.1 while @_servers.any?{|si| si.server.attached? }
  @_servers.each{|si| si.server.close rescue nil }
end

#shutdownObject



358
359
360
361
362
363
364
365
366
367
# File 'lib/fluent/plugin_helper/server.rb', line 358

def shutdown
  # When it invokes conn.cose, it reduces elements in @_server_connections by close_callback,
  # and it reduces the number of loops. This prevents the connection closing.
  # So, it requires invoking #dup to avoid the problem.
  @_server_connections.dup.each do |conn|
    conn.close rescue nil
  end

  super
end

#stopObject



346
347
348
349
350
351
352
353
354
355
356
# File 'lib/fluent/plugin_helper/server.rb', line 346

def stop
  @_server_mutex.synchronize do
    @_servers.each do |si|
      si.server.detach if si.server.attached?
      # to refuse more connections: (connected sockets are still alive here)
      si.server.close rescue nil
    end
  end

  super
end

#terminateObject



369
370
371
372
# File 'lib/fluent/plugin_helper/server.rb', line 369

def terminate
  @_servers = []
  super
end