Module: Fluent::ServerModule

Defined in:
lib/fluent/supervisor.rb

Instance Method Summary collapse

Instance Method Details

#after_run ⇒ Object



77
78
79
80
81
82
# File 'lib/fluent/supervisor.rb', line 77

# Stops the supervisor-side helpers, then releases supervisor resources.
#
# Steps, in order:
# 1. the Windows event thread, only when running on Windows;
# 2. the RPC server, only when an RPC endpoint is set;
# 3. the counter server, only when a counter server is set;
# 4. Fluent::Supervisor.cleanup_resources, unconditionally.
def after_run
  if Fluent.windows?
    stop_windows_event_thread
  end

  if @rpc_endpoint
    stop_rpc_server
  end

  if @counter
    stop_counter_server
  end

  Fluent::Supervisor.cleanup_resources
end

#before_run ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/supervisor.rb', line 46

# Initializes supervisor state from `config` and starts the optional helpers.
#
# Reads these config keys: :fluentd_conf, :rpc_endpoint, :enable_get_dump,
# :counter_server and :disable_shared_socket.
#
# Steps, in order:
# 1. remember the configuration text and reset the RPC/counter state;
# 2. start the RPC server when an endpoint is configured;
# 3. install the Windows event handler or the signal handlers;
# 4. start the counter server when one is configured;
# 5. unless shared sockets are disabled, open a ServerEngine socket manager
#    server and export its path via ENV['SERVERENGINE_SOCKETMANAGER_PATH'].
def before_run
  @fluentd_conf = config[:fluentd_conf]
  @rpc_endpoint = @rpc_server = @counter = nil

  endpoint = config[:rpc_endpoint]
  if endpoint
    @rpc_endpoint = endpoint
    @enable_get_dump = config[:enable_get_dump]
    run_rpc_server
  end

  Fluent.windows? ? install_windows_event_handler : install_supervisor_signal_handlers

  counter_conf = config[:counter_server]
  run_counter_server(counter_conf) if counter_conf

  # Nothing more to set up when workers must not share sockets.
  return $log.info("shared socket for multiple workers is disabled") if config[:disable_shared_socket]

  manager_path = ServerEngine::SocketManager::Server.generate_path
  ServerEngine::SocketManager::Server.open(manager_path)
  ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = manager_path.to_s
end

#install_supervisor_signal_handlers ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/fluent/supervisor.rb', line 172

# Installs the supervisor's HUP, USR1 and USR2 signal handlers.
#
# Does nothing on Windows. Each handler writes a debug log line and then
# delegates to the matching supervisor_sig*_handler method.
def install_supervisor_signal_handlers
  return if Fluent.windows?

  hup_handler = proc do
    $log.debug "fluentd supervisor process get SIGHUP"
    supervisor_sighup_handler
  end

  usr1_handler = proc do
    $log.debug "fluentd supervisor process get SIGUSR1"
    supervisor_sigusr1_handler
  end

  usr2_handler = proc do
    $log.debug 'fluentd supervisor process got SIGUSR2'
    supervisor_sigusr2_handler
  end

  Signal.trap(:HUP, hup_handler)
  Signal.trap(:USR1, usr1_handler)
  Signal.trap(:USR2, usr2_handler)
end

#install_windows_event_handler ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/fluent/supervisor.rb', line 209

# Starts a background thread that waits on named Win32 events and turns them
# into supervisor actions. Does nothing (returns nil) when not on Windows;
# otherwise returns the started Thread.
#
# Event names are derived from "fluentd_<pid>" and, when config[:signame] is
# set, additionally from that name. The index returned by
# Win32::Ipc#wait_any is treated as 1-based into the `events` array:
#
#   1     "<pid name>_STOP_EVENT_THREAD"  leave the loop
#   2, 6  "<name>"                        stop(true)
#   3, 7  "<name>_HUP"                    supervisor_sighup_handler
#   4, 8  "<name>_USR1"                   supervisor_sigusr1_handler
#   5, 9  "<name>_USR2"                   supervisor_sigusr2_handler
#
# Indexes 6-9 exist only when config[:signame] is set. Any other return
# value is logged as a warning and otherwise ignored. All events are closed
# when the loop ends, whether normally or through an exception.
def install_windows_event_handler
  return unless Fluent.windows?

  @pid_signame = "fluentd_#{$$}"
  @signame = config[:signame]

  Thread.new do
    ipc = Win32::Ipc.new(nil)
    events = [
      Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"),
      Win32::Event.new("#{@pid_signame}"),
      Win32::Event.new("#{@pid_signame}_HUP"),
      Win32::Event.new("#{@pid_signame}_USR1"),
      Win32::Event.new("#{@pid_signame}_USR2"),
    ]
    if @signame
      # Same four actions again, reachable through the user-supplied name.
      signame_events = [
        Win32::Event.new("#{@signame}"),
        Win32::Event.new("#{@signame}_HUP"),
        Win32::Event.new("#{@signame}_USR1"),
        Win32::Event.new("#{@signame}_USR2"),
      ]
      events.concat(signame_events)
    end
    begin
      loop do
        idx = ipc.wait_any(events, Windows::Synchronize::INFINITE)
        if idx > 0 && idx <= events.length
          $log.debug("Got Win32 event \"#{events[idx - 1].name}\"")
        else
          $log.warn("Unexpected return value of Win32::Ipc#wait_any: #{idx}")
        end
        case idx
        when 2, 6
          stop(true)
        when 3, 7
          supervisor_sighup_handler
        when 4, 8
          supervisor_sigusr1_handler
        when 5, 9
          supervisor_sigusr2_handler
        when 1
          break
        end
      end
    ensure
      events.each { |event| event.close }
    end
  end
