Module: Fluent::PluginHelper::Server

Includes:
CertOption, EventLoop, SocketOption
Defined in:
lib/fluent/plugin_helper/server.rb

Defined Under Namespace

Modules: EventHandler, ServerTransportParams Classes: CallbackSocket, ServerInfo, TCPCallbackSocket, TLSCallbackSocket, UDPCallbackSocket

Constant Summary collapse

PROTOCOLS =
[:tcp, :udp, :tls, :unix]
CONNECTION_PROTOCOLS =
[:tcp, :tls, :unix]
SERVER_TRANSPORT_PARAMS =
[
  :protocol, :version, :min_version, :max_version, :ciphers, :insecure,
  :ca_path, :cert_path, :private_key_path, :private_key_passphrase, :client_cert_auth,
  :ca_cert_path, :ca_private_key_path, :ca_private_key_passphrase,
  :cert_verifier, :generate_private_key_length,
  :generate_cert_country, :generate_cert_state, :generate_cert_state,
  :generate_cert_locality, :generate_cert_common_name,
  :generate_cert_expiration, :generate_cert_digest,
]
PEERADDR_FAILED =

Use string “?” for port, not integer or nil. “?” is clear than -1 or nil in the log.

["?", "?", "name resolution failed", "?"]

Constants included from SocketOption

Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_LINGER, Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_LINGER_WINDOWS, Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_TIMEVAL

Constants included from EventLoop

EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT, EventLoop::EVENT_LOOP_SHUTDOWN_TIMEOUT

Constants included from Thread

Thread::THREAD_DEFAULT_WAIT_SECONDS, Thread::THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS

Instance Attribute Summary collapse

Attributes included from EventLoop

#_event_loop

Attributes included from Thread

#_threads

Class Method Summary collapse

Instance Method Summary collapse

Methods included from CertOption

#cert_option_add_extensions, #cert_option_cert_generation_opts_from_conf, #cert_option_certificates_from_file, #cert_option_create_context, #cert_option_generate_ca_pair_self_signed, #cert_option_generate_pair, #cert_option_generate_server_pair_by_ca, #cert_option_generate_server_pair_self_signed, #cert_option_load, #cert_option_server_validate!

Methods included from SocketOption

#socket_option_set, #socket_option_set_one, #socket_option_validate!

Methods included from EventLoop

#after_shutdown, #close, #event_loop_attach, #event_loop_detach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop, #start

Methods included from Thread

#after_shutdown, #close, #thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop

Instance Attribute Details

#_serversObject (readonly)

stop : [-] shutdown : detach server event handler from event loop (event_loop) close : close listening sockets terminate: remote all server instances



46
47
48
# File 'lib/fluent/plugin_helper/server.rb', line 46

def _servers
  @_servers
end

Class Method Details

.included(mod) ⇒ Object



316
317
318
# File 'lib/fluent/plugin_helper/server.rb', line 316

def self.included(mod)
  mod.include ServerTransportParams
end

Instance Method Details

#configure(conf) ⇒ Object



327
328
329
330
331
332
333
334
335
# File 'lib/fluent/plugin_helper/server.rb', line 327

def configure(conf)
  super

  if @transport_config
    if @transport_config.protocol == :tls
      cert_option_server_validate!(@transport_config)
    end
  end
end

#initializeObject



320
321
322
323
324
325
# File 'lib/fluent/plugin_helper/server.rb', line 320

def initialize
  super
  @_servers = []
  @_server_connections = []
  @_server_mutex = Mutex.new
end

#server_attach(title, proto, port, bind, shared, server) ⇒ Object



206
207
208
209
# File 'lib/fluent/plugin_helper/server.rb', line 206

def server_attach(title, proto, port, bind, shared, server)
  @_servers << ServerInfo.new(title, proto, port, bind, shared, server)
  event_loop_attach(server)
end

#server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback) ⇒ Object

server_create(:title, @port) do |data|

# ...

end server_create(:title, @port) do |data, conn|

# ...

end server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock|

sock.remote_host
sock.remote_port
# ...

end

Raises:

  • (ArgumentError)


122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/fluent/plugin_helper/server.rb', line 122

def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
  proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

  raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp
  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
  raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2

  if proto == :tcp || proto == :tls
    socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
  end

  unless socket
    socket_option_validate!(proto, **socket_options)
    socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
  end

  if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections
    raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog
  end
  if proto != :udp # UDP options
    raise ArgumentError, "BUG: max_bytes is available only for udp" if max_bytes
    raise ArgumentError, "BUG: flags is available only for udp" if flags != 0
  end

  case proto
  when :tcp
    server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn|
      conn.data(&callback)
    end
  when :tls
    transport_config = if tls_options
                         server_create_transport_section_object(tls_options)
                       elsif @transport_config && @transport_config.protocol == :tls
                         @transport_config
                       else
                         raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
                       end
    server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) do |conn|
      conn.data(&callback)
    end
  when :udp
    raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes
    if socket
      sock = socket
      close_socket = false
    else
      sock = server_create_udp_socket(shared, bind, port)
      socket_option_setter.call(sock)
      close_socket = true
    end
    server = EventHandler::UDPServer.new(sock, max_bytes, flags, close_socket, @log, @under_plugin_development, &callback)
  when :unix
    raise "not implemented yet"
  else
    raise "BUG: unknown protocol #{proto}"
  end

  server_attach(title, proto, port, bind, shared, server)
end

#server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block) ⇒ Object

server_create_connection(:title, @port) do |conn|

