Module: Fluent::ServerModule

Defined in:
lib/fluent/supervisor.rb

Instance Method Summary

Instance Method Details

#after_run ⇒ Object



74
75
76
77
78
79
80
# File 'lib/fluent/supervisor.rb', line 74

# Supervisor teardown: stops whichever optional services are active, then
# removes the lock directory and lets Fluent::Supervisor release its
# process-wide resources. The final cleanup always runs after the stops.
def after_run
  if Fluent.windows?
    stop_windows_event_thread
  end

  # Only servers that were actually configured/started get stopped.
  if @rpc_endpoint
    stop_rpc_server
  end
  if @counter
    stop_counter_server
  end

  cleanup_lock_dir
  Fluent::Supervisor.cleanup_resources
end

#before_run ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/supervisor.rb', line 41

# Supervisor start-up hook.
#
# Records the configuration text, creates a temporary lock directory
# (exported through ENV['FLUENTD_LOCK_DIR']), starts the RPC server and the
# counter server when configured, installs POSIX signal traps or the Windows
# event handler, and, unless config[:disable_shared_socket] is set, opens a
# ServerEngine socket manager whose path is exported through
# ENV['SERVERENGINE_SOCKETMANAGER_PATH'].
def before_run
  opts = config
  @fluentd_conf = opts[:fluentd_conf]

  # Start from a clean slate; each handle is filled in only when its
  # feature is configured below.
  @rpc_endpoint = nil
  @rpc_server = nil
  @counter = nil

  lock_dir = Dir.mktmpdir("fluentd-lock-")
  @fluentd_lock_dir = lock_dir
  ENV['FLUENTD_LOCK_DIR'] = lock_dir

  rpc_endpoint = opts[:rpc_endpoint]
  if rpc_endpoint
    @rpc_endpoint = rpc_endpoint
    @enable_get_dump = opts[:enable_get_dump]
    run_rpc_server
  end

  Fluent.windows? ? install_windows_event_handler : install_supervisor_signal_handlers

  counter_conf = opts[:counter_server]
  run_counter_server(counter_conf) if counter_conf

  if opts[:disable_shared_socket]
    $log.info "shared socket for multiple workers is disabled"
  else
    socket_manager = ServerEngine::SocketManager::Server.open
    ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager.path.to_s
  end
end

#cleanup_lock_dir ⇒ Object



82
83
84
85
# File 'lib/fluent/supervisor.rb', line 82

# Removes the lock directory created in before_run, together with the
# "fluentd-*.lock" files inside it.
#
# Does nothing when no lock directory was created (@fluentd_lock_dir is nil)
# or when it has already been removed.
def cleanup_lock_dir
  lock_dir = @fluentd_lock_dir
  return unless lock_dir && File.directory?(lock_dir)

  # Glob relative to the directory (base:) so that glob metacharacters such
  # as "[" or "{" in the directory path itself (e.g. inherited from TMPDIR)
  # are not interpreted as part of the pattern.
  lock_files = Dir.glob("fluentd-*.lock", base: lock_dir).map { |name| File.join(lock_dir, name) }
  # rm_f: a lock file that disappeared after the glob is not an error.
  FileUtils.rm_f(lock_files)
  FileUtils.rmdir(lock_dir)
end

#dump ⇒ Object



357
358
359
# File 'lib/fluent/supervisor.rb', line 357

# Delegates to the inherited #dump implementation, except once @stop has
# been set, in which case nothing happens and nil is returned.
def dump
  return if @stop

  super
end

#install_supervisor_signal_handlers ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/fluent/supervisor.rb', line 175

# Installs the supervisor's POSIX signal traps. Does nothing on Windows,
# which uses install_windows_event_handler instead. Each trap logs the
# received signal at debug level and then dispatches:
#   HUP  -> supervisor_sighup_handler
#   USR1 -> supervisor_sigusr1_handler
#   USR2 -> supervisor_sigusr2_handler
# Returns whatever the last `trap` call returned (the previous USR2 handler).
def install_supervisor_signal_handlers
  return if Fluent.windows?

  signal_table = [
    [:HUP,  "fluentd supervisor process get SIGHUP",  :supervisor_sighup_handler],
    [:USR1, "fluentd supervisor process get SIGUSR1", :supervisor_sigusr1_handler],
    [:USR2, 'fluentd supervisor process got SIGUSR2', :supervisor_sigusr2_handler],
  ]

  previous_handler = nil
  signal_table.each do |signal, message, handler_name|
    previous_handler = trap(signal) do
      $log.debug message
      send(handler_name)
    end
  end
  previous_handler
end

#install_windows_event_handler ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/fluent/supervisor.rb', line 212