end

#kill_worker ⇒ Object



305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/fluent/supervisor.rb', line 305

# Signals every worker process recorded in config[:worker_pid].
#
# The recorded pids are copied and the original collection is emptied before
# any signal is sent. Each pid then receives KILL on Windows and TERM
# elsewhere. Does nothing when config[:worker_pid] is not set.
def kill_worker
  registered = config[:worker_pid]
  return unless registered

  targets = registered.clone
  registered.clear
  targets.each_value do |pid|
    Process.kill(Fluent.windows? ? :KILL : :TERM, pid)
  end
end

#reload ⇒ Object



202
203
204
205
206
# File 'lib/fluent/supervisor.rb', line 202

# Sends the "RELOAD\n" command to every entry in @monitors.
def reload
  @monitors.each { |monitor| monitor.send_command("RELOAD\n") }
end

#restart(graceful) ⇒ Object

Overrides some methods of ServerEngine::MultiSpawnWorker. Since Fluentd’s Supervisor doesn’t use ServerEngine’s HUP, USR1 and USR2 handlers (see install_supervisor_signal_handlers), they should also be disabled on Windows; commands are simply sent to the workers instead.



196
197
198
199
200
# File 'lib/fluent/supervisor.rb', line 196

# Sends a restart command to every entry in @monitors.
#
# @param graceful [Boolean] truthy sends "GRACEFUL_RESTART\n", falsy sends
#   "IMMEDIATE_RESTART\n"
def restart(graceful)
  command = graceful ? "GRACEFUL_RESTART\n" : "IMMEDIATE_RESTART\n"
  @monitors.each { |monitor| monitor.send_command(command) }
end

#run_counter_server(counter_conf) ⇒ Object



160
161
162
163
164
165
166
# File 'lib/fluent/supervisor.rb', line 160

# Creates a Fluent::Counter::Server from the given configuration, stores it
# in @counter and starts it.
#
# @param counter_conf [#scope, #bind, #port, #backup_path] counter server settings
# @return whatever the server's #start returns
def run_counter_server(counter_conf)
  scope = counter_conf.scope
  server_options = {
    host: counter_conf.bind,
    port: counter_conf.port,
    log: $log,
    path: counter_conf.backup_path,
  }

  @counter = Fluent::Counter::Server.new(scope, server_options)
  @counter.start
end

#run_rpc_server ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/fluent/supervisor.rb', line 84