# on connection
source_addr = conn.remote_host
source_port = conn.remote_port
conn.data do |data|
  # on data
  conn.write resp # ...
  conn.close
end

end

Raises:

  • (ArgumentError)


70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/plugin_helper/server.rb', line 70

def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
  proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

  raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
  raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles connection" unless block_given?
  raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1

  if proto == :tcp || proto == :tls
    socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
  end

  socket_option_validate!(proto, **socket_options)
  socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }

  case proto
  when :tcp
    server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  when :tls
    transport_config = if tls_options
                         server_create_transport_section_object(tls_options)
                       elsif @transport_config && @transport_config.protocol == :tls
                         @transport_config
                       else
                         raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
                       end
    server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block)
  when :unix
    raise "not implemented yet"
  else
    raise "unknown protocol #{proto}"
  end

  server_attach(title, proto, port, bind, shared, server)
end

#server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/fluent/plugin_helper/server.rb', line 211

def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(sock)
  close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
  server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    unless conn.closing
      @_server_mutex.synchronize do
        @_server_connections << conn
      end
    end
  end
  server.listen(backlog) if backlog
  server
end

#server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/fluent/plugin_helper/server.rb', line 226

def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
  context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)
  sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(sock)
  close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
  server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    unless conn.closing
      @_server_mutex.synchronize do
        @_server_connections << conn
      end
    end
  end
  server.listen(backlog) if backlog
  server
end

#server_create_tcp(title, port, **kwargs, &callback) ⇒ Object



188
189
190
# File 'lib/fluent/plugin_helper/server.rb', line 188

def server_create_tcp(title, port, **kwargs, &callback)
  server_create(title, port, proto: :tcp, **kwargs, &callback)
end

#server_create_tcp_socket(shared, bind, port) ⇒ Object



370
371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/fluent/plugin_helper/server.rb', line 370

def server_create_tcp_socket(shared, bind, port)
  sock = if shared
           server_socket_manager_client.listen_tcp(bind, port)
         else
           # TCPServer.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead.
           # backlog will be set by the caller, we don't need to set backlog here
           tsock = Addrinfo.tcp(bind, port).listen
           tsock.autoclose = false
           TCPServer.for_fd(tsock.fileno)
         end
  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  sock
end

#server_create_tls(title, port, **kwargs, &callback) ⇒ Object



196
197
198
# File 'lib/fluent/plugin_helper/server.rb', line 196

def server_create_tls(title, port, **kwargs, &callback)
  server_create(title, port, proto: :tls, **kwargs, &callback)
end

#server_create_transport_section_object(opts) ⇒ Object



252
253
254
255
256
257
258
259
260
# File 'lib/fluent/plugin_helper/server.rb', line 252

def server_create_transport_section_object(opts)
  transport_section = configured_section_create(:transport)
  SERVER_TRANSPORT_PARAMS.each do |param|
    if opts.has_key?(param)
      transport_section[param] = opts[param]
    end
  end
  transport_section
end

#server_create_udp(title, port, **kwargs, &callback) ⇒ Object



192
193
194
# File 'lib/fluent/plugin_helper/server.rb', line 192

def server_create_udp(title, port, **kwargs, &callback)
  server_create(title, port, proto: :udp, **kwargs, &callback)
end

#server_create_udp_socket(shared, bind, port) ⇒ Object



385
386
387
388
389
390
391
392
393
394
395
396
397
# File 'lib/fluent/plugin_helper/server.rb', line 385

def server_create_udp_socket(shared, bind, port)
  sock = if shared
           server_socket_manager_client.listen_udp(bind, port)
         else
           # UDPSocket.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead.
           usock = Addrinfo.udp(bind, port).bind
           usock.autoclose = false
           UDPSocket.for_fd(usock.fileno)
         end
  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  sock
end

#server_create_unix(title, port, **kwargs, &callback) ⇒ Object



200
201
202
# File 'lib/fluent/plugin_helper/server.rb', line 200

def server_create_unix(title, port, **kwargs, &callback)
  server_create(title, port, proto: :unix, **kwargs, &callback)
end

#server_socket_manager_clientObject



362
363
364
365
366
367
368
# File 'lib/fluent/plugin_helper/server.rb', line 362

def server_socket_manager_client
  socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  if Fluent.windows?
    socket_manager_path = socket_manager_path.to_i
  end
  ServerEngine::SocketManager::Client.new(socket_manager_path)
end

#server_wait_until_startObject



48
49
50
# File 'lib/fluent/plugin_helper/server.rb', line 48

def server_wait_until_start
  # event_loop_wait_until_start works well for this
end

#server_wait_until_stopObject



52
53
54
55
# File 'lib/fluent/plugin_helper/server.rb', line 52

def server_wait_until_stop
  sleep 0.1 while @_servers.any?{|si| si.server.attached? }
  @_servers.each{|si| si.server.close rescue nil }
end

#shutdownObject



349
350
351
352
353
354
355
# File 'lib/fluent/plugin_helper/server.rb', line 349

def shutdown
  @_server_connections.each do |conn|
    conn.close rescue nil
  end

  super
end

#stopObject



337
338
339
340
341
342
343
344
345
346
347
# File 'lib/fluent/plugin_helper/server.rb', line 337

def stop
  @_server_mutex.synchronize do
    @_servers.each do |si|
      si.server.detach if si.server.attached?
      # to refuse more connections: (connected sockets are still alive here)
      si.server.close rescue nil
    end
  end

  super
end

#terminateObject



357
358
359
360
# File 'lib/fluent/plugin_helper/server.rb', line 357

def terminate
  @_servers = []
  super
end