# Windows counterpart of install_supervisor_signal_handlers.
#
# Starts a background thread that waits on named Win32 events and
# dispatches each one to the same handlers the POSIX traps use:
#   "<pid name>"      -> stop(true)
#   "<pid name>_HUP"  -> supervisor_sighup_handler
#   "<pid name>_USR1" -> supervisor_sigusr1_handler
#   "<pid name>_USR2" -> supervisor_sigusr2_handler
#   "<pid name>_CONT" -> supervisor_dump_handler_for_windows
# where <pid name> is "fluentd_<supervisor pid>". When config[:signame] is
# set, the same events are also created under that name. Setting
# "<pid name>_STOP_EVENT_THREAD" ends the thread; every event handle is
# closed when the thread exits.
#
# Returns the event thread, or nil when not running on Windows.
def install_windows_event_handler
  return unless Fluent.windows?

  @pid_signame = "fluentd_#{Process.pid}"
  @signame = config[:signame]

  Thread.new do
    ipc = Win32::Ipc.new(nil)
    # Order is significant: the result of wait_any is mapped back to an
    # entry of this list by position.
    events = [
      {win32_event: Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), action: :stop_event_thread},
      {win32_event: Win32::Event.new("#{@pid_signame}"), action: :stop},
      {win32_event: Win32::Event.new("#{@pid_signame}_HUP"), action: :hup},
      {win32_event: Win32::Event.new("#{@pid_signame}_USR1"), action: :usr1},
      {win32_event: Win32::Event.new("#{@pid_signame}_USR2"), action: :usr2},
      {win32_event: Win32::Event.new("#{@pid_signame}_CONT"), action: :cont},
    ]
    if @signame
      # Optional user-chosen names (config[:signame]) that trigger the same actions.
      signame_events = [
        {win32_event: Win32::Event.new("#{@signame}"), action: :stop},
        {win32_event: Win32::Event.new("#{@signame}_HUP"), action: :hup},
        {win32_event: Win32::Event.new("#{@signame}_USR1"), action: :usr1},
        {win32_event: Win32::Event.new("#{@signame}_USR2"), action: :usr2},
        {win32_event: Win32::Event.new("#{@signame}_CONT"), action: :cont},
      ]
      events.concat(signame_events)
    end
    begin
      infinite = 0xFFFFFFFF # presumably the Win32 INFINITE timeout
      handles = events.map { |e| e[:win32_event] }
      loop do
        # The result is treated as a 1-based index into `events`.
        ipc_idx = ipc.wait_any(handles, infinite)
        event_idx = ipc_idx - 1

        unless (0...events.length).cover?(event_idx)
          # Never dispatch on an index that failed validation: a negative
          # value would otherwise select an event from the end of the list
          # (running its action spuriously) and a too-large value would
          # raise NoMethodError and kill this thread.
          # NOTE(review): a persistently failing wait_any makes this loop
          # spin; consider backing off if that is observed.
          $log.warn("Unexpected return value of Win32::Ipc#wait_any: #{ipc_idx}")
          next
        end

        event = events[event_idx]
        $log.debug("Got Win32 event \"#{event[:win32_event].name}\"")
        case event[:action]
        when :stop
          stop(true)
        when :hup
          supervisor_sighup_handler
        when :usr1
          supervisor_sigusr1_handler
        when :usr2
          supervisor_sigusr2_handler
        when :cont
          supervisor_dump_handler_for_windows
        when :stop_event_thread
          break
        end
      end
    ensure
      events.each { |e| e[:win32_event].close }
    end
  end
end

#kill_worker ⇒ Object



335
336
337
338
339
340
341
342
343
344
345
346
347
# File 'lib/fluent/supervisor.rb', line 335

# Signals every worker process recorded in config[:worker_pid] (a Hash whose
# values are PIDs) to terminate and clears that table. Workers receive KILL
# on Windows and TERM elsewhere.
#
# The table is cleared before any signal is sent, so a worker that has
# already exited (Errno::ESRCH) is logged and skipped instead of aborting
# the loop; otherwise the remaining workers would never be signalled.
def kill_worker
  worker_pids = config[:worker_pid]
  return unless worker_pids

  pids = worker_pids.clone
  worker_pids.clear
  signal = Fluent.windows? ? :KILL : :TERM
  pids.each_value do |pid|
    begin
      Process.kill(signal, pid)
    rescue Errno::ESRCH
      $log.warn "worker process already exited", pid: pid
    end
  end
end

#reload ⇒ Object



205
206
207
208
209
# File 'lib/fluent/supervisor.rb', line 205