# Creates the RPC server for @rpc_endpoint, mounts the built-in endpoints and
# starts it.
#
# Mounted endpoints (every handler writes a debug log line first):
#   /api/processes.interruptWorkers          INT to this process
#   /api/processes.killWorkers               TERM to this process
#   /api/processes.flushBuffersAndKillWorkers
#                                            USR1 then TERM to this process;
#                                            on Windows the USR1 handler and stop(true)
#   /api/plugins.flushBuffers                USR1 (Windows: the USR1 handler directly)
#   /api/config.reload                       HUP (Windows: kill_worker)
#   /api/config.dump                         logs the in-memory config
#   /api/config.gracefulReload               USR2 (Windows: the USR2 handler directly)
#   /api/config.getDump                      only when @enable_get_dump is set; puts the
#                                            dump into the response body
#
# Every handler returns nil except getDump, which returns [nil, nil, res].
def run_rpc_server
  @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

  # --- process control ---
  @rpc_server.mount_proc('/api/processes.interruptWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
    Process.kill(:INT, Process.pid)
    nil
  end

  @rpc_server.mount_proc('/api/processes.killWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.killWorkers request"
    Process.kill(:TERM, Process.pid)
    nil
  end

  @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
      stop(true)
    else
      Process.kill(:USR1, Process.pid)
      Process.kill(:TERM, Process.pid)
    end
    nil
  end

  # --- plugin control ---
  @rpc_server.mount_proc('/api/plugins.flushBuffers') do |_req, _res|
    $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
    else
      Process.kill(:USR1, Process.pid)
    end
    nil
  end

  # --- configuration ---
  @rpc_server.mount_proc('/api/config.reload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.reload request"
    if Fluent.windows?
      # Killing the worker makes it come back through auto restart.
      kill_worker
    else
      Process.kill(:HUP, Process.pid)
    end
    nil
  end

  @rpc_server.mount_proc('/api/config.dump') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.dump request"
    $log.info "dump in-memory config"
    supervisor_dump_config_handler
    nil
  end

  @rpc_server.mount_proc('/api/config.gracefulReload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.gracefulReload request"
    if Fluent.windows?
      supervisor_sigusr2_handler
    else
      Process.kill(:USR2, Process.pid)
    end
    nil
  end

  if @enable_get_dump
    @rpc_server.mount_proc('/api/config.getDump') do |_req, res|
      $log.debug "fluentd RPC got /api/config.getDump request"
      $log.info "get dump in-memory config via HTTP"
      res.body = supervisor_get_dump_config_handler
      [nil, nil, res]
    end
  end

  @rpc_server.start
end

#stop_counter_server ⇒ Object



168
169
170
# File 'lib/fluent/supervisor.rb', line 168

# Stops the counter server held in @counter.
#
# There is no nil check here: calling this while @counter is nil raises
# NoMethodError.
def stop_counter_server
  @counter.stop
end

#stop_rpc_server ⇒ Object



156
157
158
# File 'lib/fluent/supervisor.rb', line 156

# Shuts down the RPC server held in @rpc_server.
#
# There is no nil check here: calling this while @rpc_server is nil raises
# NoMethodError.
def stop_rpc_server
  @rpc_server.shutdown
end

#stop_windows_event_thread ⇒ Object



260
261
262
263
264
265
266
# File 'lib/fluent/supervisor.rb', line 260

# Signals the "<pid name>_STOP_EVENT_THREAD" Win32 event, which is the event
# the Windows event-handler loop treats as its exit request.
#
# Does nothing (returns nil) when not running on Windows. The opened event
# handle is closed even when signalling it raises; the exception itself still
# propagates to the caller.
def stop_windows_event_thread
  return unless Fluent.windows?

  ev = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD")
  begin
    ev.set
  ensure
    # Always release the handle; without this a failing #set would leak it.
    ev.close
  end
end

#supervisor_dump_config_handler ⇒ Object



319
320
321
# File 'lib/fluent/supervisor.rb', line 319

# Writes the stored configuration (@fluentd_conf) to the log at info level.
def supervisor_dump_config_handler
  $log.info @fluentd_conf
end

#supervisor_get_dump_config_handler ⇒ Object



323
324
325
# File 'lib/fluent/supervisor.rb', line 323

# Returns the stored configuration wrapped in a Hash.
#
# @return [Hash] a single-entry hash, { conf: @fluentd_conf }
def supervisor_get_dump_config_handler
  { conf: @fluentd_conf }
end

#supervisor_sighup_handler ⇒ Object



268
269
270
# File 'lib/fluent/supervisor.rb', line 268

# HUP action for the supervisor: delegates to #kill_worker.
def supervisor_sighup_handler
  kill_worker
end

#supervisor_sigusr1_handler ⇒ Object



272
273
274
275
# File 'lib/fluent/supervisor.rb', line 272

# USR1 action for the supervisor: calls reopen_log, then passes :USR1 to
# send_signal_to_workers.
def supervisor_sigusr1_handler
  reopen_log
  send_signal_to_workers(:USR1)
end

#supervisor_sigusr2_handler ⇒ Object



277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/fluent/supervisor.rb', line 277

# USR2 action for the supervisor: rebuilds the configuration and, when that
# succeeds, passes :USR2 on to the workers.
#
# The configuration is built and applied on a separate thread, and the caller
# waits for it with #join. On success the log is reopened, :USR2 is passed to
# send_signal_to_workers and @fluentd_conf is replaced by the new
# configuration's string form. Any StandardError, including one re-raised by
# #join from the reload thread, is logged as an error and swallowed; in that
# case the workers are not signalled and @fluentd_conf keeps its old value.
def supervisor_sigusr2_handler
  conf = nil
  t = Thread.new do
    # Failures surface through #join below and are logged by the rescue
    # clause, so the default per-thread report is unwanted. The flag is set
    # here, as the thread's first action, so the thread cannot die before it
    # takes effect.
    Thread.current.report_on_exception = false

    $log.info 'Reloading new config'

    # Validate that loading config is valid at first
    conf = Fluent::Config.build(
      config_path: config[:config_path],
      encoding: config[:conf_encoding],
      additional_config: config[:inline_config],
      use_v1_config: config[:use_v1_config],
    )

    Fluent::VariableStore.try_to_reset do
      Fluent::Engine.reload_config(conf, supervisor: true)
    end
  end

  t.join

  reopen_log
  send_signal_to_workers(:USR2)
  @fluentd_conf = conf.to_s
rescue => e
  $log.error "Failed to reload config file: #{e}"
end