# Sends the "RELOAD" command to every worker monitor in @monitors and
# returns @monitors.
def reload
  @monitors.each { |monitor| monitor.send_command("RELOAD\n") }
end

#restart(graceful) ⇒ Object

Override some methods of ServerEngine::MultiSpawnWorker. Since Fluentd’s Supervisor doesn’t use ServerEngine’s HUP, USR1 and USR2 handlers (see install_supervisor_signal_handlers), they should also be disabled on Windows; instead, these methods just send commands to the workers.



199
200
201
202
203
# File 'lib/fluent/supervisor.rb', line 199

# Asks every worker monitor to restart its worker: "GRACEFUL_RESTART" when
# `graceful` is truthy, "IMMEDIATE_RESTART" otherwise. Returns @monitors.
def restart(graceful)
  @monitors.each do |monitor|
    command = if graceful
                "GRACEFUL_RESTART\n"
              else
                "IMMEDIATE_RESTART\n"
              end
    monitor.send_command(command)
  end
end

#run_counter_server(counter_conf) ⇒ Object



163
164
165
166
167
168
169
# File 'lib/fluent/supervisor.rb', line 163

# Builds a Fluent::Counter::Server from counter_conf (which must respond to
# scope, bind, port and backup_path), keeps it in @counter so it can be
# stopped later, and starts it. Returns the result of #start.
def run_counter_server(counter_conf)
  scope = counter_conf.scope
  server_options = {
    host: counter_conf.bind,
    port: counter_conf.port,
    log: $log,
    path: counter_conf.backup_path,
  }
  @counter = Fluent::Counter::Server.new(scope, server_options)
  @counter.start
end

#run_rpc_server ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/fluent/supervisor.rb', line 87

# Creates the supervisor's RPC server on @rpc_endpoint, mounts the built-in
# management endpoints and starts it. The server is kept in @rpc_server.
#
# Most endpoints emulate signals: on UNIX-like systems they send the
# corresponding signal to this supervisor process (so the installed traps
# run), while on Windows they call the matching handler directly.
# '/api/config.getDump' is mounted only when @enable_get_dump is set.
def run_rpc_server
  @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

  # built-in RPC for signals
  @rpc_server.mount_proc('/api/processes.interruptWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
    Process.kill :INT, Process.pid
    nil
  end

  @rpc_server.mount_proc('/api/processes.killWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.killWorkers request"
    Process.kill :TERM, Process.pid
    nil
  end

  @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
      stop(true)
    else
      Process.kill :USR1, Process.pid
      Process.kill :TERM, Process.pid
    end
    nil
  end

  @rpc_server.mount_proc('/api/plugins.flushBuffers') do |_req, _res|
    $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
    else
      Process.kill :USR1, Process.pid
    end
    nil
  end

  @rpc_server.mount_proc('/api/config.reload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.reload request"
    if Fluent.windows?
      # restart worker with auto restarting by killing
      kill_worker
    else
      Process.kill :HUP, Process.pid
    end
    nil
  end

  @rpc_server.mount_proc('/api/config.dump') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.dump request"
    $log.info "dump in-memory config"
    supervisor_dump_config_handler
    nil
  end

  @rpc_server.mount_proc('/api/config.gracefulReload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.gracefulReload request"
    if Fluent.windows?
      supervisor_sigusr2_handler
    else
      Process.kill :USR2, Process.pid
    end
    nil
  end

  if @enable_get_dump
    # Unlike the other endpoints, this one fills in the response body.
    @rpc_server.mount_proc('/api/config.getDump') do |_req, res|
      $log.debug "fluentd RPC got /api/config.getDump request"
      $log.info "get dump in-memory config via HTTP"
      res.body = supervisor_get_dump_config_handler
      [nil, nil, res]
    end
  end

  @rpc_server.start
end

#stop_counter_server ⇒ Object



171
172
173
# File 'lib/fluent/supervisor.rb', line 171

# Stops the counter server held in @counter (started by run_counter_server).
# Does nothing and returns nil when no counter server exists, so it is safe
# to call unconditionally.
def stop_counter_server
  @counter&.stop
end

#stop_rpc_server ⇒ Object



159
160
161
# File 'lib/fluent/supervisor.rb', line 159

# Shuts down the RPC server held in @rpc_server (created by run_rpc_server).
# Callers guard on @rpc_endpoint, which is assigned before the server is
# created; if creating the server failed, @rpc_server is still nil, so this
# is a no-op returning nil instead of raising NoMethodError.
def stop_rpc_server
  @rpc_server&.shutdown
end

#stop_windows_event_thread ⇒ Object



270
271
272
273
274
275
276
# File 'lib/fluent/supervisor.rb', line 270

# Asks the Windows event thread to exit by setting the
# "<@pid_signame>_STOP_EVENT_THREAD" event. No-op (returns nil) on other
# platforms. The opened event handle is closed even if setting it raises;
# the error itself still propagates.
def stop_windows_event_thread
  return unless Fluent.windows?

  ev = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD")
  begin
    ev.set
  ensure
    ev.close
  end
end

#supervisor_dump_config_handler ⇒ Object



349
350
351
# File 'lib/fluent/supervisor.rb', line 349

# Writes the in-memory configuration text (@fluentd_conf) to the supervisor
# log at info level and returns the logger's result.
def supervisor_dump_config_handler
  conf_text = @fluentd_conf
  $log.info(conf_text)
end

#supervisor_dump_handler_for_windows ⇒ Object



315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/fluent/supervisor.rb', line 315

# Windows-only dump handler: runs FluentSigdump.dump_windows in a background
# thread and forwards :CONT to the workers.
#
# On UNIX-like systems, sending SIGCONT to each process already makes it
# write its dump file; that mechanism predates this Windows implementation.
# SIGCONT could be trapped and handled here on UNIX-like systems too, but for
# backward compatibility this handler is currently Windows-only.
#
# Every failure, including the "[BUG]" platform guard, is logged as
# "failed to dump: ..." instead of being raised.
def supervisor_dump_handler_for_windows
  log_failure = ->(error) { $log.error "failed to dump: #{error}" }

  begin
    raise "[BUG] This function is for Windows ONLY." unless Fluent.windows?

    # The supervisor's own dump runs in its own thread (presumably so that
    # signalling the workers is not delayed by it).
    Thread.new do
      begin
        FluentSigdump.dump_windows
      rescue => e
        log_failure.call(e)
      end
    end

    send_signal_to_workers(:CONT)
  rescue => e
    log_failure.call(e)
  end
end

#supervisor_get_dump_config_handler ⇒ Object



353
354
355
# File 'lib/fluent/supervisor.rb', line 353

# Returns the in-memory configuration text wrapped in a Hash under :conf.
# The '/api/config.getDump' RPC endpoint uses it as the response body.
def supervisor_get_dump_config_handler
  current_conf = @fluentd_conf
  { conf: current_conf }
end

#supervisor_sighup_handler ⇒ Object



278
279
280
# File 'lib/fluent/supervisor.rb', line 278

# HUP handler (invoked from the HUP trap, or directly for the Windows "_HUP"
# event): kills the workers via kill_worker and returns its result.
def supervisor_sighup_handler
  kill_worker()
end

#supervisor_sigusr1_handler ⇒ Object



282
283
284
285
# File 'lib/fluent/supervisor.rb', line 282

# USR1 handler (invoked from the USR1 trap, or directly on Windows): calls
# reopen_log (presumably reopening the supervisor's log file), then forwards
# USR1 to all workers and returns that call's result.
def supervisor_sigusr1_handler
  reopen_log
  send_signal_to_workers :USR1
end

#supervisor_sigusr2_handler ⇒ Object



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/fluent/supervisor.rb', line 287

# USR2 handler (graceful reload; invoked from the USR2 trap, or directly on
# Windows).
#
# Builds the configuration from config[:config_path] / :conf_encoding /
# :inline_config / :use_v1_config and reloads it into Fluent::Engine with
# supervisor: true. It then reopens the log, forwards USR2 to the workers
# and stores the new configuration text in @fluentd_conf. If building or
# reloading fails, the error is logged as "Failed to reload config file: ..."
# and the workers are not signalled.
def supervisor_sigusr2_handler
  # The reload runs in a separate thread. This is presumably because the
  # method is called from a signal trap, where operations such as locking
  # a Mutex are not permitted.
  #
  # Exception reporting is switched off from inside the thread, so it is in
  # effect before any code there can raise. Setting it from outside after
  # Thread.new races with the thread. Errors re-raise from #value and are
  # logged by the rescue below.
  reload_thread = Thread.new do
    Thread.current.report_on_exception = false
    $log.info 'Reloading new config'

    # Validate that loading config is valid at first
    new_conf = Fluent::Config.build(
      config_path: config[:config_path],
      encoding: config[:conf_encoding],
      additional_config: config[:inline_config],
      use_v1_config: config[:use_v1_config],
    )

    Fluent::VariableStore.try_to_reset do
      Fluent::Engine.reload_config(new_conf, supervisor: true)
    end

    new_conf
  end
  conf = reload_thread.value

  reopen_log
  send_signal_to_workers(:USR2)
  @fluentd_conf = conf.to_s
rescue => e
  $log.error "Failed to reload config file: #{e}"
end